1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| import requests
import json import os import time import re import pandas as pd import random
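"""
Workflow:
1. Extract the short URL from the string shared from the user's profile page.
2. Request the short URL, read the `location` header of the 302 response,
   and extract sec_uid from it.
3. Build the video-list request URL with parameters such as:
params = { 'sec_uid' : 'MS4wLjABAAAAbtSlJK_BfUcuqyy8ypNouqEH7outUXePTYEcAIpY9rk', 'count' : '200', 'min_cursor' : '1612108800000', 'max_cursor' : '1619251716404', 'aid' : '1128', '_signature' : 'PtCNCgAAXljWCq93QOKsFT7QjR' }
"""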
# Runs of ASCII digits, ASCII/CJK punctuation and whitespace.
# Written as a raw string so that ``\s`` and ``\]`` reach the regex engine
# unchanged instead of relying on invalid string escapes, and compiled once
# at import time rather than on every call.
_BORING_CHARACTERS = re.compile(
    r'[0-9’!"#$%&\'()*+,-./:;<=>?@,。?★、…【】《》?“”‘’![\]^_`{|}~\s]+'
)


def delete_boring_characters(sentence):
    """Return *sentence* with digits, punctuation and whitespace removed.

    Every run of characters matched by ``_BORING_CHARACTERS`` is replaced by
    the empty string; all other characters (letters, CJK text, backslashes,
    ...) are kept in their original order.

    Args:
        sentence: the text to clean.

    Returns:
        The cleaned string.
    """
    return _BORING_CHARACTERS.sub("", sentence)
# Request headers sent with every HTTP call made by this script; the
# user-agent string identifies the client as a mobile Chrome browser.
headers = {
    "user-agent": (
        "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/89.0.4389.114 Mobile Safari/537.36"
    ),
}
def getUrlInfo(startTime, endTime, shareUrl):
    """Resolve a shared short link to a user and download that user's videos.

    The first URL found in *shareUrl* is requested without following
    redirects; the ``sec_uid`` query value is read from the ``location``
    response header, the user's nickname is fetched from the user-info
    endpoint, and ``download`` is then called with the nickname and sec_uid.

    Args:
        startTime: start of the publish-date range, passed through to
            ``download``.
        endTime: end of the publish-date range, passed through to
            ``download``.
        shareUrl: text containing the shared short link.

    Raises:
        ValueError: if *shareUrl* contains no URL, the short link's response
            has no ``location`` header, or that header carries no sec_uid.
    """
    # Without a timeout a stalled connection would block the script forever.
    request_timeout = 30

    url_matches = re.findall(r'[a-z]+://\S+', shareUrl, re.I | re.M)
    if not url_matches:
        raise ValueError("no URL found in share text: {!r}".format(shareUrl))
    short_url = url_matches[0]
    print("用户分享短连接:" + short_url)

    # Redirects are not followed: the target address itself is what we need.
    startpage = requests.get(url=short_url, headers=headers,
                             allow_redirects=False, timeout=request_timeout)
    location = startpage.headers.get('location')
    if not location:
        raise ValueError(
            "short link did not redirect (no location header): {}".format(short_url))

    # Only letters, digits, '_' and '-' belong to the id; the previous
    # character class also admitted commas and spaces by accident.
    sec_uid_matches = re.findall(r'(?<=sec_uid=)[A-Za-z0-9_-]+', location, re.M | re.I)
    if not sec_uid_matches:
        raise ValueError("no sec_uid found in redirect target: {}".format(location))
    sec_uid = sec_uid_matches[0]

    getName = requests.get(
        url='https://www.iesdouyin.com/web/api/v2/user/info/?sec_uid={}'.format(sec_uid),
        headers=headers, timeout=request_timeout).text
    userinfo = json.loads(getName)
    name = userinfo['user_info']['nickname']
    print("抖音用户名称:{}".format(name))
    download(startTime, endTime, name, sec_uid)
def download(startTime, endTime, name, sec_uid):
    """Download the videos a user published in a date range, day by day.

    A directory called *name* is created if it is missing and becomes the
    process's working directory (this function calls ``os.chdir``). The range
    is split into one-day windows; for each window the post-list endpoint is
    queried and every returned video is saved as
    ``<aweme_id>-<cleaned title>.mp4``. A video that fails to download or
    save is reported and skipped; the remaining videos are still processed.

    Args:
        startTime: start of the range, given to ``pandas.date_range``.
        endTime: end of the range, given to ``pandas.date_range``.
        name: user nickname, used as the output directory name.
        sec_uid: the user's sec_uid, sent to the post-list endpoint.
    """
    # Without a timeout a stalled connection would block the script forever.
    request_timeout = 30

    if not os.path.exists(name):
        os.mkdir(name)
    else:
        # This branch runs when the directory is already there.
        print('目录已存在')
    os.chdir(name)

    # One boundary per day; each pair of consecutive boundaries is one window.
    timepool = [str(day) for day in pd.date_range(start=startTime, end=endTime, freq='D')]
    for window_start, window_end in zip(timepool, timepool[1:]):
        print()
        print('发布日期 {} -- {}'.format(window_start, window_end))
        # Cursors are epoch milliseconds; mktime interprets the parsed
        # struct_time as local time.
        t1 = int(time.mktime(time.strptime(window_start, "%Y-%m-%d %H:%M:%S")) * 1000)
        t2 = int(time.mktime(time.strptime(window_end, "%Y-%m-%d %H:%M:%S")) * 1000)

        # Random pause between list requests.
        sleepTime = getSleepTime()
        print('===> 正在准备下载,请等待{}s'.format(sleepTime))
        time.sleep(sleepTime)

        params = {
            'sec_uid': sec_uid,
            'count': 200,
            'min_cursor': t1,
            'max_cursor': t2,
            'aid': 1128,
            '_signature': 'PtCNCgAAXljWCq93QOKsFT7QjR',
        }
        awemeurl = 'https://www.iesdouyin.com/web/api/v2/aweme/post/?'
        awemehtml = requests.get(url=awemeurl, params=params, headers=headers,
                                 timeout=request_timeout).text
        data = json.loads(awemehtml)
        # A missing or null list is treated like an empty one.
        aweme_list = data.get('aweme_list') or []
        if not aweme_list:
            print('===> 暂无视频发布......')
            continue

        for aweme in aweme_list:
            videotitle = aweme['desc'].replace("?", "").replace("\"", "").replace(":", "")
            videoid = aweme['aweme_id']
            videourl = aweme['video']['play_addr']['url_list'][0]
            filename = videoid + '-' + delete_boring_characters(videotitle) + '.mp4'
            start = time.time()
            print('===> 下载内容 {}'.format(videotitle))
            print('===> 下载中......')
            try:
                # Fetch first and open the file only on success, so a failed
                # download no longer leaves an empty .mp4 behind.
                response = requests.get(url=videourl, headers=headers,
                                        timeout=request_timeout)
                response.raise_for_status()
                with open(filename, 'wb') as v:
                    v.write(response.content)
            except (requests.RequestException, OSError) as e:
                print('下载失败: {}'.format(e))
                continue
            cost = time.time() - start
            print('===> 下载耗时 {}s'.format(cost))
def getSleepTime():
    """Return a random whole number of seconds from 0 to 5 inclusive."""
    # Upper bound of randrange is exclusive, so 6 keeps 5 as a possible value.
    seconds = random.randrange(0, 6)
    return seconds
if __name__ == '__main__':
    # Script entry point: share link of the user whose videos are fetched,
    # and the publish-date range to cover.
    shareUrl = 'https://v.douyin.com/ekkTsYw/'
    startTime = "2020-01-01 00:00:00"
    endTime = "2022-10-25 00:00:00"
    getUrlInfo(startTime=startTime, endTime=endTime, shareUrl=shareUrl)
|