目錄
- 1 、一般同步下載
- 2、 使用流式請(qǐng)求,requests.get方法的stream
- 3 、異步下載文件
- 4、 異步拆分下載文件
- 5、注意
1 、一般同步下載
示例代碼:
import requests
import os
def downlaod(url, file_path):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
}
r = requests.get(url=url, headers=headers)
with open(file_path, "wb") as f:
f.write(r.content)
f.flush()
2、 使用流式請(qǐng)求,requests.get方法的stream
默認(rèn)情況下是stream的值為false,它會(huì)立即開始下載文件并存放到內(nèi)存當(dāng)中,倘若文件過大就會(huì)導(dǎo)致內(nèi)存不足的情況,程序就會(huì)報(bào)錯(cuò)。
當(dāng)把get函數(shù)的stream參數(shù)設(shè)置成True時(shí),它不會(huì)立即開始下載,當(dāng)你使用iter_content或iter_lines遍歷內(nèi)容或訪問內(nèi)容屬性時(shí)才開始下載,需要注意一點(diǎn):文件沒有下載之前,它也需要保持連接。
iter_content:一塊一塊的遍歷要下載的內(nèi)容
iter_lines:一行一行的遍歷要下載的內(nèi)容
使用上面兩個(gè)函數(shù)下載大文件可以防止占用過多的內(nèi)存,因?yàn)槊看沃幌螺d小部分?jǐn)?shù)據(jù)。
示例代碼:
3 、異步下載文件
由于request的請(qǐng)求是阻塞式的,所以要用aiohttp模塊來發(fā)起請(qǐng)求。
示例代碼:
import aiohttp
import asyncio
import os
async def handler(url, file_path):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
}
async with aiohttp.ClientSession() as session:
r = await session.get(url=url, headers=headers)
with open(file_path, "wb") as f:
f.write(await r.read())
f.flush()
os.fsync(f.fileno())
loop = asyncio.get_event_loop()
loop.run_until_complete(handler(url, file_path))
4、 異步拆分下載文件
上面用的是一個(gè)協(xié)程下載一個(gè)文件,下面的方法是將文件分成幾部分,每個(gè)部分用一個(gè)協(xié)程下載,最后再寫入文件。
下面這個(gè)例子用的是流式寫入,即把內(nèi)容寫入到磁盤里面。
import aiohttp
import asyncio
import time
import os
async def consumer(queue):
option = await queue.get()
start = option["start"]
end = option["end"]
url = option["url"]
filename = option["filename"]
i = option["i"]
print(f"第{i}個(gè)任務(wù)開始運(yùn)行")
async with aiohttp.ClientSession() as session:
headers = {"Range": f"bytes={start}-{end}"}
r = await session.get(url=url, headers=headers)
with open(filename, "rb+") as f:
f.seek(start)
while True:
chunk = await r.content.read(end - start)
if not chunk:
break
f.write(chunk)
f.flush()
os.fsync(f.fileno())
print(f"第{i}個(gè)任務(wù)正在寫入中ing")
queue.task_done()
print(f"第{i}個(gè)任務(wù)寫入成功")
async def producer(url, headers, filename, queue, coro_num):
async with aiohttp.ClientSession() as session:
resp = await session.head(url=url, headers=headers)
file_size = int(resp.headers["content-length"])
# 創(chuàng)建一個(gè)文件
with open(filename, "wb") as f:
pass
part = file_size // coro_num
for i in range(coro_num):
start = part * i
if i == coro_num - 1:
end = file_size
else:
end = start + part
info = {
"start": start,
"end": end,
"url": url,
"filename": filename,
"i": i,
}
queue.put_nowait(info)
async def main():
# 需要填的有url,filename,coro_num
url = ""
filename = ""
coro_num = 0
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
}
queue = asyncio.Queue(coro_num)
await producer(url, headers, filename, queue, coro_num)
task_list = []
for i in range(coro_num):
task = asyncio.create_task(consumer(queue))
task_list.append(task)
await queue.join()
for i in task_list:
i.cancel()
await asyncio.gather(*task_list)
startt = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time() - startt
print(f"用了{(lán)end}秒")
5、注意
以上的示例都是介紹思路,程序并不健壯,健壯的程序需要加入錯(cuò)誤捕獲和錯(cuò)誤處理。
以上就是python 下載文件的幾種方式分享的詳細(xì)內(nèi)容,更多關(guān)于python 下載文件的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
您可能感興趣的文章:- 用Python自動(dòng)下載網(wǎng)站所有文件
- python基于paramiko庫遠(yuǎn)程執(zhí)行 SSH 命令,實(shí)現(xiàn) sftp 下載文件
- Python解析m3u8拼接下載mp4視頻文件的示例代碼
- python爬蟲智能翻頁批量下載文件的實(shí)例詳解
- python 下載文件的幾種方法匯總
- python 基于selectors庫實(shí)現(xiàn)文件上傳與下載
- Python實(shí)現(xiàn)FTP文件定時(shí)自動(dòng)下載的步驟
- Python 使用SFTP和FTP實(shí)現(xiàn)對(duì)服務(wù)器的文件下載功能
- python從ftp獲取文件并下載到本地
- Python根據(jù)URL地址下載文件并保存至對(duì)應(yīng)目錄的實(shí)現(xiàn)