提交 58cb1506 编写于 作者: W wizardforcel

建立 util 模块

上级 346a1041
......@@ -19,24 +19,12 @@ import types
from BiliDriveEx import __version__
from BiliDriveEx.bilibili import Bilibili
from BiliDriveEx.encoder import Encoder
from BiliDriveEx.util import *
log = Bilibili._log
encoder = Encoder()
bundle_dir = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
default_url = lambda sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.png"
meta_string = lambda url: ("bdex://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$", url) else url
size_string = lambda byte: f"{byte / 1024 / 1024 / 1024:.2f} GB" if byte > 1024 * 1024 * 1024 else f"{byte / 1024 / 1024:.2f} MB" if byte > 1024 * 1024 else f"{byte / 1024:.2f} KB" if byte > 1024 else f"{int(byte)} B"
def calc_sha1(data, hexdigest=False):
sha1 = hashlib.sha1()
if isinstance(data, types.GeneratorType):
for chunk in data:
sha1.update(chunk)
else:
sha1.update(data)
return sha1.hexdigest() if hexdigest else sha1.digest()
def fetch_meta(s):
if re.match(r"^bdex://[a-fA-F0-9]{40}$", s):
......@@ -75,42 +63,6 @@ def image_upload(data, cookies):
response = None
return response
def image_download(url):
headers = {
'Referer': "http://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
content = []
last_chunk_time = None
try:
for chunk in requests.get(url, headers=headers, timeout=10, stream=True).iter_content(128 * 1024):
if last_chunk_time is not None and time.time() - last_chunk_time > 5:
return None
content.append(chunk)
last_chunk_time = time.time()
return b"".join(content)
except:
return None
def read_history():
try:
with open(os.path.join(bundle_dir, "history.json"), "r", encoding="utf-8") as f:
history = json.loads(f.read())
except:
history = {}
return history
def read_in_chunk(file_name, chunk_size=16 * 1024 * 1024, chunk_number=-1):
chunk_counter = 0
with open(file_name, "rb") as f:
while True:
data = f.read(chunk_size)
if data != b"" and (chunk_number == -1 or chunk_counter < chunk_number):
yield data
chunk_counter += 1
else:
return
def login_handle(args):
bilibili = Bilibili()
if bilibili.login(username=args.username, password=args.password):
......
......@@ -8,6 +8,7 @@ import requests
import rsa
import time
from urllib import parse
from BiliDriveEx.util import *
class Bilibili:
app_key = "1d8b6e7d45233436"
......@@ -31,9 +32,7 @@ class Bilibili:
'nickname': "",
}
@staticmethod
def _log(message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}")
def _requests(self, method, url, decode_level=2, retry=0, timeout=10, **kwargs):
if method in ["get", "post"]:
......@@ -95,7 +94,7 @@ class Bilibili:
response = self._requests("get", url, headers=headers, decode_level=1)
captcha = self._solve_captcha(response)
if captcha:
self._log(f"登录验证码识别结果: {captcha}")
log(f"登录验证码识别结果: {captcha}")
key = get_key()
key_hash, pub_key = key['key_hash'], key['pub_key']
url = f"https://passport.bilibili.com/api/v2/oauth2/login"
......@@ -104,7 +103,7 @@ class Bilibili:
headers = {'Content-type': "application/x-www-form-urlencoded"}
response = self._requests("post", url, data=payload, headers=headers)
else:
self._log(f"登录验证码识别服务暂时不可用, 10秒后重试")
log(f"登录验证码识别服务暂时不可用, 10秒后重试")
time.sleep(10)
break
elif response['code'] == -449:
......@@ -113,13 +112,13 @@ class Bilibili:
elif response['code'] == 0 and response['data']['status'] == 0:
for cookie in response['data']['cookie_info']['cookies']:
self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
self._log("登录成功")
log("登录成功")
return True
else:
self._log(f"登录失败 {response}")
log(f"登录失败 {response}")
return False
else:
self._log(f"当前IP登录过于频繁, 1分钟后重试")
log(f"当前IP登录过于频繁, 1分钟后重试")
time.sleep(60)
break
......@@ -139,8 +138,8 @@ class Bilibili:
self.info['face'] = response['data']['face']
self.info['level'] = response['data']['level']
self.info['nickname'] = response['data']['name']
self._log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}")
log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}")
return True
else:
self._log("用户信息获取失败")
log("用户信息获取失败")
return False
# -*- coding: utf-8 -*-
import os
import sys
from os import path
import hashlib
import types
import requests
import json
import time
bundle_dir = path.dirname(sys.executable) if getattr(sys, "frozen", False) else path.dirname(path.abspath(__file__))
size_string = lambda byte: f"{byte / 1024 / 1024 / 1024:.2f} GB" if byte > 1024 * 1024 * 1024 else f"{byte / 1024 / 1024:.2f} MB" if byte > 1024 * 1024 else f"{byte / 1024:.2f} KB" if byte > 1024 else f"{int(byte)} B"
def calc_sha1(data, hexdigest=False):
sha1 = hashlib.sha1()
if isinstance(data, types.GeneratorType):
for chunk in data:
sha1.update(chunk)
else:
sha1.update(data)
return sha1.hexdigest() if hexdigest else sha1.digest()
def image_download(url):
headers = {
'Referer': "http://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
content = []
last_chunk_time = None
try:
for chunk in requests.get(url, headers=headers, timeout=10, stream=True).iter_content(128 * 1024):
if last_chunk_time is not None and time.time() - last_chunk_time > 5:
return None
content.append(chunk)
last_chunk_time = time.time()
return b"".join(content)
except:
return None
def read_history():
try:
with open(path.join(bundle_dir, "history.json"), "r", encoding="utf-8") as f:
history = json.loads(f.read())
except:
history = {}
return history
def read_in_chunk(file_name, chunk_size=4 * 1024 * 1024, chunk_number=-1):
chunk_counter = 0
with open(file_name, "rb") as f:
while True:
data = f.read(chunk_size)
if data != b"" and (chunk_number == -1 or chunk_counter < chunk_number):
yield data
chunk_counter += 1
else:
return
def log(message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册