#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-

import base64
import hashlib
import random
import requests
import rsa
import time
from urllib import parse
# Project-local helpers; log() used below is presumably provided by this star import.
from BiliDriveEx.util import *

class Bilibili:
    """Small Bilibili account client: password login and profile lookup.

    All HTTP traffic goes through one ``requests.Session`` so that cookies
    stored by a successful ``login`` are reused by later calls.

    Attributes:
        app_key: Application key sent with (and signed into) passport requests.
        username: Account name used by the most recent ``login`` call.
        password: Plain-text password used by the most recent ``login`` call.
        info: Profile snapshot filled in by ``get_user_info``.
    """

    app_key = "1d8b6e7d45233436"

    _KEY_URL = "https://passport.bilibili.com/api/oauth2/getKey"
    _LOGIN_URL = "https://passport.bilibili.com/api/v2/oauth2/login"
    _CAPTCHA_URL = "https://passport.bilibili.com/captcha"

    def __init__(self):
        """Create the HTTP session and reset credentials and profile info."""
        self._session = requests.Session()
        self._session.headers.update({'User-Agent': "Mozilla/5.0 BiliDroid/5.51.1 (bbcallen@gmail.com)"})
        self.username = ""
        self.password = ""
        self.info = {
            'ban': False,
            'coins': 0,
            'experience': {
                'current': 0,
                'next': 0,
            },
            'face': "",
            'level': 0,
            'nickname': "",
        }

    def get_cookies(self):
        """Return the session cookies stored for ``.bilibili.com`` as a dict."""
        return self._session.cookies.get_dict(domain=".bilibili.com")

    def get_uid(self):
        """Return the ``DedeUserID`` cookie value, or ``""`` when it is absent."""
        return self.get_cookies().get("DedeUserID", "")

    def _requests(self, method, url, decode_level=2, retry=0, timeout=10, **kwargs):
        """Send a GET or POST through the session, best-effort.

        Args:
            method: ``"get"`` or ``"post"``; any other value yields ``None``.
            url: Target URL.
            decode_level: ``2`` returns the parsed JSON body, ``1`` the raw
                body bytes, anything else the response object itself.
            retry: Number of additional attempts after the first failure.
            timeout: Per-request timeout passed to the session call.
            **kwargs: Forwarded unchanged to the session call.

        Returns:
            The decoded result, or ``None`` when every attempt failed or the
            method is unsupported.
        """
        if method not in ("get", "post"):
            return None
        send = getattr(self._session, method)
        for _ in range(retry + 1):
            try:
                response = send(url, timeout=timeout, **kwargs)
                if decode_level == 2:
                    return response.json()
                if decode_level == 1:
                    return response.content
                return response
            except Exception:
                # Network and decoding errors stay best-effort. Unlike a bare
                # ``except``, KeyboardInterrupt/SystemExit are not swallowed,
                # so the user can still abort the retry loops.
                continue
        return None

    def _solve_captcha(self, image):
        """Send captcha image bytes to the recognition service.

        Args:
            image: Raw image bytes, or ``None``/empty when the download failed.

        Returns:
            The service's ``message`` field when it answers with ``code == 0``
            (presumably the recognised text), otherwise ``None``.
        """
        if not image:
            # The captcha download failed; there is nothing to encode.
            return None
        url = "https://bili.dev:2233/captcha"
        payload = {'image': base64.b64encode(image).decode("utf-8")}
        response = self._requests("post", url, json=payload)
        if response and response.get("code") == 0:
            return response['message']
        return None

    @staticmethod
    def calc_sign(param):
        """Return the hex MD5 digest of ``param`` followed by the fixed salt."""
        salt = "560c52ccd288fed045859ed18bffd973"
        sign_hash = hashlib.md5()
        sign_hash.update(f"{param}{salt}".encode())
        return sign_hash.hexdigest()

    def _get_key(self):
        """Fetch the login hash and RSA public key, retrying every second.

        Returns:
            A ``(key_hash, pub_key)`` tuple taken from the ``getKey`` response.
        """
        payload = {
            'appkey': Bilibili.app_key,
            'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
        }
        while True:
            response = self._requests("post", self._KEY_URL, data=payload)
            if response and response.get("code") == 0:
                key_hash = response['data']['hash']
                pub_key = rsa.PublicKey.load_pkcs1_openssl_pem(response['data']['key'].encode())
                return key_hash, pub_key
            time.sleep(1)

    def _build_login_payload(self, captcha=None):
        """Build the signed, form-encoded login body using a fresh key.

        Args:
            captcha: Captcha text to include, or ``None`` for a plain attempt.

        Returns:
            The parameter string with ``&sign=...`` appended.
        """
        key_hash, pub_key = self._get_key()
        encrypted = rsa.encrypt(f"{key_hash}{self.password}".encode(), pub_key)
        # Field order is part of the signed string, so it is kept fixed:
        # appkey, [captcha], password, username.
        fields = [f"appkey={Bilibili.app_key}"]
        if captcha:
            fields.append(f"captcha={captcha}")
        fields.append(f"password={parse.quote_plus(base64.b64encode(encrypted))}")
        fields.append(f"username={parse.quote_plus(self.username)}")
        param = "&".join(fields)
        return f"{param}&sign={self.calc_sign(param)}"

    # Log in
    def login(self, username, password):
        """Log in with a username and password and store the session cookies.

        Keeps retrying while the server asks for a captcha (code -105), asks
        for a retry (code -449), or gives no usable response.

        Args:
            username: Account name.
            password: Plain-text password; it is RSA-encrypted before sending.

        Returns:
            ``True`` once the cookies are stored, ``False`` when the server
            answers with any other result.
        """
        self.username = username
        self.password = password
        form_headers = {'Content-type': "application/x-www-form-urlencoded"}

        while True:
            payload = self._build_login_payload()
            response = self._requests("post", self._LOGIN_URL, data=payload, headers=form_headers)
            while True:
                if not response or response.get("code") is None:
                    log(f"当前IP登录过于频繁, 1分钟后重试")
                    time.sleep(60)
                    break  # start over with a fresh key

                code = response['code']
                if code == -105:
                    # Captcha required: download it, solve it, retry with it.
                    image = self._requests(
                        "get",
                        self._CAPTCHA_URL,
                        headers={'Host': "passport.bilibili.com"},
                        decode_level=1,
                    )
                    captcha = self._solve_captcha(image)
                    if not captcha:
                        log(f"登录验证码识别服务暂时不可用, 10秒后重试")
                        time.sleep(10)
                        break  # start over with a fresh key
                    log(f"登录验证码识别结果: {captcha}")
                    payload = self._build_login_payload(captcha)
                    response = self._requests("post", self._LOGIN_URL, data=payload, headers=form_headers)
                elif code == -449:
                    # Resend the same request after a short pause.
                    time.sleep(1)
                    response = self._requests("post", self._LOGIN_URL, data=payload, headers=form_headers)
                elif code == 0 and (response.get('data') or {}).get('status') == 0:
                    for cookie in response['data']['cookie_info']['cookies']:
                        self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
                    log("登录成功")
                    return True
                else:
                    log(f"登录失败 {response}")
                    return False

    # Fetch user info
    def get_user_info(self):
        """Fetch the logged-in user's profile into ``self.info`` and log it.

        Returns:
            ``True`` when the API answers with ``code == 0``, else ``False``.
        """
        url = "https://api.bilibili.com/x/space/myinfo?jsonp=jsonp"
        headers = {
            'Host': "api.bilibili.com",
            'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
        }
        response = self._requests("get", url, headers=headers)
        if not response or response.get("code") != 0:
            log("用户信息获取失败")
            return False

        data = response['data']
        self.info['ban'] = bool(data['silence'])
        self.info['coins'] = data['coins']
        self.info['experience']['current'] = data['level_exp']['current_exp']
        self.info['experience']['next'] = data['level_exp']['next_exp']
        self.info['face'] = data['face']
        self.info['level'] = data['level']
        self.info['nickname'] = data['name']
        log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}")
        return True