提交 43f05463 编写于 作者: W wizardforcel

优化登录逻辑

上级 8fd0bd3e
......@@ -44,7 +44,9 @@ def fetch_meta(s):
def login_handle(args):
if api.login(username=args.username, password=args.password):
api.get_user_info()
info = api.get_user_info()
if info: log_info(info)
else: log("用户信息获取失败")
with open(os.path.join(bundle_dir, "cookies.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(api.get_cookies(), ensure_ascii=False, indent=2))
......
......@@ -19,26 +19,13 @@ class Bilibili:
default_url = lambda self, sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.png"
meta_string = lambda self, url: ("bdex://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$", url) else url
get_cookies = lambda self: self._session.cookies.get_dict(domain=".bilibili.com")
get_cookies = lambda self: self.cookies
get_uid = lambda self: self.get_cookies().get("DedeUserID", "")
def __init__(self):
self.cookie = {}
self.cookies = {}
self._session = requests.Session()
self._session.headers.update({'User-Agent': "Mozilla/5.0 BiliDroid/5.51.1 (bbcallen@gmail.com)"})
self.info = {
'ban': False,
'coins': 0,
'experience': {
'current': 0,
'next': 0,
},
'face': "",
'level': 0,
'nickname': "",
}
def _requests(self, method, url, decode_level=0, retry=0, timeout=10, **kwargs):
for _ in range(retry + 1):
......@@ -51,7 +38,10 @@ class Bilibili:
def _solve_captcha(self, image):
url = "https://bili.dev:2233/captcha"
payload = {'image': base64.b64encode(image).decode("utf-8")}
response = self._requests("post", url, json=payload).json()
response = request_retry("post", url,
headers=Bilibili.default_hdrs,
json=payload
).json()
return response['message'] if response and response.get("code") == 0 else None
@staticmethod
......@@ -68,11 +58,16 @@ class Bilibili:
'appkey': Bilibili.app_key,
'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
}
res = self._requests("post", url, data=payload, retry=999999).json()
if res and res['code'] == 0:
r = request_retry("post", url, data=payload,
headers=Bilibili.default_hdrs,
cookies=self.cookies, retry=999999
)
for k, v in r.cookies.items(): self.cookies[k] = v
j = r.json()
if j and j['code'] == 0:
return {
'key_hash': res['data']['hash'],
'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(res['data']['key'].encode()),
'key_hash': j['data']['hash'],
'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(j['data']['key'].encode()),
}
def login_once(self, username, password, captcha=None):
......@@ -87,14 +82,19 @@ class Bilibili:
payload = f"{param}&sign={self.calc_sign(param)}"
headers = Bilibili.default_hdrs.copy()
headers.update({'Content-type': "application/x-www-form-urlencoded"})
res = self._requests("POST", url, data=payload, headers=headers).json()
return res
j = request_retry("POST", url, data=payload,
headers=headers,
cookies=self.cookies
).json()
return j
def get_captcha(self):
url = f"https://passport.bilibili.com/captcha"
headers = {'Host': "passport.bilibili.com"}
res = self._requests('GET', url, headers=headers).content
return res
data = request_retry('GET', url,
headers=Bilibili.default_hdrs,
cookies=self.cookies
).content
return data
# 登录
def login(self, username, password):
......@@ -102,6 +102,7 @@ class Bilibili:
captcha = None
while True:
response = self.login_once(username, password, captcha)
print(response, self.cookies)
if not response or 'code' not in response:
log(f"当前IP登录过于频繁, 1分钟后重试")
......@@ -124,7 +125,7 @@ class Bilibili:
if response['code'] == 0 and response['data']['status'] == 0:
for cookie in response['data']['cookie_info']['cookies']:
self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
self.cookies[cookie['name']] = cookie['value']
log("登录成功")
return True
......@@ -139,20 +140,37 @@ class Bilibili:
'Host': "api.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
}
response = self._requests("get", url, headers=headers).json()
if response and response.get("code") == 0:
self.info['ban'] = bool(response['data']['silence'])
self.info['coins'] = response['data']['coins']
self.info['experience']['current'] = response['data']['level_exp']['current_exp']
self.info['experience']['next'] = response['data']['level_exp']['next_exp']
self.info['face'] = response['data']['face']
self.info['level'] = response['data']['level']
self.info['nickname'] = response['data']['name']
log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}")
return True
else:
log("用户信息获取失败")
response = request_retry("get", url,
headers=headers,
cookies=self.cookies
).json()
if not response or response.get("code") != 0:
return False
print(response)
info = {
'ban': False,
'coins': 0,
'experience': {
'current': 0,
'next': 0,
},
'face': "",
'level': 0,
'nickname': "",
'uid': 0,
}
info['ban'] = bool(response['data']['silence'])
info['coins'] = response['data']['coins']
info['experience']['current'] = response['data']['level_exp']['current_exp']
info['experience']['next'] = response['data']['level_exp']['next_exp']
info['face'] = response['data']['face']
info['level'] = response['data']['level']
info['nickname'] = response['data']['name']
info['uid'] = response['data']['mid']
return info
def exist(self, sha1):
......
......@@ -79,4 +79,7 @@ def request_retry(method, url, retry=5, **kwargs):
if i == retry - 1: raise ex
get_retry = lambda url, retry=5, **kwargs: request_retry('GET', url, retry, **kwargs)
post_retry = lambda url, retry=5, **kwargs: request_retry('POST', url, retry, **kwargs)
\ No newline at end of file
post_retry = lambda url, retry=5, **kwargs: request_retry('POST', url, retry, **kwargs)
def log_info(info):
log(f"{info['nickname']}(UID={info['uid']}), Lv.{info['level']}({info['experience']['current']}/{info['experience']['next']}), 拥有{info['coins']}枚硬币, 账号{'状态正常' if not info['ban'] else '被封禁'}")
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册