"""Minimal client for saving Baidu Pan share links into the logged-in account.

The module reads a browser cookie from ``cookie.txt``, fetches a ``bdstoken``
and then uses the web API under ``BASE_URL`` to list/create directories,
verify share links and transfer the shared files.
"""
import os
import posixpath
import random
import re
import time

import requests
import urllib3
from retrying import retry

# All requests below are sent with ``verify=False``; silence the resulting
# InsecureRequestWarning noise.
# NOTE(review): TLS verification is disabled for every request - confirm that
# this is really intended.
urllib3.disable_warnings()

# Static values
BASE_URL = 'https://pan.baidu.com'
# Maps API ``errno`` values, the local codes 1/2/3 returned by
# ``ReqAction.verify_links`` and two share-page titles to user-facing messages.
ERROR_CODES = {
    1: '链接失效,没获取到 shareid',
    2: '链接失效,没获取到 user_id',
    3: '链接失效,没获取到 fs_id',
    '百度网盘-链接不存在': '链接失效,文件已经被删除或取消分享',
    '百度网盘 请输入提取码': '链接错误,缺少提取码',
    -9: '链接错误,提取码错误或验证已过期',
    -62: '链接错误尝试次数过多,请手动转存或稍后再试',
    -65: '操作过于频繁,请您稍后重试',
    105: '链接错误,链接格式不正确',
    -4: '转存失败,无效登录。请退出账号在其他地方的登录',
    -6: '转存失败,请用浏览器无痕模式获取 Cookie',
    4: '转存失败,目录中已有同名文件或文件夹存在',
    -8: '转存失败,目录中已有同名文件或文件夹存在',
    12: '转存失败,转存文件数超过限制',
    -7: '转存失败,秒传文件名有非法字符',
    404: '转存失败,秒传无效',
    31190: '转存失败,秒传未生效',
    31039: '转存失败,秒传文件名冲突',
    -10: '转存失败,容量不足',
    20: '转存失败,容量不足',
    0: '转存成功',
}


class ReqAction:
    """Wraps one ``requests.Session`` and the headers used for every API call."""

    # Default request headers. Every instance works on its own copy (see
    # ``__init__``), so this template is never mutated.
    request_header = {
        'Host': 'pan.baidu.com',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Sec-Fetch-Site': 'same-site',
        'Sec-Fetch-Mode': 'navigate',
        'Referer': 'https://pan.baidu.com',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,en-GB;q=0.6,ru;q=0.5',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36',
    }

    def __init__(self):
        """Create the HTTP session; ``bdstoken`` is filled in by ``prepare``."""
        # Session setup
        self.session = requests.Session()
        self.bdstoken = None
        # Shadow the class-level dict with a per-instance copy: the cookie
        # written by get_cookies/update_cookie must not leak into other
        # ReqAction instances.
        self.request_header = dict(self.request_header)

    def get_cookies(self):
        """Load the cookie from the first line of ``cookie.txt`` into the headers.

        Raises:
            OSError: if ``cookie.txt`` cannot be opened.
        """
        # Printed before opening the file so the hint is also visible when
        # cookie.txt does not exist yet.
        print("获取Cookie: 访问网盘首页开启控制台,查找main请求,使用原始格式查看请求头,复制cookie到cookie.txt")
        with open('cookie.txt', 'r', encoding='utf-8') as f:
            # readline() keeps the trailing newline, which is not a legal
            # character inside an HTTP header value.
            cookie = f.readline().strip()
        self.request_header['Cookie'] = cookie

    def get_bdstoken(self):
        """Fetch the ``bdstoken`` of the logged-in account.

        Returns:
            The ``bdstoken`` value on success, otherwise the non-zero
            ``errno`` reported by the API.
        """
        url = f'{BASE_URL}/api/gettemplatevariable?clienttype=0&app_id=250528&web=1&fields=[%22bdstoken%22,%22token%22,%22uk%22,%22isdocuser%22,%22servertime%22]'
        response = self.session.get(url=url, headers=self.request_header, timeout=20,
                                    allow_redirects=True, verify=False)
        print('get_bdstoken')
        print(response.text)
        payload = response.json()
        if payload['errno'] != 0:
            return payload['errno']
        return payload['result']['bdstoken']

    @retry(stop_max_attempt_number=5, wait_fixed=random.randint(1000, 3000))
    def get_dir_list(self, dir_path):
        """List the entries of the remote directory ``dir_path``.

        Returns:
            The ``list`` field of the response on success, otherwise the
            non-zero ``errno``.
        """
        url = f'{BASE_URL}/api/list?order=time&desc=1&showempty=0&web=1&page=1&num=1000&dir={dir_path}&bdstoken={self.bdstoken}'
        response = self.session.get(url=url, headers=self.request_header, timeout=15,
                                    allow_redirects=False, verify=False)
        print('get_dir_list')
        print(response.text)
        payload = response.json()
        if payload['errno'] != 0:
            return payload['errno']
        return payload['list']

    @retry(stop_max_attempt_number=5, wait_fixed=random.randint(1000, 3000))
    def create_dir(self, target_directory_name):
        """Create the remote directory ``target_directory_name``.

        Returns:
            The ``errno`` reported by the API.
        """
        url = f'{BASE_URL}/api/create?a=commit&bdstoken={self.bdstoken}'
        post_data = {'path': target_directory_name, 'isdir': '1', 'block_list': '[]', }
        response = self.session.post(url=url, headers=self.request_header, data=post_data,
                                     timeout=15, allow_redirects=False, verify=False)
        print('create_dir')
        print(response.text.strip())
        return response.json()['errno']

    def update_cookie(self, bdclnd):
        """Set or replace the ``BDCLND`` entry of the ``Cookie`` header.

        Args:
            bdclnd: new value for the ``BDCLND`` cookie.
        """
        cookie = self.request_header.get('Cookie', '')
        if 'BDCLND=' in cookie:
            # Stop the match at the next ';' or whitespace so that cookies
            # following BDCLND survive even when the header has no spaces.
            # A callable replacement keeps backslashes in ``bdclnd`` literal.
            cookie = re.sub(r'BDCLND=[^;\s]*;?', lambda _match: f'BDCLND={bdclnd};', cookie)
        elif cookie:
            cookie += f';BDCLND={bdclnd}'
        else:
            cookie = f'BDCLND={bdclnd}'
        self.request_header['Cookie'] = cookie

    @retry(stop_max_attempt_number=6, wait_fixed=1700)
    def verify_pass_code(self, link_url, pass_code):
        """Submit the extraction code ``pass_code`` for the share ``link_url``.

        Returns:
            The ``randsk`` value on success, otherwise the non-zero ``errno``.
        """
        # Characters 25..47 of the link are sent as ``surl``.
        # NOTE(review): presumably links look like
        # https://pan.baidu.com/s/1<23 chars> - confirm against callers.
        surl = link_url[25:48]
        timestamp_ms = int(round(time.time() * 1000))
        check_url = f'{BASE_URL}/share/verify?surl={surl}&bdstoken={self.bdstoken}&t={timestamp_ms}&channel=chunlei&web=1&clienttype=0'
        post_data = {'pwd': pass_code, 'vcode': '', 'vcode_str': '', }
        response = self.session.post(url=check_url, headers=self.request_header, data=post_data,
                                     timeout=10, allow_redirects=False, verify=False)
        print('verify_pass_code')
        print(response.text.strip())
        payload = response.json()
        if payload['errno'] != 0:
            return payload['errno']
        return payload['randsk']

    @retry(stop_max_attempt_number=12, wait_fixed=1700)
    def verify_links(self, link_url, pass_code):
        """Open the share page and extract the ids needed for a transfer.

        Returns:
            ``[shareid, share_uk, fs_id_list]`` on success. Otherwise an int
            error code (the ``errno`` of a failed pass-code check, or 1/2/3
            when shareid / share_uk / fs_id is missing) or, when only the
            fs_ids are missing and the page has a title, that title string.
        """
        if pass_code:
            bdclnd = self.verify_pass_code(link_url, pass_code)
            if isinstance(bdclnd, int):
                return bdclnd
            self.update_cookie(bdclnd)
        page_html = self.session.get(url=link_url, headers=self.request_header, timeout=15,
                                     allow_redirects=True, verify=False).content.decode("utf-8")
        print('verify_links')
        shareid_list = re.findall('"shareid":(\\d+?),"', page_html)
        user_id_list = re.findall('"share_uk":"(\\d+?)","', page_html)
        fs_id_list = re.findall('"fs_id":(\\d+?),"', page_html)
        # ERROR_CODES is keyed by page titles for the "link gone" and
        # "pass code required" cases, so capture the <title> text rather than
        # the first line of the document.
        info_title_list = re.findall('<title>(.+)</title>', page_html)
        if not shareid_list:
            return 1
        if not user_id_list:
            return 2
        if not fs_id_list:
            return info_title_list[0] if info_title_list else 3
        return [shareid_list[0], user_id_list[0], fs_id_list]

    @retry(stop_max_attempt_number=3, wait_fixed=random.randint(1000, 3000))
    def transfer_files(self, verify_links_reason, target_directory_name):
        """Transfer the shared files into ``/<target_directory_name>``.

        Args:
            verify_links_reason: the list returned by ``verify_links``.
            target_directory_name: remote destination directory.

        Returns:
            ``(errno, file_name)``; ``file_name`` is the ``from`` field of
            the first transferred entry when ``errno`` is 0, else ``None``.
        """
        url = f'{BASE_URL}/share/transfer?shareid={verify_links_reason[0]}&from={verify_links_reason[1]}&bdstoken={self.bdstoken}&channel=chunlei&web=1&clienttype=0'
        post_data = {'fsidlist': f'[{",".join(verify_links_reason[2])}]',
                     'path': f'/{target_directory_name}', }
        response = self.session.post(url=url, headers=self.request_header, data=post_data,
                                     timeout=15, allow_redirects=False, verify=False)
        print('transfer_files')
        print(response.text)
        payload = response.json()
        result = payload['errno']
        file_name = None
        if result == 0:
            file_name = payload['extra']['list'][0]['from']
        return result, file_name

    def prepare(self):
        """Load the cookie and fetch ``bdstoken``; call once before ``process``."""
        self.get_cookies()
        self.bdstoken = self.get_bdstoken()

    def process(self, path_list, save_link, save_code):
        """Save one share link below the remote path described by ``path_list``.

        Args:
            path_list: path components; the first two form the base directory
                and all of them joined form the destination directory.
            save_link: share link URL.
            save_code: extraction code, or a falsy value if none is needed.

        Returns:
            ``(True, name)`` when the transfer returned errno 0 or 4,
            ``(True, '链接密码错误')`` when the pass code was rejected (-9),
            ``(False, None)`` otherwise.
        """
        # Remote paths always use '/', independent of the local OS separator.
        base_path = posixpath.join(path_list[0], path_list[1])
        save_path = posixpath.join(*path_list)
        root_listing = self.get_dir_list(path_list[0])
        # Listing entries are compared without a trailing '/', so drop it
        # from base_path (and only it) before comparing.
        if (isinstance(root_listing, list)
                and base_path.rstrip('/') not in [_dir['path'] for _dir in root_listing]):
            self.create_dir(base_path)
        base_path_list = self.get_dir_list(base_path)
        verified_links = self.verify_links(save_link, save_code)
        if isinstance(verified_links, int) and verified_links == -9:
            return True, '链接密码错误'
        if isinstance(verified_links, list):
            if (isinstance(base_path_list, list)
                    and save_path not in [_dir['path'] for _dir in base_path_list]):
                self.create_dir(save_path)
            result, name = self.transfer_files(verified_links, save_path)
            if result == 0 or result == 4:
                return True, name
        return False, None


if __name__ == '__main__':
    req = ReqAction()
    # req.process()