1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
- # YOLOv5 🚀 by Ultralytics, GPL-3.0 license
- """
- Download utils
- """
- import os
- import platform
- import subprocess
- import time
- import urllib
- from pathlib import Path
- import requests
- import torch
def gsutil_getsize(url=''):
    """Return the size in bytes of a gs://bucket/file object, or 0 if it cannot be determined.

    Shells out to `gsutil du` (https://cloud.google.com/storage/docs/gsutil/commands/du),
    whose output is '<bytes>  <url>'.
    """
    s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8')
    # Parse the leading integer with int() rather than eval(): eval on external
    # command output is a code-execution risk and no more correct here.
    return int(s.split(' ')[0]) if len(s) else 0  # bytes
def safe_download(file, url, url2=None, min_bytes=1E0, error_msg=''):
    """Download `file` from `url`, falling back to `url2`, removing downloads smaller than `min_bytes`.

    Args:
        file: destination path for the downloaded file.
        url: primary download URL (fetched via torch.hub).
        url2: optional fallback URL tried via curl if the primary attempt fails.
        min_bytes: any resulting file at or below this size is treated as an incomplete download.
        error_msg: extra message printed if both attempts leave no valid file.

    Never raises on download failure; prints errors and leaves no partial file behind.
    """
    file = Path(file)
    assert_msg = f"Downloaded file '{file}' does not exist or size is < min_bytes={min_bytes}"
    try:  # attempt 1: primary url via torch.hub
        print(f'Downloading {url} to {file}...')
        torch.hub.download_url_to_file(url, str(file))
        assert file.exists() and file.stat().st_size > min_bytes, assert_msg  # check
    except Exception as e:  # attempt 2: fallback url (or retry primary) via curl
        file.unlink(missing_ok=True)  # remove partial downloads before retrying
        print(f'ERROR: {e}\nRe-attempting {url2 or url} to {file}...')
        # -C - resumes a partial transfer; --retry 3 retries transient failures
        os.system(f"curl -L '{url2 or url}' -o '{file}' --retry 3 -C -")  # curl download, retry and resume on fail
    finally:
        # Final validation runs regardless of which attempt ran: an undersized or
        # missing result is deleted so callers never see a truncated file.
        if not file.exists() or file.stat().st_size < min_bytes:  # check
            file.unlink(missing_ok=True)  # remove partial downloads
            print(f"ERROR: {assert_msg}\n{error_msg}")
        print('')
def attempt_download(file, repo='ultralytics/yolov5'):  # from utils.downloads import *; attempt_download()
    """Download `file` if it does not already exist locally.

    `file` may be a local path (returned as-is if present), an http(s) URL
    (downloaded to the current directory, basename returned), or a bare asset
    name resolved against the latest GitHub release of `repo`.

    Returns:
        str: path of the (now) local file, or the decoded basename for URL downloads.
    """
    file = Path(str(file).strip().replace("'", ''))

    if not file.exists():
        # URL specified
        name = Path(urllib.parse.unquote(str(file))).name  # decode '%2F' to '/' etc.
        if str(file).startswith(('http:/', 'https:/')):  # download
            url = str(file).replace(':/', '://')  # Pathlib turns :// -> :/
            name = name.split('?')[0]  # strip auth query string, e.g. https://url.com/file.txt?auth...
            safe_download(file=name, url=url, min_bytes=1E5)
            return name

        # GitHub assets
        file.parent.mkdir(parents=True, exist_ok=True)  # make parent dir (if required)
        try:
            # timeout prevents an indefinite hang when api.github.com is unreachable;
            # any failure (network, JSON shape) drops into the hard-coded fallback below
            response = requests.get(f'https://api.github.com/repos/{repo}/releases/latest', timeout=10).json()
            assets = [x['name'] for x in response['assets']]  # release assets, i.e. ['yolov5s.pt', 'yolov5m.pt', ...]
            tag = response['tag_name']  # i.e. 'v1.0'
        except Exception:  # fallback plan — narrowed from bare `except:` so Ctrl-C/SystemExit still propagate
            assets = ['yolov5s.pt', 'yolov5m.pt', 'yolov5l.pt', 'yolov5x.pt',
                      'yolov5s6.pt', 'yolov5m6.pt', 'yolov5l6.pt', 'yolov5x6.pt']
            try:
                tag = subprocess.check_output('git tag', shell=True, stderr=subprocess.STDOUT).decode().split()[-1]
            except Exception:
                tag = 'v5.0'  # current release

        if name in assets:
            safe_download(file,
                          url=f'https://github.com/{repo}/releases/download/{tag}/{name}',
                          # url2=f'https://storage.googleapis.com/{repo}/ckpt/{name}',  # backup url (optional)
                          min_bytes=1E5,
                          error_msg=f'{file} missing, try downloading from https://github.com/{repo}/releases/')

    return str(file)
def gdrive_download(id='16TiPfZj7htmTyhntwcZyEEAejOUxuT6m', file='tmp.zip'):
    """Download a file from Google Drive via curl, unzipping .zip results in place.

    Usage: from yolov5.utils.downloads import *; gdrive_download()

    Args:
        id: Google Drive file id.
        file: destination filename; an existing file of the same name is removed first.

    Returns:
        int: the curl shell return code (0 on success, non-zero on failure).
    """
    t = time.time()
    file = Path(file)
    cookie = Path('cookie')  # gdrive cookie written by the first curl call below
    print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {file}... ', end='')
    file.unlink(missing_ok=True)  # remove existing file
    cookie.unlink(missing_ok=True)  # remove existing cookie

    # Attempt file download. The first request only collects cookies; its body is
    # discarded to the platform null device.
    out = "NUL" if platform.system() == "Windows" else "/dev/null"
    os.system(f'curl -c ./cookie -s -L "drive.google.com/uc?export=download&id={id}" > {out}')
    if os.path.exists('cookie'):  # large file: Drive sets a cookie requiring a confirm token
        s = f'curl -Lb ./cookie "drive.google.com/uc?export=download&confirm={get_token()}&id={id}" -o {file}'
    else:  # small file: direct download, no confirmation step
        s = f'curl -s -L -o {file} "drive.google.com/uc?export=download&id={id}"'
    r = os.system(s)  # execute, capture return code
    cookie.unlink(missing_ok=True)  # remove cookie left by the confirm flow

    # Error check: non-zero curl exit means the transfer failed; drop any partial file.
    if r != 0:
        file.unlink(missing_ok=True)  # remove partial
        print('Download error ')  # raise Exception('Download error')
        return r

    # Unzip if archive, then delete the archive to free space.
    if file.suffix == '.zip':
        print('unzipping... ', end='')
        os.system(f'unzip -q {file}')  # unzip
        file.unlink()  # remove zip to free space

    print(f'Done ({time.time() - t:.1f}s)')
    return r
def get_token(cookie="./cookie"):
    """Return the Google Drive download-confirmation token parsed from a curl cookie file.

    Scans `cookie` for the first line mentioning 'download' and returns its last
    whitespace-separated field, or '' if no such line exists.
    """
    with open(cookie) as fh:
        tokens = (ln.split()[-1] for ln in fh if "download" in ln)
        return next(tokens, "")
- # Google utils: https://cloud.google.com/storage/docs/reference/libraries ----------------------------------------------
- #
- #
- # def upload_blob(bucket_name, source_file_name, destination_blob_name):
- # # Uploads a file to a bucket
- # # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
- #
- # storage_client = storage.Client()
- # bucket = storage_client.get_bucket(bucket_name)
- # blob = bucket.blob(destination_blob_name)
- #
- # blob.upload_from_filename(source_file_name)
- #
- # print('File {} uploaded to {}.'.format(
- # source_file_name,
- # destination_blob_name))
- #
- #
- # def download_blob(bucket_name, source_blob_name, destination_file_name):
- # # Uploads a blob from a bucket
- # storage_client = storage.Client()
- # bucket = storage_client.get_bucket(bucket_name)
- # blob = bucket.blob(source_blob_name)
- #
- # blob.download_to_filename(destination_file_name)
- #
- # print('Blob {} downloaded to {}.'.format(
- # source_blob_name,
- # destination_file_name))
|