提交 2d75b032 编写于 作者: H Hsury

实现多线程上传下载与断点续传

上级 8d55af4d
#!/usr/bin/env python3.6
#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-
"""Bilibili Toolkit 哔哩哔哩工具箱
https://github.com/Hsury/Bilibili-Toolkit"""
banner = r"""
\\ //
\\ //
##################### ________ ___ ___ ___ ________ ___ ___ ___
## ## |\ __ \ |\ \ |\ \ |\ \ |\ __ \ |\ \ |\ \ |\ \
## // \\ ## \ \ \|\ /_\ \ \\ \ \ \ \ \\ \ \|\ /_\ \ \\ \ \ \ \ \
## // \\ ## \ \ __ \\ \ \\ \ \ \ \ \\ \ __ \\ \ \\ \ \ \ \ \
## ## \ \ \|\ \\ \ \\ \ \____ \ \ \\ \ \|\ \\ \ \\ \ \____ \ \ \
## www ## \ \_______\\ \__\\ \_______\\ \__\\ \_______\\ \__\\ \_______\\ \__\
## ## \|_______| \|__| \|_______| \|__| \|_______| \|__| \|_______| \|__|
#####################
\/ \/ 哔哩哔哩 (゜-゜)つロ 干杯~
"""
import base64
import chardet
import hashlib
import json
import os
import platform
import random
import requests
import rsa
import shutil
import subprocess
import sys
import threading
import time
import toml
from multiprocessing import freeze_support, Manager, Pool, Process
from selenium import webdriver
from urllib import parse
__author__ = "Hsury"
__email__ = "i@hsury.com"
__license__ = "SATA"
__version__ = "2019.9.15"
class Bilibili:
app_key = "1d8b6e7d45233436"
patterns = {
'video': {
'id': 1,
'prefix': "https://www.bilibili.com/video/av",
},
'activity': {
'id': 4,
'prefix': "https://www.bilibili.com/blackboard/",
},
'gallery': {
'id': 11,
'prefix': "https://h.bilibili.com/",
},
'article': {
'id': 12,
'prefix': "https://www.bilibili.com/read/cv",
},
}
def __init__(self, https=True, queue=None):
def __init__(self, https=True):
self._session = requests.Session()
self._session.headers.update({'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"})
self.__queue = queue
self.get_cookies = lambda: self._session.cookies.get_dict(domain=".bilibili.com")
self.get_csrf = lambda: self.get_cookies().get("bili_jct", "")
self.get_sid = lambda: self.get_cookies().get("sid", "")
......@@ -91,9 +39,7 @@ class Bilibili:
self.proxy_pool = set()
def _log(self, message):
log = f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}][{self.username if self.username else '#' + self.get_uid() if self.get_uid() else ''}] {message}"
print(log)
self.__push_to_queue("log", log)
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}][{self.username if self.username else '#' + self.get_uid() if self.get_uid() else ''}] {message}")
def _requests(self, method, url, decode_level=2, enable_proxy=True, retry=10, timeout=15, **kwargs):
if method in ["get", "post"]:
......@@ -112,15 +58,6 @@ class Bilibili:
response = self._requests("post", url, json=payload)
return response['message'] if response and response.get("code") == 0 else None
def __push_to_queue(self, manufacturer, data):
if self.__queue:
self.__queue.put({
'uid': self.get_uid(),
'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
'manufacturer': manufacturer,
'data': data,
})
@staticmethod
def calc_sign(param):
salt = "560c52ccd288fed045859ed18bffd973"
......@@ -291,1231 +228,3 @@ class Bilibili:
else:
self._log("用户信息获取失败")
return False
# 修改隐私设置
def set_privacy(self, show_favourite=None, show_bangumi=None, show_tag=None, show_reward=None, show_info=None, show_game=None):
# show_favourite = 展示[我的收藏夹]
# show_bangumi = 展示[订阅番剧]
# show_tag = 展示[订阅标签]
# show_reward = 展示[最近投币的视频]
# show_info = 展示[个人资料]
# show_game = 展示[最近玩过的游戏]
privacy = {
'fav_video': show_favourite,
'bangumi': show_bangumi,
'tags': show_tag,
'coins_video': show_reward,
'user_info': show_info,
'played_game': show_game,
}
url = f"{self.protocol}://space.bilibili.com/ajax/settings/getSettings?mid={self.get_uid()}"
headers = {
'Host': "space.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
}
response = self._requests("get", url, headers=headers)
if response and response.get("status") == True:
for key, value in privacy.items():
if response['data']['privacy'][key] == value:
privacy[key] = None
else:
self._log(f"隐私设置获取失败 {response}")
return False
url = f"{self.protocol}://space.bilibili.com/ajax/settings/setPrivacy"
headers = {
'Host': "space.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
}
fail = []
for key, value in privacy.items():
if value is not None:
payload = {
key: 1 if value else 0,
'csrf': self.get_csrf(),
}
response = self._requests("post", url, data=payload, headers=headers)
if not response or response.get("status") != True:
fail.append(key)
if not fail:
self._log("隐私设置修改成功")
return True
else:
self._log(f"隐私设置修改失败 {fail}")
return False
# 银瓜子兑换硬币
def silver_to_coin(self, app=True, pc=False):
# app = APP通道
# pc = PC通道
if app:
param = f"access_key={self.access_token}&appkey={Bilibili.app_key}&ts={int(time.time())}"
url = f"{self.protocol}://api.live.bilibili.com/AppExchange/silver2coin?{param}&sign={self.calc_sign(param)}"
response = self._requests("get", url)
if response and response.get("code") == 0:
self._log("银瓜子兑换硬币(APP通道)成功")
else:
self._log(f"银瓜子兑换硬币(APP通道)失败 {response}")
if pc:
url = f"{self.protocol}://api.live.bilibili.com/pay/v1/Exchange/silver2coin"
payload = {
'platform': "pc",
'csrf_token': self.get_csrf(),
}
headers = {
'Host': "api.live.bilibili.com",
'Origin': "https://live.bilibili.com",
'Referer': "https://live.bilibili.com/exchange",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log("银瓜子兑换硬币(PC通道)成功")
else:
self._log(f"银瓜子兑换硬币(PC通道)失败 {response}")
# 观看
def watch(self, aid):
# aid = 稿件av号
url = f"{self.protocol}://api.bilibili.com/x/web-interface/view?aid={aid}"
response = self._requests("get", url)
if response and response.get("data") is not None:
cid = response['data']['cid']
duration = response['data']['duration']
else:
self._log(f"av{aid}信息解析失败")
return False
url = f"{self.protocol}://api.bilibili.com/x/report/click/h5"
payload = {
'aid': aid,
'cid': cid,
'part': 1,
'did': self.get_sid(),
'ftime': int(time.time()),
'jsonp': "jsonp",
'lv': None,
'mid': self.get_uid(),
'csrf': self.get_csrf(),
'stime': int(time.time()),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
url = f"{self.protocol}://api.bilibili.com/x/report/web/heartbeat"
payload = {
'aid': aid,
'cid': cid,
'jsonp': "jsonp",
'mid': self.get_uid(),
'csrf': self.get_csrf(),
'played_time': 0,
'pause': False,
'realtime': duration,
'dt': 7,
'play_type': 1,
'start_ts': int(time.time()),
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
time.sleep(5)
payload['played_time'] = duration - 1
payload['play_type'] = 0
payload['start_ts'] = int(time.time())
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}观看成功")
return True
self._log(f"av{aid}观看失败 {response}")
return False
# 点赞
def like(self, aid):
# aid = 稿件av号
url = f"{self.protocol}://api.bilibili.com/x/web-interface/archive/like"
payload = {
'aid': aid,
'like': 1,
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}点赞成功")
return True
else:
self._log(f"av{aid}点赞失败 {response}")
return False
# 投币
def reward(self, aid, double=True):
# aid = 稿件av号
# double = 双倍投币
url = f"{self.protocol}://api.bilibili.com/x/web-interface/coin/add"
payload = {
'aid': aid,
'multiply': 2 if double else 1,
'cross_domain': "true",
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}{2 if double else 1}枚硬币成功")
return True
else:
self._log(f"av{aid}{2 if double else 1}枚硬币失败 {response}")
return self.reward(aid, False) if double else False
# 收藏
def favour(self, aid):
# aid = 稿件av号
url = f"{self.protocol}://api.bilibili.com/x/v2/fav/folder"
headers = {'Host': "api.bilibili.com"}
response = self._requests("get", url, headers=headers)
if response and response.get("data"):
fid = response['data'][0]['fid']
else:
self._log("fid获取失败")
return False
url = f"{self.protocol}://api.bilibili.com/x/v2/fav/video/add"
payload = {
'aid': aid,
'fid': fid,
'jsonp': "jsonp",
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}收藏成功")
return True
else:
self._log(f"av{aid}收藏失败 {response}")
return False
# 三连推荐
def combo(self, aid):
# aid = 稿件av号
url = f"{self.protocol}://api.bilibili.com/x/web-interface/archive/like/triple"
payload = {
'aid': aid,
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}三连推荐成功")
return True
else:
self._log(f"av{aid}三连推荐失败 {response}")
return False
# 分享
def share(self, aid):
# aid = 稿件av号
url = f"{self.protocol}://api.bilibili.com/x/web-interface/share/add"
payload = {
'aid': aid,
'jsonp': "jsonp",
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"av{aid}分享成功")
return True
else:
self._log(f"av{aid}分享失败 {response}")
return False
# 关注
def follow(self, mid, secret=False):
# mid = 被关注用户UID
# secret = 悄悄关注
url = f"{self.protocol}://api.bilibili.com/x/relation/modify"
payload = {
'fid': mid,
'act': 3 if secret else 1,
're_src': 11,
'jsonp': "jsonp",
'csrf': self.get_csrf(),
}
headers = {
'Host': "api.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': f"https://space.bilibili.com/{mid}/",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"用户{mid}{'悄悄' if secret else ''}关注成功")
return True
else:
self._log(f"用户{mid}{'悄悄' if secret else ''}关注失败 {response}")
return False
# 弹幕发送
def danmaku_post(self, aid, message, page=1, moment=-1):
# aid = 稿件av号
# message = 弹幕内容
# page = 分P
# moment = 弹幕发送时间
url = f"{self.protocol}://api.bilibili.com/x/web-interface/view?aid={aid}"
response = self._requests("get", url)
if response and response.get("data") is not None:
page_info = {page['page']: {
'cid': page['cid'],
'duration': page['duration'],
} for page in response['data']['pages']}
if page in page_info:
oid = page_info[page]['cid']
duration = page_info[page]['duration']
else:
self._log(f"av{aid}不存在P{page}")
return False
else:
self._log(f"av{aid}信息解析失败")
return False
url = f"{self.protocol}://api.bilibili.com/x/v2/dm/post"
headers = {
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"https://www.bilibili.com/video/av{aid}",
}
while True:
payload = {
'type': 1,
'oid': oid,
'msg': message,
'aid': aid,
'progress': int(moment * 1E3) if moment != -1 else random.randint(0, duration * 1E3),
'color': 16777215,
'fontsize': 25,
'pool': 0,
'mode': 1,
'rnd': int(time.time() * 1E6),
'plat': 1,
'csrf': self.get_csrf(),
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") is not None:
if response['code'] == 0:
self._log(f"av{aid}(P{page})弹幕\"{message}\"发送成功")
return True
elif response['code'] == 36703:
self._log(f"av{aid}(P{page})弹幕发送频率过快, 10秒后重试")
time.sleep(10)
else:
self._log(f"av{aid}(P{page})弹幕\"{message}\"发送失败 {response}")
return False
# 评论点赞
def comment_like(self, otype, oid, rpid):
# otype = 作品类型
# oid = 作品ID
# rpid = 评论ID
if Bilibili.patterns.get(otype) is None:
return False
url = f"{self.protocol}://api.bilibili.com/x/v2/reply/action"
payload = {
'oid': oid,
'type': Bilibili.patterns[otype]['id'],
'rpid': rpid,
'action': 1,
'jsonp': "jsonp",
'csrf': self.get_csrf(),
}
headers = {
'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8",
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"{Bilibili.patterns[otype]['prefix']}{oid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"评论{rpid}点赞成功")
return True
else:
self._log(f"评论{rpid}点赞失败 {response}")
return False
# 评论发表
def comment_post(self, otype, oid, message):
# otype = 作品类型
# oid = 作品ID
# message = 评论内容
if Bilibili.patterns.get(otype) is None:
return False
url = f"{self.protocol}://api.bilibili.com/x/v2/reply/add"
while True:
payload = {
'oid': oid,
'type': Bilibili.patterns[otype]['id'],
'message': message,
'plat': 1,
'jsonp': "jsonp",
'csrf': self.get_csrf(),
}
headers = {
'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8",
'Host': "api.bilibili.com",
'Origin': "https://www.bilibili.com",
'Referer': f"{Bilibili.patterns[otype]['prefix']}{oid}",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") is not None:
if response['code'] == 0:
self._log(f"作品{oid}提交评论\"{message}\"成功")
return True
elif response['code'] == 12015:
response = self._requests("get", response['data']['url'], headers=headers, decode_level=1)
captcha = self._solve_captcha(response)
if captcha:
self._log(f"评论验证码识别结果: {captcha}")
payload['code'] = captcha
else:
self._log(f"评论验证码识别服务暂时不可用, 1分钟后重试")
time.sleep(60)
elif response['code'] == 12035:
self._log(f"作品{oid}提交评论\"{message}\"失败, 该账号被UP主列入评论黑名单")
return False
elif response['code'] == -105:
if "code" in payload:
payload.pop("code")
else:
self._log(f"作品{oid}提交评论\"{message}\"失败 {response}")
return False
# 动态点赞
def dynamic_like(self, did):
# did = 动态ID
url = f"{self.protocol}://api.vc.bilibili.com/dynamic_like/v1/dynamic_like/thumb"
payload = {
'uid': self.get_uid(),
'dynamic_id': did,
'up': 1,
'csrf_token': self.get_csrf(),
}
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'Host': "api.vc.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': "https://space.bilibili.com/208259/",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"动态{did}点赞成功")
return True
else:
self._log(f"动态{did}点赞失败 {response}")
return False
# 动态转发
def dynamic_repost(self, did, message="转发动态", ats=[]):
# did = 动态ID
# message = 转发内容
# ats = 被@用户UID列表
def uid_to_nickname(mid):
url = f"{self.protocol}://api.bilibili.com/x/web-interface/card?mid={mid}"
response = self._requests("get", url)
if response and response.get("code") == 0:
return response['data']['card']['name']
else:
return ""
url = f"{self.protocol}://api.vc.bilibili.com/dynamic_repost/v1/dynamic_repost/repost"
ctrl = []
for at in zip(ats, [uid_to_nickname(mid) for mid in ats]):
ctrl.append({
'data': str(at[0]),
'location': len(message) + 1,
'length': len(at[1]) + 1,
'type': 1,
})
message = f"{message} @{at[1]}"
payload = {
'uid': self.get_uid(),
'dynamic_id': did,
'content': message,
'at_uids': ",".join([str(at) for at in ats]),
'ctrl': json.dumps(ctrl),
'csrf_token': self.get_csrf(),
}
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'Host': "api.vc.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': "https://space.bilibili.com/208259/",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
self._log(f"动态{did}转发成功")
return True
else:
self._log(f"动态{did}转发失败 {response}")
return False
# 动态清理
def dynamic_purge(self):
def get_lottery_dynamics():
headers = {
'Host': "api.vc.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/dynamic",
}
dynamics = []
offset = 0
while True:
url = f"{self.protocol}://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history?visitor_uid={self.get_uid()}&host_uid={self.get_uid()}&offset_dynamic_id={offset}"
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
if response['data']['has_more']:
dynamics.extend([{
'did': card['desc']['dynamic_id'],
'lottery_did': card['desc']['orig_dy_id'],
} for card in response['data']['cards'] if card['desc']['orig_type'] == 2 or card['desc']['orig_type'] == 1024])
offset = response['data']['cards'][-1]['desc']['dynamic_id']
else:
return dynamics
dynamics = get_lottery_dynamics()
self._log(f"发现{len(dynamics)}条互动抽奖动态")
delete = 0
for dynamic in dynamics:
url = f"{self.protocol}://api.vc.bilibili.com/lottery_svr/v2/lottery_svr/lottery_notice?dynamic_id={dynamic['lottery_did']}"
headers = {
'Host': "api.vc.bilibili.com",
'Origin': "https://t.bilibili.com",
'Referer': "https://t.bilibili.com/lottery/h5/index/",
}
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
expired = response['data']['status'] == 2 or response['data']['status'] == -1
winning = any(self.get_uid() in winners for winners in [response['data'].get("lottery_result", {}).get(f"{level}_prize_result", []) for level in ["first", "second", "third"]])
if not expired:
self._log(f"动态{dynamic['lottery_did']}尚未开奖({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(response['data']['lottery_time']))}), 跳过")
else:
if winning:
self._log(f"动态{dynamic['lottery_did']}中奖, 跳过")
else:
url = f"{self.protocol}://api.vc.bilibili.com/dynamic_repost/v1/dynamic_repost/rm_rp_dyn"
payload = {
'uid': self.get_uid(),
'dynamic_id': dynamic['did'],
'csrf_token': self.get_csrf(),
}
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'Host': "api.vc.bilibili.com",
'Origin': "https://space.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/dynamic",
}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
delete += 1
self._log(f"动态{dynamic['lottery_did']}未中奖, 清理成功")
else:
self._log(f"动态{dynamic['lottery_did']}未中奖, 清理失败")
time.sleep(1)
self._log(f"清理了{delete}条动态")
# 系统通知查询
def system_notice(self, time_span=["", ""], keyword=[]):
# time_span = 时间范围
# keyword = 包含关键字
cursor_span = [int(time.mktime(time.strptime(element, "%Y-%m-%d %H:%M:%S")) * 1E9) if element else "" for element in time_span]
headers = {
'Host': "message.bilibili.com",
'Referer': "https://message.bilibili.com/",
}
notice_list = []
cursor = cursor_span[1]
while True:
url = f"{self.protocol}://message.bilibili.com/api/notify/query.sysnotify.list.do?data_type=1{'&cursor=' + str(cursor) if cursor else ''}"
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
for notice in response['data']:
if not cursor_span[0] or notice['cursor'] > cursor_span[0]:
if not keyword or any(keyword in notice['title'] or keyword in notice['content'] for keyword in keyword):
notice_list.append({
'time': notice['time_at'],
'title': notice['title'],
'content': notice['content'],
})
else:
break
else:
if len(response['data']) == 20:
cursor = notice['cursor']
continue
self._log(f"系统通知获取成功, 总计{len(notice_list)}条通知")
for notice in notice_list:
self._log(f"{notice['title']}({notice['time']}): {notice['content']}")
self.__push_to_queue("system_notice", notice_list)
return notice_list
# 会员购抢购
def mall_rush(self, item_id, thread=1, headless=True, timeout=10):
# item_id = 商品ID
# thread = 线程数
# headless = 隐藏窗口
# timeout = 超时刷新
def executor(thread_id):
def find_and_click(class_name):
try:
element = driver.find_element_by_class_name(class_name)
element.click()
except:
element = None
return element
options = webdriver.ChromeOptions()
options.add_argument("log-level=3")
if headless:
options.add_argument("headless")
else:
options.add_argument("disable-infobars")
options.add_argument("window-size=374,729")
if platform.system() == "Linux":
options.add_argument("no-sandbox")
options.add_experimental_option("mobileEmulation", {'deviceName': "Nexus 5"})
if platform.system() == "Windows":
options.binary_location = "chrome-win\\chrome.exe"
driver = webdriver.Chrome(executable_path="chromedriver.exe" if platform.system() == "Windows" else "chromedriver", options=options)
driver.get(f"{self.protocol}://mall.bilibili.com/detail.html?itemsId={item_id}")
for key, value in self.get_cookies().items():
driver.add_cookie({
'name': key,
'value': value,
'domain': ".bilibili.com",
})
self._log(f"(线程{thread_id})商品{item_id}开始监视库存")
url = f"{self.protocol}://mall.bilibili.com/mall-c/items/info?itemsId={item_id}"
while True:
response = self._requests("get", url)
if response and response.get("code") == 0 and response['data']['activityInfoVO']['serverTime'] >= response['data']['activityInfoVO']['startTime'] if response['data']['activityInfoVO'] else True:
break
timestamp = time.time()
in_stock = False
while True:
try:
result = {class_name: find_and_click(class_name) for class_name in ["bottom-buy-button", "button", "dot", "pay-btn", "expire-time-format", "alert-ok", "error-button"]}
if result['bottom-buy-button']:
if "bottom-buy-disable" not in result['bottom-buy-button'].get_attribute("class"):
if not in_stock:
self._log(f"(线程{thread_id})商品{item_id}已开放购买")
in_stock = True
else:
if in_stock:
self._log(f"(线程{thread_id})商品{item_id}暂无法购买, 原因为{result['bottom-buy-button'].text}")
in_stock = False
driver.refresh()
timestamp = time.time()
if result['pay-btn']:
timestamp = time.time()
if result['alert-ok']:
driver.refresh()
if result['expire-time-format']:
self._log(f"(线程{thread_id})商品{item_id}订单提交成功, 请在{result['expire-time-format'].text}内完成支付")
driver.quit()
return True
if time.time() - timestamp > timeout:
self._log(f"(线程{thread_id})商品{item_id}操作超时, 当前页面为{driver.current_url}")
driver.get(f"{self.protocol}://mall.bilibili.com/detail.html?itemsId={item_id}")
timestamp = time.time()
except:
pass
threads = []
for i in range(thread):
threads.append(threading.Thread(target=executor, args=(i + 1,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# 会员购优惠卷领取
def mall_coupon(self, coupon_id, thread=1):
# coupon_id = 优惠券ID
# thread = 线程数
def get_coupon_info(coupon_id):
url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/user_coupon_code_receive_status_list"
payload = {
'couponIds': [str(coupon_id)],
'mid': "",
'csrf': self.get_csrf(),
}
headers = {
'Host': "mall.bilibili.com",
'Origin': "https://www.bilibili.com",
}
response = self._requests("post", url, json=payload, headers=headers)
if response and response.get("code") == 0:
return {
'end': response['data'][0]['receiveEndTime'],
'message': response['data'][0]['couponStatusMsg'],
'name': response['data'][0]['couponName'],
'total': response['data'][0]['provideNum'],
'remain': response['data'][0]['remainNum'],
'start': response['data'][0]['receiveStartTime'],
'status': response['data'][0]['receiveStatus'],
}
def get_server_time(target_time=0):
url = f"{self.protocol}://mall.bilibili.com/mall-c/common/time/remain?v={int(time.time())}&targetTime={target_time}"
headers = {
'Host': "mall.bilibili.com",
'Origin': "https://www.bilibili.com",
}
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
return {
'current': response['data']['serverTime'],
'remain': response['data']['remainSeconds'],
}
def executor(thread_id):
url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/create_coupon_code?couponId={coupon_id}&deviceId="
payload = {'csrf': self.get_csrf()}
headers = {
'Host': "mall.bilibili.com",
'Origin': "https://www.bilibili.com",
}
nonlocal flag
while not flag:
response = self._requests("post", url, json=payload, headers=headers)
if response and response.get("code") is not None:
if response['code'] == 83094004:
self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取成功")
elif response['code'] == 83110005:
self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 优惠券领取数量已达到上限")
elif response['code'] == 83110015:
self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 优惠券库存不足")
else:
continue
else:
self._log(f"(线程{thread_id})会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})领取失败, 当前IP请求过于频繁")
flag = True
coupon_info = get_coupon_info(coupon_id)
if coupon_info:
if coupon_info['message'] == "可领取":
server_time = get_server_time(coupon_info['start'])
if server_time:
delay = max(server_time['remain'] - 3, 0)
self._log(f"会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id})可领取时间为{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon_info['start']))}{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon_info['end']))}, 库存{coupon_info['remain']}张, 将于{delay}秒后开始领取")
time.sleep(delay)
else:
self._log(f"会员购服务器时间获取失败")
return
else:
self._log(f"会员购优惠卷\"{coupon_info['name']}\"(ID={coupon_id}){coupon_info['message']}")
return
else:
self._log(f"会员购优惠卷{coupon_id}信息获取失败")
return
flag = False
threads = []
for i in range(thread):
threads.append(threading.Thread(target=executor, args=(i + 1,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# 会员购订单列表查询
def mall_order_list(self, status=0, type=[2]):
# status = 订单状态
# type = 订单类型
def get_order_list(status, type):
headers = {
'Origin': "https://mall.bilibili.com",
'Referer': "https://mall.bilibili.com/orderlist.html",
}
order_list = []
page = 0
while True:
url = f"{self.protocol}://show.bilibili.com/api/ticket/ordercenter/list?pageNum={page}&pageSize=20&status={status}&customer=0&platform=h5&v={int(time.time())}"
response = self._requests("get", url, headers=headers)
if response and response.get("errno") == 0:
data = response['data']['list']
if data:
for order in data:
if not type or order['order_type'] in type:
order_list.append(order)
page += 1
else:
self._log(f"会员购订单列表获取成功, 总计{len(order_list)}个订单")
break
else:
self._log(f"会员购订单列表获取失败 {response}")
return order_list
def get_order_detail(order_id):
url = f"{self.protocol}://mall.bilibili.com/mall-c/order/detail?orderId={order_id}&platform=h5&time={int(time.time())}"
headers = {
'Origin': "https://mall.bilibili.com",
'Referer': f"https://mall.bilibili.com/orderdetail.html?orderId={order_id}",
}
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0 and response['data']['vo']:
data = response['data']['vo']
self._log(f"会员购订单{order_id}详情获取成功, 包含\"{data['skuList'][0]['itemsName']}\"{len(data['skuList'])}件商品")
return data
else:
self._log(f"会员购订单{order_id}详情获取失败 {response}")
return {}
def get_order_express(order_id):
url = f"{self.protocol}://mall.bilibili.com/mall-c/order/express/detail?orderId={order_id}"
headers = {
'Origin': "https://mall.bilibili.com",
'Referer': f"https://mall.bilibili.com/orderdetail.html?orderId={order_id}",
}
for _ in range(5):
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0 and response['data']['vo']:
data = response['data']['vo']
self._log(f"会员购订单{order_id}物流获取成功, 状态为{data['state_v']}")
return data
time.sleep(3)
self._log(f"会员购订单{order_id}物流获取失败 {response}")
return {}
order_list = []
for order in get_order_list(status, type):
order_detail = get_order_detail(order['order_id'])
order_express = get_order_express(order['order_id']) if order_detail and order_detail['orderExpress'] else {}
order_list.append({
'id': order.get("order_id"),
'item': [{
'id': item.get("itemsId"),
'name': item.get("itemsName"),
'spec': item.get("skuSpec"),
'number': item.get("skuNum"),
'price': item.get("price"),
} for item in order_detail.get("skuList", [])],
'create': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order.get("order_ctime"))) if order.get("current_timestamp") else None,
'status': {
'code': order.get("status"),
'name': order.get("status_name"),
},
'pay': {
'id': order_detail['orderBasic'].get("payId") if order_detail.get("orderBasic") else None,
'time': order.get("pay_ctime") if order.get("pay_ctime") != "0000-00-00 00:00:00" else None,
'channel': order_detail['orderBasic'].get("paymentChannel") if order_detail.get("orderBasic") else None,
'total': order.get("show_money") / 100 if order.get("show_money") else None,
'origin': order_detail['orderBasic'].get("payTotalMoney") if order_detail.get("orderBasic") else None,
'discount': order_detail['orderBasic'].get("discountMoneys") if order_detail.get("orderBasic") else None,
'express': order.get("express_fee") / 100 if order.get("express_fee") else None,
},
'preorder': {
'phone': order_detail['extData'].get("notifyPhoneOrigin") if order_detail.get("extData") else None,
'front': {
'total': order_detail['extData'].get("frontPayMoney") if order_detail.get("extData") else None,
'origin': order_detail['extData'].get("frontMoney") if order_detail.get("extData") else None,
'discount': order_detail['extData'].get("frontDisMoney") if order_detail.get("extData") else None,
},
'final': {
'total': order_detail['extData'].get("finalPayMoney") if order_detail.get("extData") else None,
'origin': order_detail['extData'].get("finalMoney") if order_detail.get("extData") else None,
'discount': order_detail['extData'].get("finalDisMoney") if order_detail.get("extData") else None,
'start': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order_detail['extData'].get("finalMoneyStart") / 1E3)) if order_detail.get("extData") and order_detail['extData'].get("finalMoneyStart") else None,
'end': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(order_detail['extData'].get("finalMoneyEnd") / 1E3)) if order_detail.get("extData") and order_detail['extData'].get("finalMoneyStart") else None,
},
},
'shipping': {
'name': order_detail['orderDeliver'].get("deliverName") if order_detail.get("orderDeliver") else None,
'phone': order_detail['orderDeliver'].get("deliverPhone") if order_detail.get("orderDeliver") else None,
'address': order_detail['orderDeliver'].get("deliverAddr") if order_detail.get("orderDeliver") else None,
'company': order_detail['orderExpress'].get("com_v") if order_detail.get("orderExpress") else None,
'number': order_detail['orderExpress'].get("sno") if order_detail.get("orderExpress") else None,
'status': order_express.get("state_v"),
'detail': order_express.get("detail"),
},
})
self.__push_to_queue("mall_order_list", order_list)
return order_list
# 会员购优惠券列表查询
def mall_coupon_list(self, status=1):
# status = 优惠券状态
status_map = {
1: "validList",
2: "usedList",
3: "invalidList",
}
if status not in status_map:
return []
headers = {
'Referer': "https://mall.bilibili.com/couponlist.html?noTitleBar=1",
}
coupon_list = []
page = 1
while True:
url = f"{self.protocol}://mall.bilibili.com/mall-c/coupon/list?status={status}&pageIndex={page}&pageSize=20"
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
if response['data'][status_map[status]]:
for coupon in response['data'][status_map[status]]['list']:
coupon_list.append({
'name': coupon['couponCodeName'],
'description': coupon['couponDesc'],
'detail': coupon['couponDetail'],
'discount': coupon['couponDiscount'],
'status': coupon['status'],
'type': coupon['couponCodeType'],
'start': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useStartTime'] / 1E3)) if coupon['useStartTime'] else None,
'end': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useEndTime'] / 1E3)) if coupon['useEndTime'] else None,
'use': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['useTime'] / 1E3)) if coupon['useTime'] else None,
'expire': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(coupon['expireDate'] / 1E3)) if coupon['expireDate'] else None,
})
if response['data'][status_map[status]]['hasNextPage']:
page += 1
continue
self._log(f"会员购优惠券列表获取成功, 总计{len(coupon_list)}张优惠券")
for coupon in coupon_list:
self._log(f"会员购优惠券: {coupon['name']}" + (f", 失效时间为{coupon['expire']}" if coupon['expire'] else f", 使用时间为{coupon['use']}" if coupon['use'] else f", 使用有效期为{coupon['start']}{coupon['end']}" if coupon['start'] and coupon['end'] else ""))
break
else:
self._log(f"会员购优惠券列表获取失败 {response}")
break
self.__push_to_queue("mall_coupon_list", coupon_list)
return coupon_list
# 会员购奖品列表查询
def mall_prize_list(self, status=0, type=[1, 2]):
# status = 奖品状态
# type = 奖品类型
headers = {
'Referer': "https://mall.bilibili.com/prizecenter.html",
}
prize_list = []
page = 1
while True:
url = f"{self.protocol}://mall.bilibili.com/mall-c/prize/list?pageNum={page}&pageSize=20&type={status}&v={int(time.time())}"
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
for prize in response['data']['pageInfo']['list']:
if not type or prize['prizeType'] in type:
prize_list.append({
'name': prize['prizeName'],
'source': prize['sourceName'],
'status': prize['status'],
'type': prize['prizeType'],
'expire': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(prize['expireTime'])),
})
if response['data']['pageInfo']['hasNextPage']:
page += 1
else:
self._log(f"会员购奖品列表获取成功, 总计{len(prize_list)}个奖品, {response['data']['waitDeliveryNum']}个奖品待发货")
for prize in prize_list:
self._log(f"会员购奖品: {prize['name']}, 来自{prize['source']}, 领取有效期至{prize['expire']}")
break
else:
self._log(f"会员购奖品列表获取失败 {response}")
break
self.__push_to_queue("mall_prize_list", prize_list)
return prize_list
# 直播奖品列表查询
def live_prize_list(self):
headers = {
'Origin': "https://link.bilibili.com",
'Referer': "https://link.bilibili.com/p/center/index",
}
prize_list = []
page = 1
while True:
url = f"{self.protocol}://api.live.bilibili.com/lottery/v1/award/award_list?page={page}&month="
response = self._requests("get", url, headers=headers)
if response and response.get("code") == 0:
for prize in response['data']['list']:
prize_list.append({
'name': prize['gift_name'],
'number': prize['gift_num'],
'source': prize['source'],
'status': prize['status'],
'type': prize['gift_type'],
'create': prize['create_time'],
'expire': prize['expire_time'],
})
if page < response['data']['total_page']:
page += 1
else:
self._log(f"直播奖品列表获取成功, 总计{len(prize_list)}个奖品")
for prize in prize_list:
self._log(f"直播奖品: {prize['name']} x{prize['number']}, 来自{prize['source']}, 中奖时间为{prize['create']}, 领取有效期至{prize['expire']}")
break
else:
self._log(f"直播奖品列表获取失败 {response}")
break
self.__push_to_queue("live_prize_list", prize_list)
return prize_list
def detect_charset(file, fallback="utf-8"):
with open(file, "rb") as f:
detector = chardet.UniversalDetector()
for line in f.readlines():
detector.feed(line)
if detector.done:
return detector.result['encoding']
return fallback
def download(url, save_as=None):
print(f"正在下载{url}")
if save_as is None:
save_as = url.split("/")[-1]
with open(save_as, "wb") as f:
response = requests.get(url, stream=True)
length = response.headers.get("content-length")
if length:
length = int(length)
receive = 0
for data in response.iter_content(chunk_size=100 * 1024):
f.write(data)
receive += len(data)
percent = receive / length
print(f"\r[{'=' * int(50 * percent)}{' ' * (50 - int(50 * percent))}] {percent:.0%}", end="", flush=True)
print()
else:
f.write(response.content)
return save_as
def decompress(file, remove=True):
shutil.unpack_archive(file)
if remove:
os.remove(file)
print(f"{file}解压完毕")
def export(queue, config):
bucket = {}
log_file = open(config['global']['log'], "a", encoding="utf-8") if config['global']['log'] else None
try:
while True:
packet = queue.get()
if isinstance(packet, dict) and all(key in packet for key in ['uid', 'manufacturer', 'data']):
if packet['manufacturer'] == "log":
if log_file:
log_file.write(packet['data'] + "\n")
else:
if packet['manufacturer'] not in bucket:
bucket[packet['manufacturer']] = {}
if packet['uid'] not in bucket[packet['manufacturer']]:
bucket[packet['manufacturer']][packet['uid']] = []
if isinstance(packet['data'], list):
bucket[packet['manufacturer']][packet['uid']].extend(packet['data'])
else:
bucket[packet['manufacturer']][packet['uid']].append(packet['data'])
elif packet is None:
for manufacturer, data in bucket.items():
if config.get(manufacturer, {}).get("export"):
with open(config[manufacturer]['export'], "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=4, ensure_ascii=False))
return
finally:
if log_file:
log_file.close()
def wrapper(arg):
def delay_wrapper(func, interval, arg_list=[()], shuffle=False):
if shuffle:
random.shuffle(arg_list)
for i in range(len(arg_list)):
func(*arg_list[i])
if i < len(arg_list) - 1:
time.sleep(interval)
config, account, queue = arg['config'], arg['account'], arg['queue']
instance = Bilibili(config['global']['https'], queue)
if config['proxy']['enable']:
if isinstance(config['proxy']['pool'], str):
try:
with open(config['proxy']['pool'], "r", encoding=detect_charset(config['proxy']['pool'])) as f:
instance.set_proxy(add=[proxy for proxy in f.read().strip().splitlines() if proxy and proxy[0] != "#"])
except:
pass
elif isinstance(config['proxy']['pool'], list):
instance.set_proxy(add=config['proxy']['pool'])
if instance.login(force_refresh_token=config['user']['force_refresh_token'], **account):
threads = []
if config['get_user_info']['enable']:
threads.append(threading.Thread(target=instance.get_user_info))
if config['set_privacy']['enable']:
threads.append(threading.Thread(target=instance.set_privacy, args=(config['set_privacy']['show_favourite'], config['set_privacy']['show_bangumi'], config['set_privacy']['show_tag'], config['set_privacy']['show_reward'], config['set_privacy']['show_info'], config['set_privacy']['show_game'])))
if config['silver_to_coin']['enable']:
threads.append(threading.Thread(target=instance.silver_to_coin))
if config['watch']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.watch, 5, list(zip(config['watch']['aid'])))))
if config['like']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.like, 5, list(zip(config['like']['aid'])))))
if config['reward']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.reward, 5, list(zip(config['reward']['aid'], config['reward']['double'])))))
if config['favour']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.favour, 5, list(zip(config['favour']['aid'])))))
if config['combo']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.combo, 5, list(zip(config['combo']['aid'])))))
if config['share']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.share, 5, list(zip(config['share']['aid'])))))
if config['follow']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.follow, 5, list(zip(config['follow']['mid'], config['follow']['secret'])))))
if config['danmaku_post']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.danmaku_post, 5, list(zip(config['danmaku_post']['aid'], config['danmaku_post']['message'], config['danmaku_post']['page'], config['danmaku_post']['moment'])))))
if config['comment_like']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.comment_like, 5, list(zip(config['comment_like']['otype'], config['comment_like']['oid'], config['comment_like']['rpid'])))))
if config['comment_post']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.comment_post, 5, list(zip(config['comment_post']['otype'], config['comment_post']['oid'], config['comment_post']['message'])))))
# for comment in zip(config['comment_post']['otype'], config['comment_post']['oid'], config['comment_post']['message']):
# threads.append(threading.Thread(target=instance.comment_post, args=(comment[0], comment[1], comment[2])))
if config['dynamic_like']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.dynamic_like, 5, list(zip(config['dynamic_like']['did'])))))
if config['dynamic_repost']['enable']:
threads.append(threading.Thread(target=delay_wrapper, args=(instance.dynamic_repost, 5, list(zip(config['dynamic_repost']['did'], config['dynamic_repost']['message'], config['dynamic_repost']['ats'])))))
if config['dynamic_purge']['enable']:
threads.append(threading.Thread(target=instance.dynamic_purge))
if config['system_notice']['enable']:
threads.append(threading.Thread(target=instance.system_notice, args=(config['system_notice']['time_span'], config['system_notice']['keyword'])))
if config['mall_rush']['enable']:
for item in zip(config['mall_rush']['item_id'], config['mall_rush']['thread']):
threads.append(threading.Thread(target=instance.mall_rush, args=(item[0], item[1], config['mall_rush']['headless'], config['mall_rush']['timeout'])))
if config['mall_coupon']['enable']:
for coupon in zip(config['mall_coupon']['coupon_id'], config['mall_coupon']['thread']):
threads.append(threading.Thread(target=instance.mall_coupon, args=(coupon[0], coupon[1])))
if config['mall_order_list']['enable']:
threads.append(threading.Thread(target=instance.mall_order_list, args=(config['mall_order_list']['status'], config['mall_order_list']['type'])))
if config['mall_coupon_list']['enable']:
threads.append(threading.Thread(target=instance.mall_coupon_list, args=(config['mall_coupon_list']['status'],)))
if config['mall_prize_list']['enable']:
threads.append(threading.Thread(target=instance.mall_prize_list, args=(config['mall_prize_list']['status'], config['mall_prize_list']['type'])))
if config['live_prize_list']['enable']:
threads.append(threading.Thread(target=instance.live_prize_list))
# instance._log("任务开始执行")
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# instance._log("任务执行完毕")
return {
'username': instance.username,
'password': instance.password,
'access_token': instance.access_token,
'refresh_token': instance.refresh_token,
'cookie': instance.get_cookies(),
}
def main():
print(f"{banner}\n{__doc__}\n版本: {__version__}\n")
config_file = sys.argv[1] if len(sys.argv) > 1 else "config.toml"
try:
with open(config_file, "r", encoding=detect_charset(config_file)) as f:
config = toml.load(f)
except:
print(f"无法加载{config_file}")
return
accounts = []
for line in config['user']['account'].splitlines():
try:
if line[0] == "#":
continue
pairs = {}
for pair in line.strip(";").split(";"):
if len(pair.split("=")) == 2:
key, value = pair.split("=")
pairs[key] = value
password = all(key in pairs for key in ["username", "password"])
token = all(key in pairs for key in ["access_token", "refresh_token"])
cookie = all(key in pairs for key in ["bili_jct", "DedeUserID", "DedeUserID__ckMd5", "sid", "SESSDATA"])
if password or token or cookie:
accounts.append(pairs)
except:
pass
config['user'].pop("account")
print(f"导入了{len(accounts)}个用户")
if not accounts:
return
if config['mall_rush']['enable']:
if platform.system() == "Linux" and os.path.exists("/etc/debian_version"):
prefix = "sudo " if shutil.which("sudo") else ""
if shutil.which("chromium-browser") is None:
os.system(f"{prefix}apt -y install chromium-browser")
if shutil.which("chromedriver") is None:
os.system(f"{prefix}apt -y install chromium-chromedriver")
os.system(f"{prefix}ln -s /usr/lib/chromium-browser/chromedriver /usr/bin")
elif platform.system() == "Linux" and os.path.exists("/etc/redhat-release"):
prefix = "sudo " if shutil.which("sudo") else ""
if shutil.which("chromium-browser") is None:
os.system(f"{prefix}yum -y install chromium")
if shutil.which("chromedriver") is None:
os.system(f"{prefix}yum -y install chromedriver")
elif platform.system() == "Windows":
if not os.path.exists("chrome-win\\chrome.exe"):
decompress(download("https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win/686378/chrome-win.zip"))
if not os.path.exists("chromedriver.exe"):
decompress(download("https://npm.taobao.org/mirrors/chromedriver/78.0.3904.11/chromedriver_win32.zip"))
else:
print("会员购抢购组件不支持在当前平台上运行")
config['mall_rush']['enable'] = False
queue = Manager().Queue()
export_process = Process(target=export, args=(queue, config))
export_process.start()
with Pool(min(config['global']['process'], len(accounts))) as p:
result = p.map(wrapper, [{
'config': config,
'account': account,
'queue': queue,
} for account in accounts])
p.close()
p.join()
if config['user']['update']:
with open(config_file, "r+", encoding=detect_charset(config_file)) as f:
content = f.read()
before = content.split("account")[0]
after = content.split("account")[-1].split("\"\"\"")[-1]
f.seek(0)
f.truncate()
f.write(before)
f.write("account = \"\"\"\n")
for credential in result:
new_line = False
for key, value in credential.items():
if value:
if key == "cookie":
f.write(f"{';'.join(f'{key}={value}' for key, value in value.items())};")
else:
f.write(f"{key}={value};")
new_line = True
if new_line:
f.write("\n")
f.write("\"\"\"")
f.write(after)
print("凭据已更新")
queue.put(None)
export_process.join()
if __name__ == "__main__":
freeze_support()
main()
if platform.system() == "Windows":
os.system("pause >nul | set /p =请按任意键退出")
......@@ -6,7 +6,9 @@ import hashlib
import json
import math
import os
import re
import requests
import threading
import time
import types
from bilibili import Bilibili
......@@ -15,14 +17,14 @@ from PIL import Image
def log(message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}")
def calc_md5(data, hexdigest=False):
md5 = hashlib.md5()
def calc_sha1(data, hexdigest=False):
sha1 = hashlib.sha1()
if isinstance(data, types.GeneratorType):
for chunk in data:
md5.update(chunk)
sha1.update(chunk)
else:
md5.update(data)
return md5.hexdigest() if hexdigest else md5.digest()
sha1.update(data)
return sha1.hexdigest() if hexdigest else sha1.digest()
def read_in_chunks(file_name, chunk_size=1024 * 1024):
with open(file_name, "rb") as f:
......@@ -34,8 +36,7 @@ def read_in_chunks(file_name, chunk_size=1024 * 1024):
return
def image_dump(data, file_name):
md5 = calc_md5(data)
merged_data = data + md5 + b"\xff"
merged_data = data + b"\xff"
pixel_number = math.ceil(len(merged_data) / 3)
width = math.ceil(math.sqrt(pixel_number))
height = math.ceil(pixel_number / width)
......@@ -55,10 +56,9 @@ def image_load(file_name):
merged_data = b"".join(bytes(pixel_data) for pixel_data in image.getdata())
merged_data = merged_data.rstrip(b"\x00")
if merged_data[-1] == 255:
data, md5 = merged_data[:-(1 + 16)], merged_data[-(1 + 16):-1]
if calc_md5(data) == md5:
return data
return b""
return merged_data[:-1]
else:
return b""
def image_upload(file_name, cookies):
url = "https://api.vc.bilibili.com/api/v1/drawImage/upload"
......@@ -94,126 +94,198 @@ def image_download(url, file_name=None):
f.write(response.content)
return file_name
def fetch_meta(string):
if string.startswith("http://") or string.startswith("https://"):
meta_file_name = image_download(string)
elif re.match(r"^[a-fA-F0-9]{40}$", string):
meta_file_name = image_download(f"http://i0.hdslb.com/bfs/album/{string}.png")
else:
meta_file_name = string
try:
meta_data = json.loads(image_load(meta_file_name).decode("utf-8"))
return meta_data
except:
return None
finally:
os.remove(meta_file_name)
def login_handle(args):
bilibili = Bilibili()
bilibili.login(username=args.username, password=args.password)
bilibili.get_user_info()
with open(args.cookies_file, "w", encoding="utf-8") as f:
f.write(json.dumps(bilibili.get_cookies(), ensure_ascii=False, indent=2))
def info_handle(args):
if args.url.startswith("http://") or args.url.startswith("https://"):
meta_file_name = image_download(args.url)
else:
meta_file_name = args.url
try:
meta_data = json.loads(image_load(meta_file_name).decode("utf-8"))
os.remove(meta_file_name)
meta_data = fetch_meta(args.meta)
if meta_data:
log(f"文件名: {meta_data['filename']}")
log(f"大小: {meta_data['size'] / 1024 / 1024:.2f} MB")
log(f"MD5: {meta_data['md5']}")
log(f"SHA-1: {meta_data['sha1']}")
log(f"上传时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_data['time']))}")
log(f"分块数: {len(meta_data['block'])}")
for index, url in enumerate(meta_data['block']):
log(f"分块{index} URL: {url}")
except:
os.remove(meta_file_name)
for index, block in enumerate(meta_data['block']):
log(f"分块{index} ({block['size'] / 1024 / 1024:.2f} MB) URL: {block['url']}")
else:
log("元数据解析出错")
def upload_handle(args):
def core(index, block):
block_file_name = f"{sha1}_{index}.png"
image_dump(block, block_file_name)
block_sha1 = calc_sha1(read_in_chunks(block_file_name), hexdigest=True)
url = skippable(block_sha1)
if url:
log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已存在于服务器")
block_dict[index] = {
'url': url,
'size': os.path.getsize(block_file_name),
'sha1': block_sha1,
}
done_flag.release()
else:
for _ in range(3):
response = image_upload(block_file_name, cookies)
if response['code'] == 0:
url = response['data']['image_url']
log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已上传")
block_dict[index] = {
'url': url,
'size': os.path.getsize(block_file_name),
'sha1': block_sha1,
}
done_flag.release()
break
elif response['code'] == -4:
terminate_flag.set()
log("上传失败, 请先登录")
break
else:
terminate_flag.set()
log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 上传失败, 服务器返回{response}")
os.remove(block_file_name)
def skippable(sha1):
url = f"http://i0.hdslb.com/bfs/album/{sha1}.png"
response = requests.head(url)
return url if response.status_code == 200 else None
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
thread_pool = []
start_time = time.time()
try:
with open(args.cookies_file, "r", encoding="utf-8") as f:
cookies = json.loads(f.read())
except:
log("Cookies文件加载失败")
log("Cookies加载失败, 请先登录")
return None
file_name = args.file
url_list = []
block_dict = {}
log(f"上传: {file_name} ({os.path.getsize(file_name) / 1024 / 1024:.2f} MB)")
md5 = calc_md5(read_in_chunks(file_name), hexdigest=True)
log(f"MD5: {md5}")
for index, chunk in enumerate(read_in_chunks(file_name, chunk_size=args.block_size * 1024 * 1024)):
part_file_name = f"{md5}_{index}.png"
image_dump(chunk, part_file_name)
for _ in range(3):
response = image_upload(part_file_name, cookies)
if response['code'] == 0:
url = response['data']['image_url']
log(f"分块{index} ({os.path.getsize(part_file_name) / 1024 / 1024:.2f} MB) 已上传")
url_list.append(url)
os.remove(part_file_name)
break
elif response['code'] == -4:
log(f"上传失败, 账号未登录")
os.remove(part_file_name)
return None
sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True)
log(f"SHA-1: {sha1}")
log(f"线程数: {args.thread}")
for index, block in enumerate(read_in_chunks(file_name, chunk_size=args.block_size * 1024 * 1024 - 1)):
if len(thread_pool) >= args.thread:
done_flag.acquire()
if not terminate_flag.is_set():
thread_pool.append(threading.Thread(target=core, args=(index, block)))
thread_pool[-1].start()
else:
log(f"分块{index} ({os.path.getsize(part_file_name) / 1024 / 1024:.2f} MB) 上传失败, 服务器返回{response}")
os.remove(part_file_name)
return None
log("已终止上传, 等待线程回收")
for thread in thread_pool:
thread.join()
if terminate_flag.is_set():
return None
meta_data = {
'time': int(time.time()),
'filename': file_name,
'size': os.path.getsize(file_name),
'md5': md5,
'block': url_list,
'sha1': sha1,
'block': [block_dict[i] for i in range(len(block_dict))],
}
meta_file_name = f"{md5}_meta.png"
meta_file_name = f"{sha1}_meta.png"
image_dump(json.dumps(meta_data, ensure_ascii=False).encode("utf-8"), meta_file_name)
for _ in range(3):
response = image_upload(meta_file_name, cookies)
if response['code'] == 0:
url = response['data']['image_url']
log(f"元数据已上传")
log("元数据已上传")
os.remove(meta_file_name)
log(f"{file_name}上传完毕, 共有{index + 1}个分块, 耗时{int(time.time() - start_time)}秒")
log(f"META URL: {url}")
log(f"META: {re.findall(r'[a-fA-F0-9]{40}', url)[0] if re.match(r'^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$', url) else url}")
return url
else:
log(f"元数据上传失败, 保留本地文件{meta_file_name}, 服务器返回{response}")
log(f"元数据上传失败, 保留文件{meta_file_name}, 服务器返回{response}")
return meta_file_name
def download_handle(args):
def core(index, block):
block_file_name = f"{meta_data['sha1']}_{index}.png"
if os.path.exists(block_file_name) and calc_sha1(read_in_chunks(block_file_name), hexdigest=True) == block['sha1']:
log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已存在于本地")
block_file_name_dict[index] = block_file_name
done_flag.release()
else:
for _ in range(3):
image_download(block['url'], file_name=block_file_name)
if calc_sha1(read_in_chunks(block_file_name), hexdigest=True) == block['sha1']:
log(f"分块{index} ({os.path.getsize(block_file_name) / 1024 / 1024:.2f} MB) 已下载")
block_file_name_dict[index] = block_file_name
done_flag.release()
break
else:
terminate_flag.set()
log(f"分块{index}校验未通过, SHA-1与元数据中的记录{block['sha1']}不匹配")
os.remove(block_file_name)
return
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
thread_pool = []
block_file_name_dict = {}
start_time = time.time()
if args.url.startswith("http://") or args.url.startswith("https://"):
meta_file_name = image_download(args.url)
else:
meta_file_name = args.url
try:
meta_data = json.loads(image_load(meta_file_name).decode("utf-8"))
os.remove(meta_file_name)
file_name = args.save_as if args.save_as else meta_data['filename']
meta_data = fetch_meta(args.meta)
if meta_data:
file_name = args.file if args.file else meta_data['filename']
log(f"下载: {file_name} ({meta_data['size'] / 1024 / 1024:.2f} MB), 共有{len(meta_data['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_data['time']))}")
except:
os.remove(meta_file_name)
else:
log("元数据解析出错")
return None
with open(file_name, "wb") as f:
for index, url in enumerate(meta_data['block']):
for _ in range(3):
part_file_name = image_download(url)
part_data = image_load(part_file_name)
if part_data != b"":
log(f"分块{index} ({len(part_data) / 1024 / 1024:.2f} MB) 已下载")
f.write(part_data)
os.remove(part_file_name)
break
log(f"线程数: {args.thread}")
if not (os.path.exists(file_name) and calc_sha1(read_in_chunks(file_name), hexdigest=True) == meta_data['sha1']):
for index, block in enumerate(meta_data['block']):
if len(thread_pool) >= args.thread:
done_flag.acquire()
if not terminate_flag.is_set():
thread_pool.append(threading.Thread(target=core, args=(index, block)))
thread_pool[-1].start()
else:
log(f"分块{index}校验出错")
os.remove(part_file_name)
return None
log(f"{file_name}下载完毕, 耗时{int(time.time() - start_time)}秒")
md5 = calc_md5(read_in_chunks(file_name), hexdigest=True)
log(f"MD5: {md5}")
if md5 == meta_data['md5']:
log(f"{file_name}校验通过")
return file_name
log("已终止下载, 等待线程回收")
for thread in thread_pool:
thread.join()
if terminate_flag.is_set():
return None
with open(file_name, "wb") as f:
for index in range(len(meta_data['block'])):
block_file_name = block_file_name_dict[index]
f.write(image_load(block_file_name))
os.remove(block_file_name)
sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True)
log(f"SHA-1: {sha1}")
if sha1 == meta_data['sha1']:
log(f"{file_name}校验通过")
log(f"{file_name}下载完毕, 耗时{int(time.time() - start_time)}秒")
return file_name
else:
log(f"{file_name}校验未通过, SHA-1与元数据中的记录{meta_data['sha1']}不匹配")
return None
else:
log(f"{file_name}校验出错, MD5与元数据中的记录{meta_data['md5']}不匹配")
return None
log(f"{file_name}已存在于本地")
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="BiliDrive", description="Bilibili Drive", epilog="By Hsury, 2019/10/23")
parser = argparse.ArgumentParser(prog="BiliDrive", description="Bilibili Drive", epilog="By Hsury, 2019/10/24")
parser.add_argument("-c", "--cookies-file", default="cookies.json", help="cookies json file name")
subparsers = parser.add_subparsers()
......@@ -224,17 +296,19 @@ if __name__ == "__main__":
login_parser.set_defaults(func=login_handle)
info_parser = subparsers.add_parser("info", help="get meta info")
info_parser.add_argument("url", help="meta url")
info_parser.add_argument("meta", help="meta url")
info_parser.set_defaults(func=info_handle)
upload_parser = subparsers.add_parser("upload", help="upload a file")
upload_parser.add_argument("file", help="file name")
upload_parser.add_argument("-b", "--block-size", default=1, type=int, help="block size in MB")
upload_parser.add_argument("-b", "--block-size", default=4, type=int, help="block size in MB")
upload_parser.add_argument("-t", "--thread", default=2, type=int, help="thread number")
upload_parser.set_defaults(func=upload_handle)
download_parser = subparsers.add_parser("download", help="download a file")
download_parser.add_argument("url", help="meta url")
download_parser.add_argument("save_as", nargs="?", default="", help="save as file name")
download_parser.add_argument("meta", help="meta url")
download_parser.add_argument("file", nargs="?", default="", help="save as file name")
download_parser.add_argument("-t", "--thread", default=4, type=int, help="thread number")
download_parser.set_defaults(func=download_handle)
args = parser.parse_args()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册