diff --git a/BiliDriveEx/bilibili.py b/BiliDriveEx/bilibili.py index 0116b1c0a787de75ed2856f39a5c3b2400170c34..227bfd8528bdce53a67fe9cddd3bad28bb8485e8 100644 --- a/BiliDriveEx/bilibili.py +++ b/BiliDriveEx/bilibili.py @@ -37,11 +37,14 @@ class Bilibili: def _solve_captcha(self, image): url = "https://bili.dev:2233/captcha" payload = {'image': base64.b64encode(image).decode("utf-8")} - response = request_retry("post", url, - headers=Bilibili.default_hdrs, - json=payload - ).json() - return response['message'] if response and response.get("code") == 0 else None + try: + j = request_retry("post", url, + headers=Bilibili.default_hdrs, + json=payload + ).json() + except: + return None + return j['message'] if j['code'] == 0 else None @staticmethod def calc_sign(param): @@ -51,29 +54,37 @@ class Bilibili: return sign_hash.hexdigest() - def get_key(self): + def _get_key(self): url = f"https://passport.bilibili.com/api/oauth2/getKey" payload = { 'appkey': Bilibili.app_key, 'sign': self.calc_sign(f"appkey={Bilibili.app_key}"), } - r = request_retry("post", url, data=payload, - headers=Bilibili.default_hdrs, - cookies=self.cookies, retry=999999 - ) + try: + r = request_retry("post", url, data=payload, + headers=Bilibili.default_hdrs, + cookies=self.cookies, retry=999999 + ) + except: + return None for k, v in r.cookies.items(): self.cookies[k] = v j = r.json() - if j and j['code'] == 0: + if j['code'] == 0: return { 'key_hash': j['data']['hash'], 'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(j['data']['key'].encode()), } + else: + return None - def login_once(self, username, password, captcha=None): - key = self.get_key() + def _login_once(self, username, password, captcha=None): + key = self._get_key() + if not key: return {'code': 114514, 'message': 'key 获取失败'} key_hash, pub_key = key['key_hash'], key['pub_key'] + username = parse.quote_plus(username) password = parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{password}'.encode(), pub_key))) + url = f"https://passport.bilibili.com/api/v2/oauth2/login" param = f"appkey={Bilibili.app_key}" if captcha: param += f'&captcha={captcha}' @@ -81,36 +92,46 @@ class Bilibili: payload = f"{param}&sign={self.calc_sign(param)}" headers = Bilibili.default_hdrs.copy() headers.update({'Content-type': "application/x-www-form-urlencoded"}) - j = request_retry("POST", url, data=payload, - headers=headers, - cookies=self.cookies - ).json() + + try: + j = request_retry("POST", url, data=payload, + headers=headers, + cookies=self.cookies + ).json() + except Exception as ex: + return {'code': 114514, 'message': str(ex)} return j - def get_captcha(self): + def _get_captcha(self): url = f"https://passport.bilibili.com/captcha" - data = request_retry('GET', url, - headers=Bilibili.default_hdrs, - cookies=self.cookies - ).content - return data + try: + img = request_retry('GET', url, + headers=Bilibili.default_hdrs, + cookies=self.cookies + ).content + except: + return None + return img # 登录 def login(self, username, password): captcha = None while True: - response = self.login_once(username, password, captcha) - # print(response, self.cookies) + j = self._login_once(username, password, captcha) - if not response or 'code' not in response: + if 'code' not in j: log(f"当前IP登录过于频繁, 1分钟后重试") time.sleep(60) continue - if response['code'] == -105: - response = self.get_captcha() - captcha = self._solve_captcha(response) + if j['code'] == -105: + img = self._get_captcha() + if not img: + log(f"验证码获取失败") + time.sleep(1) + continue + captcha = self._solve_captcha(img) if captcha: log(f"登录验证码识别结果: {captcha}") else: @@ -118,36 +139,40 @@ class Bilibili: time.sleep(10) continue - if response['code'] == -449: + if j['code'] == -449: time.sleep(1) continue - if response['code'] == 0 and response['data']['status'] == 0: + if j['code'] == 0 and j['data']['status'] == 0: self.cookies = {} - for cookie in response['data']['cookie_info']['cookies']: + for cookie in j['data']['cookie_info']['cookies']: self.cookies[cookie['name']] = cookie['value'] log("登录成功") self.save_cookies() return True - log(f"登录失败 {response}") + log(f"登录失败 {j['message']}") return False # 获取用户信息 def get_user_info(self, fmt=True): url = f"https://api.bilibili.com/x/space/myinfo" - headers = { + headers = Bilibili.default_hdrs.copy() + headers.update({ 'Referer': f"https://space.bilibili.com", - } - response = request_retry("get", url, - headers=headers, - cookies=self.cookies - ).json() + }) - if not response or response.get("code") != 0: + try: + j = request_retry("get", url, + headers=headers, + cookies=self.cookies + ).json() + except: return + if j['code'] != 0: return + info = { 'ban': False, 'coins': 0, @@ -160,14 +185,14 @@ class Bilibili: 'nickname': "", 'uid': 0, } - info['ban'] = bool(response['data']['silence']) - info['coins'] = response['data']['coins'] - info['experience']['current'] = response['data']['level_exp']['current_exp'] - info['experience']['next'] = response['data']['level_exp']['next_exp'] - info['face'] = response['data']['face'] - info['level'] = response['data']['level'] - info['nickname'] = response['data']['name'] - info['uid'] = response['data']['mid'] + info['ban'] = bool(j['data']['silence']) + info['coins'] = j['data']['coins'] + info['experience']['current'] = j['data']['level_exp']['current_exp'] + info['experience']['next'] = j['data']['level_exp']['next_exp'] + info['face'] = j['data']['face'] + info['level'] = j['data']['level'] + info['nickname'] = j['data']['name'] + info['uid'] = j['data']['mid'] if fmt: return f"{info['nickname']}(UID={info['uid']}), Lv.{info['level']}({info['experience']['current']}/{info['experience']['next']}), 拥有{info['coins']}枚硬币, 账号{'状态正常' if not info['ban'] else '被封禁'}" @@ -186,17 +211,12 @@ class Bilibili: pass def exist(self, sha1): + url = self.default_url(sha1) try: - url = self.default_url(sha1) - headers = { - 'Referer': "http://t.bilibili.com/", - 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36", - } - res = request_retry('HEAD', url, headers=headers, timeout=10) - return url if res.status_code == 200 else None + r = request_retry('HEAD', url, headers=Bilibili.default_hdrs) except: return - + return url if r.status_code == 200 else None def image_upload(self, data): sha1 = calc_sha1(data) @@ -204,11 +224,11 @@ class Bilibili: if url: return {'code': 0, 'data': {'image_url': url}} url = "https://api.vc.bilibili.com/api/v1/drawImage/upload" - headers = { + headers = Bilibili.default_hdrs.copy() + headers.update({ 'Origin': "https://t.bilibili.com", 'Referer': "https://t.bilibili.com/", - 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36", - } + }) files = { 'file_up': (f"{int(time.time() * 1000)}.png", data), } @@ -217,11 +237,14 @@ class Bilibili: 'category': "daily", } try: - response = requests.post(url, data=data, + r = requests.post(url, data=data, headers=headers, cookies=self.cookies, files=files, timeout=300 - ).json() - except: - response = None - return response \ No newline at end of file + ) + except Exception as ex: + return {'code': 114514, 'message': str(ex)} + + if r.status_code != 200: + return {'code': r.status_code, 'message': r.text} + return r.json() \ No newline at end of file