From 2d75b0327766f4e21de64a1700f9daed189b1388 Mon Sep 17 00:00:00 2001 From: Hsury Date: Thu, 24 Oct 2019 18:00:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=A4=9A=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E4=B8=8B=E8=BD=BD=E4=B8=8E=E6=96=AD=E7=82=B9?= =?UTF-8?q?=E7=BB=AD=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bilibili.py | 1297 +-------------------------------------------------- drive.py | 248 ++++++---- 2 files changed, 164 insertions(+), 1381 deletions(-) diff --git a/bilibili.py b/bilibili.py index ce07d0d..d6ff419 100644 --- a/bilibili.py +++ b/bilibili.py @@ -1,72 +1,20 @@ -#!/usr/bin/env python3.6 +#!/usr/bin/env python3.7 # -*- coding: utf-8 -*- -"""Bilibili Toolkit 哔哩哔哩工具箱 -https://github.com/Hsury/Bilibili-Toolkit""" - -banner = r""" - \\ // - \\ // - ##################### ________ ___ ___ ___ ________ ___ ___ ___ - ## ## |\ __ \ |\ \ |\ \ |\ \ |\ __ \ |\ \ |\ \ |\ \ - ## // \\ ## \ \ \|\ /_\ \ \\ \ \ \ \ \\ \ \|\ /_\ \ \\ \ \ \ \ \ - ## // \\ ## \ \ __ \\ \ \\ \ \ \ \ \\ \ __ \\ \ \\ \ \ \ \ \ - ## ## \ \ \|\ \\ \ \\ \ \____ \ \ \\ \ \|\ \\ \ \\ \ \____ \ \ \ - ## www ## \ \_______\\ \__\\ \_______\\ \__\\ \_______\\ \__\\ \_______\\ \__\ - ## ## \|_______| \|__| \|_______| \|__| \|_______| \|__| \|_______| \|__| - ##################### - \/ \/ 哔哩哔哩 (゜-゜)つロ 干杯~ -""" - import base64 -import chardet import hashlib -import json -import os -import platform import random import requests import rsa -import shutil -import subprocess -import sys -import threading import time -import toml -from multiprocessing import freeze_support, Manager, Pool, Process -from selenium import webdriver from urllib import parse -__author__ = "Hsury" -__email__ = "i@hsury.com" -__license__ = "SATA" -__version__ = "2019.9.15" - class Bilibili: app_key = "1d8b6e7d45233436" - patterns = { - 'video': { - 'id': 1, - 'prefix': "https://www.bilibili.com/video/av", - }, - 'activity': { - 'id': 4, - 'prefix': "https://www.bilibili.com/blackboard/", - }, - 'gallery': { - 'id': 11, - 'prefix': "https://h.bilibili.com/", - }, - 'article': { - 'id': 12, - 'prefix': "https://www.bilibili.com/read/cv", - }, - } - def __init__(self, https=True, queue=None): + def __init__(self, https=True): self._session = requests.Session() self._session.headers.update({'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"}) - self.__queue = queue self.get_cookies = lambda: self._session.cookies.get_dict(domain=".bilibili.com") self.get_csrf = lambda: self.get_cookies().get("bili_jct", "") self.get_sid = lambda: self.get_cookies().get("sid", "") @@ -91,9 +39,7 @@ class Bilibili: self.proxy_pool = set() def _log(self, message): - log = f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}][{self.username if self.username else '#' + self.get_uid() if self.get_uid() else ''}] {message}" - print(log) - self.__push_to_queue("log", log) + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}][{self.username if self.username else '#' + self.get_uid() if self.get_uid() else ''}] {message}") def _requests(self, method, url, decode_level=2, enable_proxy=True, retry=10, timeout=15, **kwargs): if method in ["get", "post"]: @@ -112,15 +58,6 @@ class Bilibili: response = self._requests("post", url, json=payload) return response['message'] if response and response.get("code") == 0 else None - def __push_to_queue(self, manufacturer, data): - if self.__queue: - self.__queue.put({ - 'uid': self.get_uid(), - 'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), - 'manufacturer': manufacturer, - 'data': data, - }) - @staticmethod def calc_sign(param): salt = "560c52ccd288fed045859ed18bffd973" @@ -291,1231 +228,3 @@ class Bilibili: else: self._log("用户信息获取失败") return False - - # 修改隐私设置 - def set_privacy(self, show_favourite=None, show_bangumi=None, show_tag=None, show_reward=None, show_info=None, show_game=None): - # show_favourite = 展示[我的收藏夹] - # show_bangumi = 展示[订阅番剧] - # show_tag = 展示[订阅标签] - # show_reward = 展示[最近投币的视频] - # show_info = 展示[个人资料] - # show_game = 展示[最近玩过的游戏] - privacy = { - 'fav_video': show_favourite, - 'bangumi': show_bangumi, - 'tags': show_tag, - 'coins_video': show_reward, - 'user_info': show_info, - 'played_game': show_game, - } - url = f"{self.protocol}://space.bilibili.com/ajax/settings/getSettings?mid={self.get_uid()}" - headers = { - 'Host': "space.bilibili.com", - 'Referer': f"https://space.bilibili.com/{self.get_uid()}/", - } - response = self._requests("get", url, headers=headers) - if response and response.get("status") == True: - for key, value in privacy.items(): - if response['data']['privacy'][key] == value: - privacy[key] = None - else: - self._log(f"隐私设置获取失败 {response}") - return False - url = f"{self.protocol}://space.bilibili.com/ajax/settings/setPrivacy" - headers = { - 'Host': "space.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': f"https://space.bilibili.com/{self.get_uid()}/", - } - fail = [] - for key, value in privacy.items(): - if value is not None: - payload = { - key: 1 if value else 0, - 'csrf': self.get_csrf(), - } - response = self._requests("post", url, data=payload, headers=headers) - if not response or response.get("status") != True: - fail.append(key) - if not fail: - self._log("隐私设置修改成功") - return True - else: - self._log(f"隐私设置修改失败 {fail}") - return False - - # 银瓜子兑换硬币 - def silver_to_coin(self, app=True, pc=False): - # app = APP通道 - # pc = PC通道 - if app: - param = f"access_key={self.access_token}&appkey={Bilibili.app_key}&ts={int(time.time())}" - url = f"{self.protocol}://api.live.bilibili.com/AppExchange/silver2coin?{param}&sign={self.calc_sign(param)}" - response = self._requests("get", url) - if response and response.get("code") == 0: - self._log("银瓜子兑换硬币(APP通道)成功") - else: - self._log(f"银瓜子兑换硬币(APP通道)失败 {response}") - if pc: - url = f"{self.protocol}://api.live.bilibili.com/pay/v1/Exchange/silver2coin" - payload = { - 'platform': "pc", - 'csrf_token': self.get_csrf(), - } - headers = { - 'Host': "api.live.bilibili.com", - 'Origin': "https://live.bilibili.com", - 'Referer': "https://live.bilibili.com/exchange", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log("银瓜子兑换硬币(PC通道)成功") - else: - self._log(f"银瓜子兑换硬币(PC通道)失败 {response}") - - # 观看 - def watch(self, aid): - # aid = 稿件av号 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/view?aid={aid}" - response = self._requests("get", url) - if response and response.get("data") is not None: - cid = response['data']['cid'] - duration = response['data']['duration'] - else: - self._log(f"av{aid}信息解析失败") - return False - url = f"{self.protocol}://api.bilibili.com/x/report/click/h5" - payload = { - 'aid': aid, - 'cid': cid, - 'part': 1, - 'did': self.get_sid(), - 'ftime': int(time.time()), - 'jsonp': "jsonp", - 'lv': None, - 'mid': self.get_uid(), - 'csrf': self.get_csrf(), - 'stime': int(time.time()), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - url = f"{self.protocol}://api.bilibili.com/x/report/web/heartbeat" - payload = { - 'aid': aid, - 'cid': cid, - 'jsonp': "jsonp", - 'mid': self.get_uid(), - 'csrf': self.get_csrf(), - 'played_time': 0, - 'pause': False, - 'realtime': duration, - 'dt': 7, - 'play_type': 1, - 'start_ts': int(time.time()), - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - time.sleep(5) - payload['played_time'] = duration - 1 - payload['play_type'] = 0 - payload['start_ts'] = int(time.time()) - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}观看成功") - return True - self._log(f"av{aid}观看失败 {response}") - return False - - # 点赞 - def like(self, aid): - # aid = 稿件av号 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/archive/like" - payload = { - 'aid': aid, - 'like': 1, - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}点赞成功") - return True - else: - self._log(f"av{aid}点赞失败 {response}") - return False - - # 投币 - def reward(self, aid, double=True): - # aid = 稿件av号 - # double = 双倍投币 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/coin/add" - payload = { - 'aid': aid, - 'multiply': 2 if double else 1, - 'cross_domain': "true", - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}投{2 if double else 1}枚硬币成功") - return True - else: - self._log(f"av{aid}投{2 if double else 1}枚硬币失败 {response}") - return self.reward(aid, False) if double else False - - # 收藏 - def favour(self, aid): - # aid = 稿件av号 - url = f"{self.protocol}://api.bilibili.com/x/v2/fav/folder" - headers = {'Host': "api.bilibili.com"} - response = self._requests("get", url, headers=headers) - if response and response.get("data"): - fid = response['data'][0]['fid'] - else: - self._log("fid获取失败") - return False - url = f"{self.protocol}://api.bilibili.com/x/v2/fav/video/add" - payload = { - 'aid': aid, - 'fid': fid, - 'jsonp': "jsonp", - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}收藏成功") - return True - else: - self._log(f"av{aid}收藏失败 {response}") - return False - - # 三连推荐 - def combo(self, aid): - # aid = 稿件av号 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/archive/like/triple" - payload = { - 'aid': aid, - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}三连推荐成功") - return True - else: - self._log(f"av{aid}三连推荐失败 {response}") - return False - - # 分享 - def share(self, aid): - # aid = 稿件av号 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/share/add" - payload = { - 'aid': aid, - 'jsonp': "jsonp", - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"av{aid}分享成功") - return True - else: - self._log(f"av{aid}分享失败 {response}") - return False - - # 关注 - def follow(self, mid, secret=False): - # mid = 被关注用户UID - # secret = 悄悄关注 - url = f"{self.protocol}://api.bilibili.com/x/relation/modify" - payload = { - 'fid': mid, - 'act': 3 if secret else 1, - 're_src': 11, - 'jsonp': "jsonp", - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': f"https://space.bilibili.com/{mid}/", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"用户{mid}{'悄悄' if secret else ''}关注成功") - return True - else: - self._log(f"用户{mid}{'悄悄' if secret else ''}关注失败 {response}") - return False - - # 弹幕发送 - def danmaku_post(self, aid, message, page=1, moment=-1): - # aid = 稿件av号 - # message = 弹幕内容 - # page = 分P - # moment = 弹幕发送时间 - url = f"{self.protocol}://api.bilibili.com/x/web-interface/view?aid={aid}" - response = self._requests("get", url) - if response and response.get("data") is not None: - page_info = {page['page']: { - 'cid': page['cid'], - 'duration': page['duration'], - } for page in response['data']['pages']} - if page in page_info: - oid = page_info[page]['cid'] - duration = page_info[page]['duration'] - else: - self._log(f"av{aid}不存在P{page}") - return False - else: - self._log(f"av{aid}信息解析失败") - return False - url = f"{self.protocol}://api.bilibili.com/x/v2/dm/post" - headers = { - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"https://www.bilibili.com/video/av{aid}", - } - while True: - payload = { - 'type': 1, - 'oid': oid, - 'msg': message, - 'aid': aid, - 'progress': int(moment * 1E3) if moment != -1 else random.randint(0, duration * 1E3), - 'color': 16777215, - 'fontsize': 25, - 'pool': 0, - 'mode': 1, - 'rnd': int(time.time() * 1E6), - 'plat': 1, - 'csrf': self.get_csrf(), - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") is not None: - if response['code'] == 0: - self._log(f"av{aid}(P{page})弹幕\"{message}\"发送成功") - return True - elif response['code'] == 36703: - self._log(f"av{aid}(P{page})弹幕发送频率过快, 10秒后重试") - time.sleep(10) - else: - self._log(f"av{aid}(P{page})弹幕\"{message}\"发送失败 {response}") - return False - - # 评论点赞 - def comment_like(self, otype, oid, rpid): - # otype = 作品类型 - # oid = 作品ID - # rpid = 评论ID - if Bilibili.patterns.get(otype) is None: - return False - url = f"{self.protocol}://api.bilibili.com/x/v2/reply/action" - payload = { - 'oid': oid, - 'type': Bilibili.patterns[otype]['id'], - 'rpid': rpid, - 'action': 1, - 'jsonp': "jsonp", - 'csrf': self.get_csrf(), - } - headers = { - 'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8", - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"{Bilibili.patterns[otype]['prefix']}{oid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"评论{rpid}点赞成功") - return True - else: - self._log(f"评论{rpid}点赞失败 {response}") - return False - - # 评论发表 - def comment_post(self, otype, oid, message): - # otype = 作品类型 - # oid = 作品ID - # message = 评论内容 - if Bilibili.patterns.get(otype) is None: - return False - url = f"{self.protocol}://api.bilibili.com/x/v2/reply/add" - while True: - payload = { - 'oid': oid, - 'type': Bilibili.patterns[otype]['id'], - 'message': message, - 'plat': 1, - 'jsonp': "jsonp", - 'csrf': self.get_csrf(), - } - headers = { - 'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8", - 'Host': "api.bilibili.com", - 'Origin': "https://www.bilibili.com", - 'Referer': f"{Bilibili.patterns[otype]['prefix']}{oid}", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") is not None: - if response['code'] == 0: - self._log(f"作品{oid}提交评论\"{message}\"成功") - return True - elif response['code'] == 12015: - response = self._requests("get", response['data']['url'], headers=headers, decode_level=1) - captcha = self._solve_captcha(response) - if captcha: - self._log(f"评论验证码识别结果: {captcha}") - payload['code'] = captcha - else: - self._log(f"评论验证码识别服务暂时不可用, 1分钟后重试") - time.sleep(60) - elif response['code'] == 12035: - self._log(f"作品{oid}提交评论\"{message}\"失败, 该账号被UP主列入评论黑名单") - return False - elif response['code'] == -105: - if "code" in payload: - payload.pop("code") - else: - self._log(f"作品{oid}提交评论\"{message}\"失败 {response}") - return False - - # 动态点赞 - def dynamic_like(self, did): - # did = 动态ID - url = f"{self.protocol}://api.vc.bilibili.com/dynamic_like/v1/dynamic_like/thumb" - payload = { - 'uid': self.get_uid(), - 'dynamic_id': did, - 'up': 1, - 'csrf_token': self.get_csrf(), - } - headers = { - 'Content-Type': "application/x-www-form-urlencoded", - 'Host': "api.vc.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': "https://space.bilibili.com/208259/", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"动态{did}点赞成功") - return True - else: - self._log(f"动态{did}点赞失败 {response}") - return False - - # 动态转发 - def dynamic_repost(self, did, message="转发动态", ats=[]): - # did = 动态ID - # message = 转发内容 - # ats = 被@用户UID列表 - def uid_to_nickname(mid): - url = f"{self.protocol}://api.bilibili.com/x/web-interface/card?mid={mid}" - response = self._requests("get", url) - if response and response.get("code") == 0: - return response['data']['card']['name'] - else: - return "" - - url = f"{self.protocol}://api.vc.bilibili.com/dynamic_repost/v1/dynamic_repost/repost" - ctrl = [] - for at in zip(ats, [uid_to_nickname(mid) for mid in ats]): - ctrl.append({ - 'data': str(at[0]), - 'location': len(message) + 1, - 'length': len(at[1]) + 1, - 'type': 1, - }) - message = f"{message} @{at[1]}" - payload = { - 'uid': self.get_uid(), - 'dynamic_id': did, - 'content': message, - 'at_uids': ",".join([str(at) for at in ats]), - 'ctrl': json.dumps(ctrl), - 'csrf_token': self.get_csrf(), - } - headers = { - 'Content-Type': "application/x-www-form-urlencoded", - 'Host': "api.vc.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': "https://space.bilibili.com/208259/", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - self._log(f"动态{did}转发成功") - return True - else: - self._log(f"动态{did}转发失败 {response}") - return False - - # 动态清理 - def dynamic_purge(self): - def get_lottery_dynamics(): - headers = { - 'Host': "api.vc.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': f"https://space.bilibili.com/{self.get_uid()}/dynamic", - } - dynamics = [] - offset = 0 - while True: - url = f"{self.protocol}://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history?visitor_uid={self.get_uid()}&host_uid={self.get_uid()}&offset_dynamic_id={offset}" - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - if response['data']['has_more']: - dynamics.extend([{ - 'did': card['desc']['dynamic_id'], - 'lottery_did': card['desc']['orig_dy_id'], - } for card in response['data']['cards'] if card['desc']['orig_type'] == 2 or card['desc']['orig_type'] == 1024]) - offset = response['data']['cards'][-1]['desc']['dynamic_id'] - else: - return dynamics - - dynamics = get_lottery_dynamics() - self._log(f"发现{len(dynamics)}条互动抽奖动态") - delete = 0 - for dynamic in dynamics: - url = f"{self.protocol}://api.vc.bilibili.com/lottery_svr/v2/lottery_svr/lottery_notice?dynamic_id={dynamic['lottery_did']}" - headers = { - 'Host': "api.vc.bilibili.com", - 'Origin': "https://t.bilibili.com", - 'Referer': "https://t.bilibili.com/lottery/h5/index/", - } - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - expired = response['data']['status'] == 2 or response['data']['status'] == -1 - winning = any(self.get_uid() in winners for winners in [response['data'].get("lottery_result", {}).get(f"{level}_prize_result", []) for level in ["first", "second", "third"]]) - if not expired: - self._log(f"动态{dynamic['lottery_did']}尚未开奖({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(response['data']['lottery_time']))}), 跳过") - else: - if winning: - self._log(f"动态{dynamic['lottery_did']}中奖, 跳过") - else: - url = f"{self.protocol}://api.vc.bilibili.com/dynamic_repost/v1/dynamic_repost/rm_rp_dyn" - payload = { - 'uid': self.get_uid(), - 'dynamic_id': dynamic['did'], - 'csrf_token': self.get_csrf(), - } - headers = { - 'Content-Type': "application/x-www-form-urlencoded", - 'Host': "api.vc.bilibili.com", - 'Origin': "https://space.bilibili.com", - 'Referer': f"https://space.bilibili.com/{self.get_uid()}/dynamic", - } - response = self._requests("post", url, data=payload, headers=headers) - if response and response.get("code") == 0: - delete += 1 - self._log(f"动态{dynamic['lottery_did']}未中奖, 清理成功") - else: - self._log(f"动态{dynamic['lottery_did']}未中奖, 清理失败") - time.sleep(1) - self._log(f"清理了{delete}条动态") - - # 系统通知查询 - def system_notice(self, time_span=["", ""], keyword=[]): - # time_span = 时间范围 - # keyword = 包含关键字 - cursor_span = [int(time.mktime(time.strptime(element, "%Y-%m-%d %H:%M:%S")) * 1E9) if element else "" for element in time_span] - headers = { - 'Host': "message.bilibili.com", - 'Referer': "https://message.bilibili.com/", - } - notice_list = [] - cursor = cursor_span[1] - while True: - url = f"{self.protocol}://message.bilibili.com/api/notify/query.sysnotify.list.do?data_type=1{'&cursor=' + str(cursor) if cursor else ''}" - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - for notice in response['data']: - if not cursor_span[0] or notice['cursor'] > cursor_span[0]: - if not keyword or any(keyword in notice['title'] or keyword in notice['content'] for keyword in keyword): - notice_list.append({ - 'time': notice['time_at'], - 'title': notice['title'], - 'content': notice['content'], - }) - else: - break - else: - if len(response['data']) == 20: - cursor = notice['cursor'] - continue - self._log(f"系统通知获取成功, 总计{len(notice_list)}条通知") - for notice in notice_list: - self._log(f"{notice['title']}({notice['time']}): {notice['content']}") - self.__push_to_queue("system_notice", notice_list) - return notice_list - - # 会员购抢购 - def mall_rush(self, item_id, thread=1, headless=True, timeout=10): - # item_id = 商品ID - # thread = 线程数 - # headless = 隐藏窗口 - # timeout = 超时刷新 - def executor(thread_id): - def find_and_click(class_name): - try: - element = driver.find_element_by_class_name(class_name) - element.click() - except: - element = None - return element - - options = webdriver.ChromeOptions() - options.add_argument("log-level=3") - if headless: - options.add_argument("headless") - else: - options.add_argument("disable-infobars") - options.add_argument("window-size=374,729") - if platform.system() == "Linux": - options.add_argument("no-sandbox") - options.add_experimental_option("mobileEmulation", {'deviceName': "Nexus 5"}) - if platform.system() == "Windows": - options.binary_location = "chrome-win\\chrome.exe" - driver = webdriver.Chrome(executable_path="chromedriver.exe" if platform.system() == "Windows" else "chromedriver", options=options) - driver.get(f"{self.protocol}://mall.bilibili.com/detail.html?itemsId={item_id}") - for key, value in self.get_cookies().items(): - driver.add_cookie({ - 'name': key, - 'value': value, - 'domain': ".bilibili.com", - }) - self._log(f"(线程{thread_id})商品{item_id}开始监视库存") - url = f"{self.protocol}://mall.bilibili.com/mall-c/items/info?itemsId={item_id}" - while True: - response = self._requests("get", url) - if response and response.get("code") == 0 and response['data']['activityInfoVO']['serverTime'] >= response['data']['activityInfoVO']['startTime'] if response['data']['activityInfoVO'] else True: - break - timestamp = time.time() - in_stock = False - while True: - try: - result = {class_name: find_and_click(class_name) for class_name in ["bottom-buy-button", "button", "dot", "pay-btn", "expire-time-format", "alert-ok", "error-button"]} - if result['bottom-buy-button']: - if "bottom-buy-disable" not in result['bottom-buy-button'].get_attribute("class"): - if not in_stock: - self._log(f"(线程{thread_id})商品{item_id}已开放购买") - in_stock = True - else: - if in_stock: - self._log(f"(线程{thread_id})商品{item_id}暂无法购买, 原因为{result['bottom-buy-button'].text}") - in_stock = False - driver.refresh() - timestamp = time.time() - if result['pay-btn']: - timestamp = time.time() - if result['alert-ok']: - driver.refresh() - if result['expire-time-format']: - self._log(f"(线程{thread_id})商品{item_id}订单提交成功, 请在{result['expire-time-format'].text}内完成支付") - driver.quit() - return True - if time.time() - timestamp > timeout: - self._log(f"(线程{thread_id})商品{item_id}操作超时, 当前页面为{driver.current_url}") - driver.get(f"{self.protocol}://mall.bilibili.com/detail.html?itemsId={item_id}") - timestamp = time.time() - except: - pass - - threads = [] - for i in range(thread): - threads.append(threading.Thread(target=executor, args=(i + 1,))) - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - # 会员购优惠卷领取 - def mall_coupon(self, coupon_id, thread=1): - # coupon_id = 优惠券ID - # thread = 线程数 - def get_coupon_info(coupon_id): - url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/user_coupon_code_receive_status_list" - payload = { - 'couponIds': [str(coupon_id)], - 'mid': "", - 'csrf': self.get_csrf(), - } - headers = { - 'Host': "mall.bilibili.com", - 'Origin': "https://www.bilibili.com", - } - response = self._requests("post", url, json=payload, headers=headers) - if response and response.get("code") == 0: - return { - 'end': response['data'][0]['receiveEndTime'], - 'message': response['data'][0]['couponStatusMsg'], - 'name': response['data'][0]['couponName'], - 'total': response['data'][0]['provideNum'], - 'remain': response['data'][0]['remainNum'], - 'start': response['data'][0]['receiveStartTime'], - 'status': response['data'][0]['receiveStatus'], - } - - def get_server_time(target_time=0): - url = f"{self.protocol}://mall.bilibili.com/mall-c/common/time/remain?v={int(time.time())}&targetTime={target_time}" - headers = { - 'Host': "mall.bilibili.com", - 'Origin': "https://www.bilibili.com", - } - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - return { - 'current': response['data']['serverTime'], - 'remain': response['data']['remainSeconds'], - } - - def executor(thread_id): - url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/create_coupon_code?couponId={coupon_id}&deviceId=" - payload = {'csrf': self.get_csrf()} - headers = { - 'Host': "mall.bilibili.com", - 'Origin': "https://www.bilibili.com", - } - nonlocal flag - while not flag: - response = self._requests("post", url, json=payload, headers=headers) - if response and response.get("code") is not None: - if response['code'] == 83094004: - self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取成功") - elif response['code'] == 83110005: - self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 优惠券领取数量已达到上限") - elif response['code'] == 83110015: - self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 优惠券库存不足") - else: - continue - else: - self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 当前IP请求过于频繁") - flag = True - - coupon_info = get_coupon_info(coupon_id) - if coupon_info: - if coupon_info['message'] == "可领取": - server_time = get_server_time(coupon_info['start']) - if server_time: - delay = max(server_time['remain'] - 3, 0) - self._log(f"会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})可领取时间为{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon_info['start']))}至{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon_info['end']))}, 库存{coupon_info['remain']}张, 将于{delay}秒后开始领取") - time.sleep(delay) - else: - self._log(f"会员购服务器时间获取失败") - return - else: - self._log(f"会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id}){coupon_info['message']}") - return - else: - self._log(f"会员购优惠卷{coupon_id}信息获取失败") - return - flag = False - threads = [] - for i in range(thread): - threads.append(threading.Thread(target=executor, args=(i + 1,))) - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - # 会员购订单列表查询 - def mall_order_list(self, status=0, type=[2]): - # status = 订单状态 - # type = 订单类型 - def get_order_list(status, type): - headers = { - 'Origin': "https://mall.bilibili.com", - 'Referer': "https://mall.bilibili.com/orderlist.html", - } - order_list = [] - page = 0 - while True: - url = f"{self.protocol}://show.bilibili.com/api/ticket/ordercenter/list?pageNum={page}&pageSize=20&status={status}&customer=0&platform=h5&v={int(time.time())}" - response = self._requests("get", url, headers=headers) - if response and response.get("errno") == 0: - data = response['data']['list'] - if data: - for order in data: - if not type or order['order_type'] in type: - order_list.append(order) - page += 1 - else: - self._log(f"会员购订单列表获取成功, 总计{len(order_list)}个订单") - break - else: - self._log(f"会员购订单列表获取失败 {response}") - return order_list - - def get_order_detail(order_id): - url = f"{self.protocol}://mall.bilibili.com/mall-c/order/detail?orderId={order_id}&platform=h5&time={int(time.time())}" - headers = { - 'Origin': "https://mall.bilibili.com", - 'Referer': f"https://mall.bilibili.com/orderdetail.html?orderId={order_id}", - } - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0 and response['data']['vo']: - data = response['data']['vo'] - self._log(f"会员购订单{order_id}详情获取成功, 包含\"{data['skuList'][0]['itemsName']}\"等{len(data['skuList'])}件商品") - return data - else: - self._log(f"会员购订单{order_id}详情获取失败 {response}") - return {} - - def get_order_express(order_id): - url = f"{self.protocol}://mall.bilibili.com/mall-c/order/express/detail?orderId={order_id}" - headers = { - 'Origin': "https://mall.bilibili.com", - 'Referer': f"https://mall.bilibili.com/orderdetail.html?orderId={order_id}", - } - for _ in range(5): - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0 and response['data']['vo']: - data = response['data']['vo'] - self._log(f"会员购订单{order_id}物流获取成功, 状态为{data['state_v']}") - return data - time.sleep(3) - self._log(f"会员购订单{order_id}物流获取失败 {response}") - return {} - - order_list = [] - for order in get_order_list(status, type): - order_detail = get_order_detail(order['order_id']) - order_express = get_order_express(order['order_id']) if order_detail and order_detail['orderExpress'] else {} - order_list.append({ - 'id': order.get("order_id"), - 'item': [{ - 'id': item.get("itemsId"), - 'name': item.get("itemsName"), - 'spec': item.get("skuSpec"), - 'number': item.get("skuNum"), - 'price': item.get("price"), - } for item in order_detail.get("skuList", [])], - 'create': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order.get("order_ctime"))) if order.get("current_timestamp") else None, - 'status': { - 'code': order.get("status"), - 'name': order.get("status_name"), - }, - 'pay': { - 'id': order_detail['orderBasic'].get("payId") if order_detail.get("orderBasic") else None, - 'time': order.get("pay_ctime") if order.get("pay_ctime") != "0000-00-00 00:00:00" else None, - 'channel': order_detail['orderBasic'].get("paymentChannel") if order_detail.get("orderBasic") else None, - 'total': order.get("show_money") / 100 if order.get("show_money") else None, - 'origin': order_detail['orderBasic'].get("payTotalMoney") if order_detail.get("orderBasic") else None, - 'discount': order_detail['orderBasic'].get("discountMoneys") if order_detail.get("orderBasic") else None, - 'express': order.get("express_fee") / 100 if order.get("express_fee") else None, - }, - 'preorder': { - 'phone': order_detail['extData'].get("notifyPhoneOrigin") if order_detail.get("extData") else None, - 'front': { - 'total': order_detail['extData'].get("frontPayMoney") if order_detail.get("extData") else None, - 'origin': order_detail['extData'].get("frontMoney") if order_detail.get("extData") else None, - 'discount': order_detail['extData'].get("frontDisMoney") if order_detail.get("extData") else None, - }, - 'final': { - 'total': order_detail['extData'].get("finalPayMoney") if order_detail.get("extData") else None, - 'origin': order_detail['extData'].get("finalMoney") if order_detail.get("extData") else None, - 'discount': order_detail['extData'].get("finalDisMoney") if order_detail.get("extData") else None, - 'start': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order_detail['extData'].get("finalMoneyStart") / 1E3)) if order_detail.get("extData") and order_detail['extData'].get("finalMoneyStart") else None, - 'end': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order_detail['extData'].get("finalMoneyEnd") / 1E3)) if order_detail.get("extData") and order_detail['extData'].get("finalMoneyStart") else None, - }, - }, - 'shipping': { - 'name': order_detail['orderDeliver'].get("deliverName") if order_detail.get("orderDeliver") else None, - 'phone': order_detail['orderDeliver'].get("deliverPhone") if order_detail.get("orderDeliver") else None, - 'address': order_detail['orderDeliver'].get("deliverAddr") if order_detail.get("orderDeliver") else None, - 'company': order_detail['orderExpress'].get("com_v") if order_detail.get("orderExpress") else None, - 'number': order_detail['orderExpress'].get("sno") if order_detail.get("orderExpress") else None, - 'status': order_express.get("state_v"), - 'detail': order_express.get("detail"), - }, - }) - self.__push_to_queue("mall_order_list", order_list) - return order_list - - # 会员购优惠券列表查询 - def mall_coupon_list(self, status=1): - # status = 优惠券状态 - status_map = { - 1: "validList", - 2: "usedList", - 3: "invalidList", - } - if status not in status_map: - return [] - headers = { - 'Referer': "https://mall.bilibili.com/couponlist.html?noTitleBar=1", - } - coupon_list = [] - page = 1 - while True: - url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/list?status={status}&pageIndex={page}&pageSize=20" - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - if response['data'][status_map[status]]: - for coupon in response['data'][status_map[status]]['list']: - coupon_list.append({ - 'name': coupon['couponCodeName'], - 'description': coupon['couponDesc'], - 'detail': coupon['couponDetail'], - 'discount': coupon['couponDiscount'], - 'status': coupon['status'], - 'type': coupon['couponCodeType'], - 'start': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useStartTime'] / 1E3)) if coupon['useStartTime'] else None, - 'end': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useEndTime'] / 1E3)) if coupon['useEndTime'] else None, - 'use': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useTime'] / 1E3)) if coupon['useTime'] else None, - 'expire': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['expireDate'] / 1E3)) if coupon['expireDate'] else None, - }) - if response['data'][status_map[status]]['hasNextPage']: - page += 1 - continue - self._log(f"会员购优惠券列表获取成功, 总计{len(coupon_list)}张优惠券") - for coupon in coupon_list: - self._log(f"会员购优惠券: {coupon['name']}" + (f", 失效时间为{coupon['expire']}" if coupon['expire'] else f", 使用时间为{coupon['use']}" if coupon['use'] else f", 使用有效期为{coupon['start']}至{coupon['end']}" if coupon['start'] and coupon['end'] else "")) - break - else: - self._log(f"会员购优惠券列表获取失败 {response}") - break - self.__push_to_queue("mall_coupon_list", coupon_list) - return coupon_list - - # 会员购奖品列表查询 - def mall_prize_list(self, status=0, type=[1, 2]): - # status = 奖品状态 - # type = 奖品类型 - headers = { - 'Referer': "https://mall.bilibili.com/prizecenter.html", - } - prize_list = [] - page = 1 - while True: - url = f"{self.protocol}://mall.bilibili.com/mall-c/prize/list?pageNum={page}&pageSize=20&type={status}&v={int(time.time())}" - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - for prize in response['data']['pageInfo']['list']: - if not type or prize['prizeType'] in type: - prize_list.append({ - 'name': prize['prizeName'], - 'source': prize['sourceName'], - 'status': prize['status'], - 'type': prize['prizeType'], - 'expire': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(prize['expireTime'])), - }) - if response['data']['pageInfo']['hasNextPage']: - page += 1 - else: - self._log(f"会员购奖品列表获取成功, 总计{len(prize_list)}个奖品, {response['data']['waitDeliveryNum']}个奖品待发货") - for prize in prize_list: - self._log(f"会员购奖品: {prize['name']}, 来自{prize['source']}, 领取有效期至{prize['expire']}") - break - else: - self._log(f"会员购奖品列表获取失败 {response}") - break - self.__push_to_queue("mall_prize_list", prize_list) - return prize_list - - # 直播奖品列表查询 - def live_prize_list(self): - headers = { - 'Origin': "https://link.bilibili.com", - 'Referer': "https://link.bilibili.com/p/center/index", - } - prize_list = [] - page = 1 - while True: - url = f"{self.protocol}://api.live.bilibili.com/lottery/v1/award/award_list?page={page}&month=" - response = self._requests("get", url, headers=headers) - if response and response.get("code") == 0: - for prize in response['data']['list']: - prize_list.append({ - 'name': prize['gift_name'], - 'number': prize['gift_num'], - 'source': prize['source'], - 'status': prize['status'], - 'type': prize['gift_type'], - 'create': prize['create_time'], - 'expire': prize['expire_time'], - }) - if page < response['data']['total_page']: - page += 1 - else: - self._log(f"直播奖品列表获取成功, 总计{len(prize_list)}个奖品") - for prize in prize_list: - self._log(f"直播奖品: {prize['name']} x{prize['number']}, 来自{prize['source']}, 中奖时间为{prize['create']}, 领取有效期至{prize['expire']}") - break - else: - self._log(f"直播奖品列表获取失败 {response}") - break - self.__push_to_queue("live_prize_list", prize_list) - return prize_list - -def detect_charset(file, fallback="utf-8"): - with open(file, "rb") as f: - detector = chardet.UniversalDetector() - for line in f.readlines(): - detector.feed(line) - if detector.done: - return detector.result['encoding'] - return fallback - -def download(url, save_as=None): - print(f"正在下载{url}") - if save_as is None: - save_as = url.split("/")[-1] - with open(save_as, "wb") as f: - response = requests.get(url, stream=True) - length = response.headers.get("content-length") - if length: - length = int(length) - receive = 0 - for data in response.iter_content(chunk_size=100 * 1024): - f.write(data) - receive += len(data) - percent = receive / length - print(f"\r[{'=' * int(50 * percent)}{' ' * (50 - int(50 * percent))}] {percent:.0%}", end="", flush=True) - print() - else: - f.write(response.content) - return save_as - -def decompress(file, remove=True): - shutil.unpack_archive(file) - if remove: - os.remove(file) - print(f"{file}解压完毕") - -def export(queue, config): - bucket = {} - log_file = open(config['global']['log'], "a", encoding="utf-8") if config['global']['log'] else None - try: - while True: - packet = queue.get() - if isinstance(packet, dict) and all(key in packet for key in ['uid', 'manufacturer', 'data']): - if packet['manufacturer'] == "log": - if log_file: - log_file.write(packet['data'] + "\n") - else: - if packet['manufacturer'] not in bucket: - bucket[packet['manufacturer']] = {} - if packet['uid'] not in bucket[packet['manufacturer']]: - bucket[packet['manufacturer']][packet['uid']] = [] - if isinstance(packet['data'], list): - bucket[packet['manufacturer']][packet['uid']].extend(packet['data']) - else: - bucket[packet['manufacturer']][packet['uid']].append(packet['data']) - elif packet is None: - for manufacturer, data in bucket.items(): - if config.get(manufacturer, {}).get("export"): - with open(config[manufacturer]['export'], "w", encoding="utf-8") as f: - f.write(json.dumps(data, indent=4, ensure_ascii=False)) - return - finally: - if log_file: - log_file.close() - -def wrapper(arg): - def delay_wrapper(func, interval, arg_list=[()], shuffle=False): - if shuffle: - random.shuffle(arg_list) - for i in range(len(arg_list)): - func(*arg_list[i]) - if i < len(arg_list) - 1: - time.sleep(interval) - - config, account, queue = arg['config'], arg['account'], arg['queue'] - instance = Bilibili(config['global']['https'], queue) - if config['proxy']['enable']: - if isinstance(config['proxy']['pool'], str): - try: - with open(config['proxy']['pool'], "r", encoding=detect_charset(config['proxy']['pool'])) as f: - instance.set_proxy(add=[proxy for proxy in f.read().strip().splitlines() if proxy and proxy[0] != "#"]) - except: - pass - elif isinstance(config['proxy']['pool'], list): - instance.set_proxy(add=config['proxy']['pool']) - if instance.login(force_refresh_token=config['user']['force_refresh_token'], **account): - threads = [] - if config['get_user_info']['enable']: - threads.append(threading.Thread(target=instance.get_user_info)) - if config['set_privacy']['enable']: - threads.append(threading.Thread(target=instance.set_privacy, args=(config['set_privacy']['show_favourite'], config['set_privacy']['show_bangumi'], config['set_privacy']['show_tag'], config['set_privacy']['show_reward'], config['set_privacy']['show_info'], config['set_privacy']['show_game']))) - if config['silver_to_coin']['enable']: - threads.append(threading.Thread(target=instance.silver_to_coin)) - if config['watch']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.watch, 5, list(zip(config['watch']['aid']))))) - if config['like']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.like, 5, list(zip(config['like']['aid']))))) - if config['reward']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.reward, 5, list(zip(config['reward']['aid'], config['reward']['double']))))) - if config['favour']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.favour, 5, list(zip(config['favour']['aid']))))) - if config['combo']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.combo, 5, list(zip(config['combo']['aid']))))) - if config['share']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.share, 5, list(zip(config['share']['aid']))))) - if config['follow']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.follow, 5, list(zip(config['follow']['mid'], config['follow']['secret']))))) - if config['danmaku_post']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.danmaku_post, 5, list(zip(config['danmaku_post']['aid'], config['danmaku_post']['message'], config['danmaku_post']['page'], config['danmaku_post']['moment']))))) - if config['comment_like']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.comment_like, 5, list(zip(config['comment_like']['otype'], config['comment_like']['oid'], config['comment_like']['rpid']))))) - if config['comment_post']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.comment_post, 5, list(zip(config['comment_post']['otype'], config['comment_post']['oid'], config['comment_post']['message']))))) - # for comment in zip(config['comment_post']['otype'], config['comment_post']['oid'], config['comment_post']['message']): - # threads.append(threading.Thread(target=instance.comment_post, args=(comment[0], comment[1], comment[2]))) - if config['dynamic_like']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.dynamic_like, 5, list(zip(config['dynamic_like']['did']))))) - if config['dynamic_repost']['enable']: - threads.append(threading.Thread(target=delay_wrapper, args=(instance.dynamic_repost, 5, list(zip(config['dynamic_repost']['did'], config['dynamic_repost']['message'], config['dynamic_repost']['ats']))))) - if config['dynamic_purge']['enable']: - threads.append(threading.Thread(target=instance.dynamic_purge)) - if config['system_notice']['enable']: - threads.append(threading.Thread(target=instance.system_notice, args=(config['system_notice']['time_span'], config['system_notice']['keyword']))) - if config['mall_rush']['enable']: - for item in zip(config['mall_rush']['item_id'], config['mall_rush']['thread']): - threads.append(threading.Thread(target=instance.mall_rush, args=(item[0], item[1], config['mall_rush']['headless'], config['mall_rush']['timeout']))) - if config['mall_coupon']['enable']: - for coupon in zip(config['mall_coupon']['coupon_id'], config['mall_coupon']['thread']): - threads.append(threading.Thread(target=instance.mall_coupon, args=(coupon[0], coupon[1]))) - if config['mall_order_list']['enable']: - threads.append(threading.Thread(target=instance.mall_order_list, args=(config['mall_order_list']['status'], config['mall_order_list']['type']))) - if config['mall_coupon_list']['enable']: - threads.append(threading.Thread(target=instance.mall_coupon_list, args=(config['mall_coupon_list']['status'],))) - if config['mall_prize_list']['enable']: - threads.append(threading.Thread(target=instance.mall_prize_list, args=(config['mall_prize_list']['status'], config['mall_prize_list']['type']))) - if config['live_prize_list']['enable']: - threads.append(threading.Thread(target=instance.live_prize_list)) - # instance._log("任务开始执行") - for thread in threads: - thread.start() - for thread in threads: - thread.join() - # instance._log("任务执行完毕") - return { - 'username': instance.username, - 'password': instance.password, - 'access_token': instance.access_token, - 'refresh_token': instance.refresh_token, - 'cookie': instance.get_cookies(), - } - -def main(): - print(f"{banner}\n{__doc__}\n版本: {__version__}\n") - config_file = sys.argv[1] if len(sys.argv) > 1 else "config.toml" - try: - with open(config_file, "r", encoding=detect_charset(config_file)) as f: - config = toml.load(f) - except: - print(f"无法加载{config_file}") - return - accounts = [] - for line in config['user']['account'].splitlines(): - try: - if line[0] == "#": - continue - pairs = {} - for pair in line.strip(";").split(";"): - if len(pair.split("=")) == 2: - key, value = pair.split("=") - pairs[key] = value - password = all(key in pairs for key in ["username", "password"]) - token = all(key in pairs for key in ["access_token", "refresh_token"]) - cookie = all(key in pairs for key in ["bili_jct", "DedeUserID", "DedeUserID__ckMd5", "sid", "SESSDATA"]) - if password or token or cookie: - accounts.append(pairs) - except: - pass - config['user'].pop("account") - print(f"导入了{len(accounts)}个用户") - if not accounts: - return - if config['mall_rush']['enable']: - if platform.system() == "Linux" and os.path.exists("/etc/debian_version"): - prefix = "sudo " if shutil.which("sudo") else "" - if shutil.which("chromium-browser") is None: - os.system(f"{prefix}apt -y install chromium-browser") - if shutil.which("chromedriver") is None: - os.system(f"{prefix}apt -y install chromium-chromedriver") - os.system(f"{prefix}ln -s /usr/lib/chromium-browser/chromedriver /usr/bin") - elif platform.system() == "Linux" and os.path.exists("/etc/redhat-release"): - prefix = "sudo " if shutil.which("sudo") else "" - if shutil.which("chromium-browser") is None: - os.system(f"{prefix}yum -y install chromium") - if shutil.which("chromedriver") is None: - os.system(f"{prefix}yum -y install chromedriver") - elif platform.system() == "Windows": - if not os.path.exists("chrome-win\\chrome.exe"): - decompress(download("https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win/686378/chrome-win.zip")) - if not os.path.exists("chromedriver.exe"): - decompress(download("https://npm.taobao.org/mirrors/chromedriver/78.0.3904.11/chromedriver_win32.zip")) - else: - print("会员购抢购组件不支持在当前平台上运行") - config['mall_rush']['enable'] = False - queue = Manager().Queue() - export_process = Process(target=export, args=(queue, config)) - export_process.start() - with Pool(min(config['global']['process'], len(accounts))) as p: - result = p.map(wrapper, [{ - 'config': config, - 'account': account, - 'queue': queue, - } for account in accounts]) - p.close() - p.join() - if config['user']['update']: - with open(config_file, "r+", encoding=detect_charset(config_file)) as f: - content = f.read() - before = content.split("account")[0] - after = content.split("account")[-1].split("\"\"\"")[-1] - f.seek(0) - f.truncate() - f.write(before) - f.write("account = \"\"\"\n") - for credential in result: - new_line = False - for key, value in credential.items(): - if value: - if key == "cookie": - f.write(f"{';'.join(f'{key}={value}' for key, value in value.items())};") - else: - f.write(f"{key}={value};") - new_line = True - if new_line: - f.write("\n") - f.write("\"\"\"") - f.write(after) - print("凭据已更新") - queue.put(None) - export_process.join() - -if __name__ == "__main__": - freeze_support() - main() - if platform.system() == "Windows": - os.system("pause >nul | set /p =请按任意键退出") diff --git a/drive.py b/drive.py index 301595c..d590756 100644 --- a/drive.py +++ b/drive.py @@ -6,7 +6,9 @@ import hashlib import json import math import os +import re import requests +import threading import time import types from bilibili import Bilibili @@ -15,14 +17,14 @@ from PIL import Image def log(message): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}") -def calc_md5(data, hexdigest=False): - md5 = hashlib.md5() +def calc_sha1(data, hexdigest=False): + sha1 = hashlib.sha1() if isinstance(data, types.GeneratorType): for chunk in data: - md5.update(chunk) + sha1.update(chunk) else: - md5.update(data) - return md5.hexdigest() if hexdigest else md5.digest() + sha1.update(data) + return sha1.hexdigest() if hexdigest else sha1.digest() def read_in_chunks(file_name, chunk_size=1024 * 1024): with open(file_name, "rb") as f: @@ -34,8 +36,7 @@ def read_in_chunks(file_name, chunk_size=1024 * 1024): return def image_dump(data, file_name): - md5 = calc_md5(data) - merged_data = data + md5 + b"\xff" + merged_data = data + b"\xff" pixel_number = math.ceil(len(merged_data) / 3) width = math.ceil(math.sqrt(pixel_number)) height = math.ceil(pixel_number / width) @@ -55,10 +56,9 @@ def image_load(file_name): merged_data = b"".join(bytes(pixel_data) for pixel_data in image.getdata()) merged_data = merged_data.rstrip(b"\x00") if merged_data[-1] == 255: - data, md5 = merged_data[:-(1 + 16)], merged_data[-(1 + 16):-1] - if calc_md5(data) == md5: - return data - return b"" + return merged_data[:-1] + else: + return b"" def image_upload(file_name, cookies): url = "https://api.vc.bilibili.com/api/v1/drawImage/upload" @@ -94,126 +94,198 @@ def image_download(url, file_name=None): f.write(response.content) return file_name +def fetch_meta(string): + if string.startswith("http://") or string.startswith("https://"): + meta_file_name = image_download(string) + elif re.match(r"^[a-fA-F0-9]{40}$", string): + meta_file_name = image_download(f"http://i0.hdslb.com/bfs/album/{string}.png") + else: + meta_file_name = string + try: + meta_data = json.loads(image_load(meta_file_name).decode("utf-8")) + return meta_data + except: + return None + finally: + os.remove(meta_file_name) + def login_handle(args): bilibili = Bilibili() bilibili.login(username=args.username, password=args.password) + bilibili.get_user_info() with open(args.cookies_file, "w", encoding="utf-8") as f: f.write(json.dumps(bilibili.get_cookies(), ensure_ascii=False, indent=2)) def info_handle(args): - if args.url.startswith("http://") or args.url.startswith("https://"): - meta_file_name = image_download(args.url) - else: - meta_file_name = args.url - try: - meta_data = json.loads(image_load(meta_file_name).decode("utf-8")) - os.remove(meta_file_name) + meta_data = fetch_meta(args.meta) + if meta_data: log(f"文件名: {meta_data['filename']}") log(f"大小: {meta_data['size'] / 1024 / 1024:.2f} MB") - log(f"MD5: {meta_data['md5']}") + log(f"SHA-1: {meta_data['sha1']}") log(f"上传时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_data['time']))}") log(f"分块数: {len(meta_data['block'])}") - for index, url in enumerate(meta_data['block']): - log(f"分块{index} URL: {url}") - except: - os.remove(meta_file_name) + for index, block in enumerate(meta_data['block']): + log(f"分块{index} ({block['size'] / 1024 / 1024:.2f} MB) URL: {block['url']}") + else: log("元数据解析出错") def upload_handle(args): + def core(index, block): + block_file_name = f"{sha1}_{index}.png" + image_dump(block, block_file_name) + block_sha1 = calc_sha1(read_in_chunks(block_file_name), hexdigest=True) + url = skippable(block_sha1) + if url: + log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已存在于服务器") + block_dict[index] = { + 'url': url, + 'size': os.path.getsize(block_file_name), + 'sha1': block_sha1, + } + done_flag.release() + else: + for _ in range(3): + response = image_upload(block_file_name, cookies) + if response['code'] == 0: + url = response['data']['image_url'] + log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已上传") + block_dict[index] = { + 'url': url, + 'size': os.path.getsize(block_file_name), + 'sha1': block_sha1, + } + done_flag.release() + break + elif response['code'] == -4: + terminate_flag.set() + log("上传失败, 请先登录") + break + else: + terminate_flag.set() + log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 上传失败, 服务器返回{response}") + os.remove(block_file_name) + + def skippable(sha1): + url = f"http://i0.hdslb.com/bfs/album/{sha1}.png" + response = requests.head(url) + return url if response.status_code == 200 else None + + done_flag = threading.Semaphore(0) + terminate_flag = threading.Event() + thread_pool = [] start_time = time.time() try: with open(args.cookies_file, "r", encoding="utf-8") as f: cookies = json.loads(f.read()) except: - log("Cookies文件加载失败") + log("Cookies加载失败, 请先登录") return None file_name = args.file - url_list = [] + block_dict = {} log(f"上传: {file_name} ({os.path.getsize(file_name) / 1024 / 1024:.2f} MB)") - md5 = calc_md5(read_in_chunks(file_name), hexdigest=True) - log(f"MD5: {md5}") - for index, chunk in enumerate(read_in_chunks(file_name, chunk_size=args.block_size * 1024 * 1024)): - part_file_name = f"{md5}_{index}.png" - image_dump(chunk, part_file_name) - for _ in range(3): - response = image_upload(part_file_name, cookies) - if response['code'] == 0: - url = response['data']['image_url'] - log(f"分块{index} ({os.path.getsize(part_file_name) / 1024 / 1024:.2f} MB) 已上传") - url_list.append(url) - os.remove(part_file_name) - break - elif response['code'] == -4: - log(f"上传失败, 账号未登录") - os.remove(part_file_name) - return None + sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True) + log(f"SHA-1: {sha1}") + log(f"线程数: {args.thread}") + for index, block in enumerate(read_in_chunks(file_name, chunk_size=args.block_size * 1024 * 1024 - 1)): + if len(thread_pool) >= args.thread: + done_flag.acquire() + if not terminate_flag.is_set(): + thread_pool.append(threading.Thread(target=core, args=(index, block))) + thread_pool[-1].start() else: - log(f"分块{index} ({os.path.getsize(part_file_name) / 1024 / 1024:.2f} MB) 上传失败, 服务器返回{response}") - os.remove(part_file_name) - return None + log("已终止上传, 等待线程回收") + for thread in thread_pool: + thread.join() + if terminate_flag.is_set(): + return None meta_data = { 'time': int(time.time()), 'filename': file_name, 'size': os.path.getsize(file_name), - 'md5': md5, - 'block': url_list, + 'sha1': sha1, + 'block': [block_dict[i] for i in range(len(block_dict))], } - meta_file_name = f"{md5}_meta.png" + meta_file_name = f"{sha1}_meta.png" image_dump(json.dumps(meta_data, ensure_ascii=False).encode("utf-8"), meta_file_name) for _ in range(3): response = image_upload(meta_file_name, cookies) if response['code'] == 0: url = response['data']['image_url'] - log(f"元数据已上传") + log("元数据已上传") os.remove(meta_file_name) log(f"{file_name}上传完毕, 共有{index + 1}个分块, 耗时{int(time.time() - start_time)}秒") - log(f"META URL: {url}") + log(f"META: {re.findall(r'[a-fA-F0-9]{40}', url)[0] if re.match(r'^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$', url) else url}") return url else: - log(f"元数据上传失败, 保留本地文件{meta_file_name}, 服务器返回{response}") + log(f"元数据上传失败, 保留文件{meta_file_name}, 服务器返回{response}") return meta_file_name def download_handle(args): + def core(index, block): + block_file_name = f"{meta_data['sha1']}_{index}.png" + if os.path.exists(block_file_name) and calc_sha1(read_in_chunks(block_file_name), hexdigest=True) == block['sha1']: + log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已存在于本地") + block_file_name_dict[index] = block_file_name + done_flag.release() + else: + for _ in range(3): + image_download(block['url'], file_name=block_file_name) + if calc_sha1(read_in_chunks(block_file_name), hexdigest=True) == block['sha1']: + log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已下载") + block_file_name_dict[index] = block_file_name + done_flag.release() + break + else: + terminate_flag.set() + log(f"分块{index}校验未通过, SHA-1与元数据中的记录{block['sha1']}不匹配") + os.remove(block_file_name) + return + + done_flag = threading.Semaphore(0) + terminate_flag = threading.Event() + thread_pool = [] + block_file_name_dict = {} start_time = time.time() - if args.url.startswith("http://") or args.url.startswith("https://"): - meta_file_name = image_download(args.url) - else: - meta_file_name = args.url - try: - meta_data = json.loads(image_load(meta_file_name).decode("utf-8")) - os.remove(meta_file_name) - file_name = args.save_as if args.save_as else meta_data['filename'] + meta_data = fetch_meta(args.meta) + if meta_data: + file_name = args.file if args.file else meta_data['filename'] log(f"下载: {file_name} ({meta_data['size'] / 1024 / 1024:.2f} MB), 共有{len(meta_data['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_data['time']))}") - except: - os.remove(meta_file_name) + else: log("元数据解析出错") return None - with open(file_name, "wb") as f: - for index, url in enumerate(meta_data['block']): - for _ in range(3): - part_file_name = image_download(url) - part_data = image_load(part_file_name) - if part_data != b"": - log(f"分块{index} ({len(part_data) / 1024 / 1024:.2f} MB) 已下载") - f.write(part_data) - os.remove(part_file_name) - break + log(f"线程数: {args.thread}") + if not (os.path.exists(file_name) and calc_sha1(read_in_chunks(file_name), hexdigest=True) == meta_data['sha1']): + for index, block in enumerate(meta_data['block']): + if len(thread_pool) >= args.thread: + done_flag.acquire() + if not terminate_flag.is_set(): + thread_pool.append(threading.Thread(target=core, args=(index, block))) + thread_pool[-1].start() else: - log(f"分块{index}校验出错") - os.remove(part_file_name) - return None - log(f"{file_name}下载完毕, 耗时{int(time.time() - start_time)}秒") - md5 = calc_md5(read_in_chunks(file_name), hexdigest=True) - log(f"MD5: {md5}") - if md5 == meta_data['md5']: - log(f"{file_name}校验通过") - return file_name + log("已终止下载, 等待线程回收") + for thread in thread_pool: + thread.join() + if terminate_flag.is_set(): + return None + with open(file_name, "wb") as f: + for index in range(len(meta_data['block'])): + block_file_name = block_file_name_dict[index] + f.write(image_load(block_file_name)) + os.remove(block_file_name) + sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True) + log(f"SHA-1: {sha1}") + if sha1 == meta_data['sha1']: + log(f"{file_name}校验通过") + log(f"{file_name}下载完毕, 耗时{int(time.time() - start_time)}秒") + return file_name + else: + log(f"{file_name}校验未通过, SHA-1与元数据中的记录{meta_data['sha1']}不匹配") + return None else: - log(f"{file_name}校验出错, MD5与元数据中的记录{meta_data['md5']}不匹配") - return None + log(f"{file_name}已存在于本地") if __name__ == "__main__": - parser = argparse.ArgumentParser(prog="BiliDrive", description="Bilibili Drive", epilog="By Hsury, 2019/10/23") + parser = argparse.ArgumentParser(prog="BiliDrive", description="Bilibili Drive", epilog="By Hsury, 2019/10/24") parser.add_argument("-c", "--cookies-file", default="cookies.json", help="cookies json file name") subparsers = parser.add_subparsers() @@ -224,17 +296,19 @@ if __name__ == "__main__": login_parser.set_defaults(func=login_handle) info_parser = subparsers.add_parser("info", help="get meta info") - info_parser.add_argument("url", help="meta url") + info_parser.add_argument("meta", help="meta url") info_parser.set_defaults(func=info_handle) upload_parser = subparsers.add_parser("upload", help="upload a file") upload_parser.add_argument("file", help="file name") - upload_parser.add_argument("-b", "--block-size", default=1, type=int, help="block size in MB") + upload_parser.add_argument("-b", "--block-size", default=4, type=int, help="block size in MB") + upload_parser.add_argument("-t", "--thread", default=2, type=int, help="thread number") upload_parser.set_defaults(func=upload_handle) download_parser = subparsers.add_parser("download", help="download a file") - download_parser.add_argument("url", help="meta url") - download_parser.add_argument("save_as", nargs="?", default="", help="save as file name") + download_parser.add_argument("meta", help="meta url") + download_parser.add_argument("file", nargs="?", default="", help="save as file name") + download_parser.add_argument("-t", "--thread", default=4, type=int, help="thread number") download_parser.set_defaults(func=download_handle) args = parser.parse_args() -- GitLab