From 58cb150692087384269c0fcb402f3a6a5b38b133 Mon Sep 17 00:00:00 2001 From: wizardforcel <562826179@qq.com> Date: Fri, 6 Mar 2020 10:41:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BB=BA=E7=AB=8B=20util=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- BiliDriveEx/__main__.py | 50 +------------------------------ BiliDriveEx/bilibili.py | 19 ++++++------ BiliDriveEx/util.py | 65 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 59 deletions(-) create mode 100644 BiliDriveEx/util.py diff --git a/BiliDriveEx/__main__.py b/BiliDriveEx/__main__.py index a569bb5..01dda49 100644 --- a/BiliDriveEx/__main__.py +++ b/BiliDriveEx/__main__.py @@ -19,24 +19,12 @@ import types from BiliDriveEx import __version__ from BiliDriveEx.bilibili import Bilibili from BiliDriveEx.encoder import Encoder +from BiliDriveEx.util import * -log = Bilibili._log encoder = Encoder() -bundle_dir = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__)) - default_url = lambda sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.png" meta_string = lambda url: ("bdex://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$", url) else url -size_string = lambda byte: f"{byte / 1024 / 1024 / 1024:.2f} GB" if byte > 1024 * 1024 * 1024 else f"{byte / 1024 / 1024:.2f} MB" if byte > 1024 * 1024 else f"{byte / 1024:.2f} KB" if byte > 1024 else f"{int(byte)} B" - -def calc_sha1(data, hexdigest=False): - sha1 = hashlib.sha1() - if isinstance(data, types.GeneratorType): - for chunk in data: - sha1.update(chunk) - else: - sha1.update(data) - return sha1.hexdigest() if hexdigest else sha1.digest() def fetch_meta(s): if re.match(r"^bdex://[a-fA-F0-9]{40}$", s): @@ -75,42 +63,6 @@ def image_upload(data, cookies): response = None return response -def image_download(url): - headers = { - 'Referer': "http://t.bilibili.com/", - 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36", - } - content = [] - last_chunk_time = None - try: - for chunk in requests.get(url, headers=headers, timeout=10, stream=True).iter_content(128 * 1024): - if last_chunk_time is not None and time.time() - last_chunk_time > 5: - return None - content.append(chunk) - last_chunk_time = time.time() - return b"".join(content) - except: - return None - -def read_history(): - try: - with open(os.path.join(bundle_dir, "history.json"), "r", encoding="utf-8") as f: - history = json.loads(f.read()) - except: - history = {} - return history - -def read_in_chunk(file_name, chunk_size=16 * 1024 * 1024, chunk_number=-1): - chunk_counter = 0 - with open(file_name, "rb") as f: - while True: - data = f.read(chunk_size) - if data != b"" and (chunk_number == -1 or chunk_counter < chunk_number): - yield data - chunk_counter += 1 - else: - return - def login_handle(args): bilibili = Bilibili() if bilibili.login(username=args.username, password=args.password): diff --git a/BiliDriveEx/bilibili.py b/BiliDriveEx/bilibili.py index e8e9bec..f858ed5 100644 --- a/BiliDriveEx/bilibili.py +++ b/BiliDriveEx/bilibili.py @@ -8,6 +8,7 @@ import requests import rsa import time from urllib import parse +from BiliDriveEx.util import * class Bilibili: app_key = "1d8b6e7d45233436" @@ -31,9 +32,7 @@ class Bilibili: 'nickname': "", } - @staticmethod - def _log(message): - print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}") + def _requests(self, method, url, decode_level=2, retry=0, timeout=10, **kwargs): if method in ["get", "post"]: @@ -95,7 +94,7 @@ class Bilibili: response = self._requests("get", url, headers=headers, decode_level=1) captcha = self._solve_captcha(response) if captcha: - self._log(f"登录验证码识别结果: {captcha}") + log(f"登录验证码识别结果: {captcha}") key = get_key() key_hash, pub_key = key['key_hash'], key['pub_key'] url = f"https://passport.bilibili.com/api/v2/oauth2/login" @@ -104,7 +103,7 @@ class Bilibili: headers = {'Content-type': "application/x-www-form-urlencoded"} response = self._requests("post", url, data=payload, headers=headers) else: - self._log(f"登录验证码识别服务暂时不可用, 10秒后重试") + log(f"登录验证码识别服务暂时不可用, 10秒后重试") time.sleep(10) break elif response['code'] == -449: @@ -113,13 +112,13 @@ class Bilibili: elif response['code'] == 0 and response['data']['status'] == 0: for cookie in response['data']['cookie_info']['cookies']: self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com") - self._log("登录成功") + log("登录成功") return True else: - self._log(f"登录失败 {response}") + log(f"登录失败 {response}") return False else: - self._log(f"当前IP登录过于频繁, 1分钟后重试") + log(f"当前IP登录过于频繁, 1分钟后重试") time.sleep(60) break @@ -139,8 +138,8 @@ class Bilibili: self.info['face'] = response['data']['face'] self.info['level'] = response['data']['level'] self.info['nickname'] = response['data']['name'] - self._log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}") + log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}") return True else: - self._log("用户信息获取失败") + log("用户信息获取失败") return False diff --git a/BiliDriveEx/util.py b/BiliDriveEx/util.py new file mode 100644 index 0000000..c7f332a --- /dev/null +++ b/BiliDriveEx/util.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- + +import os +import sys +from os import path +import hashlib +import types +import requests +import json +import time + +bundle_dir = path.dirname(sys.executable) if getattr(sys, "frozen", False) else path.dirname(path.abspath(__file__)) + +size_string = lambda byte: f"{byte / 1024 / 1024 / 1024:.2f} GB" if byte > 1024 * 1024 * 1024 else f"{byte / 1024 / 1024:.2f} MB" if byte > 1024 * 1024 else f"{byte / 1024:.2f} KB" if byte > 1024 else f"{int(byte)} B" + +def calc_sha1(data, hexdigest=False): + sha1 = hashlib.sha1() + if isinstance(data, types.GeneratorType): + for chunk in data: + sha1.update(chunk) + else: + sha1.update(data) + return sha1.hexdigest() if hexdigest else sha1.digest() + + +def image_download(url): + headers = { + 'Referer': "http://t.bilibili.com/", + 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36", + } + content = [] + last_chunk_time = None + try: + for chunk in requests.get(url, headers=headers, timeout=10, stream=True).iter_content(128 * 1024): + if last_chunk_time is not None and time.time() - last_chunk_time > 5: + return None + content.append(chunk) + last_chunk_time = time.time() + return b"".join(content) + except: + return None + + +def read_history(): + try: + with open(path.join(bundle_dir, "history.json"), "r", encoding="utf-8") as f: + history = json.loads(f.read()) + except: + history = {} + return history + + +def read_in_chunk(file_name, chunk_size=4 * 1024 * 1024, chunk_number=-1): + chunk_counter = 0 + with open(file_name, "rb") as f: + while True: + data = f.read(chunk_size) + if data != b"" and (chunk_number == -1 or chunk_counter < chunk_number): + yield data + chunk_counter += 1 + else: + return + +def log(message): + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}") \ No newline at end of file -- GitLab