diff --git a/index.py b/index.py index 6b38aae2d0a855f99cf8c96073e10e05fa903c88..e33941a78b490f9c04b35b176a640a8f528aa25e 100644 --- a/index.py +++ b/index.py @@ -1,284 +1,284 @@ - #coding:utf-8 - ''' - @author: ZainCheung - @LastEditors: ZainCheung - @description:网易云音乐全自动每日打卡云函数版 - @Date: 2020-06-25 14:28:48 - @LastEditTime: 2020-09-01 18:20:00 - ''' - from configparser import ConfigParser - from threading import Timer - import requests - import random - import hashlib - import datetime - import time - import json - import logging - import math - - logger = logging.getLogger() - grade = [10,40,70,130,200,400,1000,3000,8000,20000] - api = '' - - class Task(object): - - ''' - 对象的构造函数 - ''' - def __init__(self, uin, pwd, sckey, countrycode=86): - self.uin = uin - self.pwd = pwd - self.countrycode = countrycode - self.sckey = sckey - - ''' - 带上用户的cookie去发送数据 - url:完整的URL路径 - postJson:要以post方式发送的数据 - 返回response - ''' - def getResponse(self, url, postJson): - response = requests.post(url, data=postJson, headers={'Content-Type':'application/x-www-form-urlencoded'},cookies=self.cookies) - return response +#coding:utf-8 +''' +@author: ZainCheung +@LastEditors: ZainCheung +@description:网易云音乐全自动每日打卡云函数版 +@Date: 2020-06-25 14:28:48 +@LastEditTime: 2020-09-01 18:20:00 +''' +from configparser import ConfigParser +from threading import Timer +import requests +import random +import hashlib +import datetime +import time +import json +import logging +import math - ''' - 登录 - ''' - def login(self): - data = {"uin":self.uin,"pwd":self.pwd,"countrycode":self.countrycode,"r":random.random()} - if '@' in self.uin: - url = api + '?do=email' - else: - url = api + '?do=login' - response = requests.post(url, data=data, headers={'Content-Type':'application/x-www-form-urlencoded'}) - code = json.loads(response.text)['code'] - self.name = json.loads(response.text)['profile']['nickname'] - self.uid = json.loads(response.text)['account']['id'] - if code==200: - self.error = '' - else: - self.error = '登录失败,请检查账号' - self.cookies = response.cookies.get_dict() - self.log('登录成功') - - ''' - 每日签到 - ''' - def sign(self): - url = api + '?do=sign' - response = self.getResponse(url, {"r":random.random()}) - data = json.loads(response.text) - if data['code'] == 200: - self.log('签到成功') - else: - self.log('重复签到') - - ''' - 每日打卡300首歌 - ''' - def daka(self): - url = api + '?do=daka' - response = self.getResponse(url, {"r":random.random()}) - self.log(response.text) +logger = logging.getLogger() +grade = [10,40,70,130,200,400,1000,3000,8000,20000] +api = '' - ''' - 查询用户详情 - ''' - def detail(self): - url = api + '?do=detail' - data = {"uid":self.uid, "r":random.random()} - response = self.getResponse(url, data) - data = json.loads(response.text) - self.level = data['level'] - self.listenSongs = data['listenSongs'] - self.log('获取用户详情成功') +class Task(object): + + ''' + 对象的构造函数 + ''' + def __init__(self, uin, pwd, sckey, countrycode=86): + self.uin = uin + self.pwd = pwd + self.countrycode = countrycode + self.sckey = sckey - ''' - Server推送 - ''' - def server(self): - if self.sckey == '': - return - url = 'https://sc.ftqq.com/' + self.sckey + '.send' - self.diyText() # 构造发送内容 - response = requests.get(url,params={"text":self.title, "desp":self.content}) - data = json.loads(response.text) - if data['errno'] == 0: - self.log('用户:' + self.name + ' Server酱推送成功') - else: - self.log('用户:' + self.name + ' Server酱推送失败,请检查sckey是否正确') + ''' + 带上用户的cookie去发送数据 + url:完整的URL路径 + postJson:要以post方式发送的数据 + 返回response + ''' + def getResponse(self, url, postJson): + response = requests.post(url, data=postJson, headers={'Content-Type':'application/x-www-form-urlencoded'},cookies=self.cookies) + return response - ''' - 自定义要推送到微信的内容 - title:消息的标题 - content:消息的内容,支持MarkDown格式 - ''' - def diyText(self): - # today = datetime.date.today() - # kaoyan_day = datetime.date(2020,12,21) #2021考研党的末日 - # date = (kaoyan_day - today).days - one = requests.get('https://api.qinor.cn/soup/').text # 每日一句的api - for count in grade: - if self.level < 10: - if self.listenSongs < 20000: - if self.listenSongs < count: - self.tip = '还需听歌' + str(count-self.listenSongs) + '首即可升级' - break - else: - self.tip = '你已经听够20000首歌曲,如果登录天数达到800天即可满级' - else: - self.tip = '恭喜你已经满级!' - if self.error == '': - state = ("- 目前已完成签到\n" - "- 今日共打卡" + str(self.dakanum) + "次\n" - "- 今日共播放" + str(self.dakaSongs) + "首歌\n" - "- 还需要打卡" + str(self.day) +"天") - self.title = ("网易云今日打卡" + str(self.dakaSongs) + "首,已播放" + str(self.listenSongs) + "首") - else: - state = self.error - self.title = '网易云听歌任务出现问题!' - self.content = ( - "------\n" - "#### 账户信息\n" - "- 用户名称:" + str(self.name) + "\n" - "- 当前等级:" + str(self.level) + "级\n" - "- 累计播放:" + str(self.listenSongs) + "首\n" - "- 升级提示:" + self.tip + "\n\n" - "------\n" - "#### 任务状态\n" + str(state) + "\n\n" - "------\n" - "#### 打卡日志\n" + self.dakaSongs_list + "\n\n" - "------\n" - "#### 今日一句\n- " + one + "\n\n") + ''' + 登录 + ''' + def login(self): + data = {"uin":self.uin,"pwd":self.pwd,"countrycode":self.countrycode,"r":random.random()} + if '@' in self.uin: + url = api + '?do=email' + else: + url = api + '?do=login' + response = requests.post(url, data=data, headers={'Content-Type':'application/x-www-form-urlencoded'}) + code = json.loads(response.text)['code'] + self.name = json.loads(response.text)['profile']['nickname'] + self.uid = json.loads(response.text)['account']['id'] + if code==200: + self.error = '' + else: + self.error = '登录失败,请检查账号' + self.cookies = response.cookies.get_dict() + self.log('登录成功') - ''' - 打印日志 - ''' - def log(self, text): - time_stamp = datetime.datetime.now() - print(time_stamp.strftime('%Y.%m.%d-%H:%M:%S') + ' ' + str(text)) - self.time =time_stamp.strftime('%H:%M:%S') - self.list.append("- [" + self.time + "] " + str(text) + "\n\n") + ''' + 每日签到 + ''' + def sign(self): + url = api + '?do=sign' + response = self.getResponse(url, {"r":random.random()}) + data = json.loads(response.text) + if data['code'] == 200: + self.log('签到成功') + else: + self.log('重复签到') - ''' - 开始执行 - ''' - def start(self): - try: - self.list = [] - self.list.append("- 初始化完成\n\n") - self.login() - self.sign() - self.detail() - counter = self.listenSongs - for i in range(1,10): - self.daka() - #self.log('用户:' + self.name + ' 第' + str(i) + '次打卡成功,即将休眠10秒') - self.log('第' + str(i) + '次打卡成功,即将休眠10秒') - time.sleep(10) - self.dakanum = i - self.detail() - self.dakaSongs = self.listenSongs - counter - self.log('今日已打卡播放' + str(self.dakaSongs) + '首') - if self.dakaSongs == 300: - break - if self.listenSongs >= 20000: - self.day = 0 - else: - self.day = math.ceil((20000 - self.listenSongs)/300) - self.list.append("- 打卡结束,消息推送\n\n") - self.dakaSongs_list = ''.join(self.list) - self.server() - except: - self.log('用户任务执行中断,请检查账号密码是否正确') - else: - self.log('用户:' + self.name + ' 今日任务已完成') - - ''' - 初始化:读取配置,配置文件为init.config - 返回字典类型的配置对象 + 每日打卡300首歌 ''' - def init(): - global api # 初始化时设置api - config = ConfigParser() - config.read('init.config', encoding='UTF-8-sig') - uin = config['token']['account'] - pwd = config['token']['password'] - countrycode = config['token']['countrycode'] - api = config['setting']['api'] - md5Switch = config.getboolean('setting','md5Switch') - peopleSwitch = config.getboolean('setting','peopleSwitch') - sckey = config['setting']['sckey'] - logger.info('配置文件读取完毕') - conf = { - 'uin': uin, - 'pwd': pwd, - 'countrycode': countrycode, - 'api': api, - 'md5Switch': md5Switch, - 'peopleSwitch':peopleSwitch, - 'sckey':sckey - } - return conf + def daka(self): + url = api + '?do=daka' + response = self.getResponse(url, {"r":random.random()}) + self.log(response.text) ''' - MD5加密 - str:待加密字符 - 返回加密后的字符 + 查询用户详情 ''' - def md5(str): - hl = hashlib.md5() - hl.update(str.encode(encoding='utf-8')) - return hl.hexdigest() + def detail(self): + url = api + '?do=detail' + data = {"uid":self.uid, "r":random.random()} + response = self.getResponse(url, data) + data = json.loads(response.text) + self.level = data['level'] + self.listenSongs = data['listenSongs'] + self.log('获取用户详情成功') ''' - 加载Json文件 - jsonPath:json文件的名字,例如account.json + Server推送 ''' - def loadJson(jsonPath): - with open(jsonPath,encoding='utf-8') as f: - account = json.load(f) - return account + def server(self): + if self.sckey == '': + return + url = 'https://sc.ftqq.com/' + self.sckey + '.send' + self.diyText() # 构造发送内容 + response = requests.get(url,params={"text":self.title, "desp":self.content}) + data = json.loads(response.text) + if data['errno'] == 0: + self.log('用户:' + self.name + ' Server酱推送成功') + else: + self.log('用户:' + self.name + ' Server酱推送失败,请检查sckey是否正确') ''' - 检查api + 自定义要推送到微信的内容 + title:消息的标题 + content:消息的内容,支持MarkDown格式 ''' - def check(): - url = api + '?do=check' - respones = requests.get(url) - if respones.status_code == 200: - logger.info('api测试正常') + def diyText(self): + # today = datetime.date.today() + # kaoyan_day = datetime.date(2020,12,21) #2021考研党的末日 + # date = (kaoyan_day - today).days + one = requests.get('https://api.qinor.cn/soup/').text # 每日一句的api + for count in grade: + if self.level < 10: + if self.listenSongs < 20000: + if self.listenSongs < count: + self.tip = '还需听歌' + str(count-self.listenSongs) + '首即可升级' + break + else: + self.tip = '你已经听够20000首歌曲,如果登录天数达到800天即可满级' + else: + self.tip = '恭喜你已经满级!' + if self.error == '': + state = ("- 目前已完成签到\n" + "- 今日共打卡" + str(self.dakanum) + "次\n" + "- 今日共播放" + str(self.dakaSongs) + "首歌\n" + "- 还需要打卡" + str(self.day) +"天") + self.title = ("网易云今日打卡" + str(self.dakaSongs) + "首,已播放" + str(self.listenSongs) + "首") else: - logger.error('api测试异常') + state = self.error + self.title = '网易云听歌任务出现问题!' + self.content = ( + "------\n" + "#### 账户信息\n" + "- 用户名称:" + str(self.name) + "\n" + "- 当前等级:" + str(self.level) + "级\n" + "- 累计播放:" + str(self.listenSongs) + "首\n" + "- 升级提示:" + self.tip + "\n\n" + "------\n" + "#### 任务状态\n" + str(state) + "\n\n" + "------\n" + "#### 打卡日志\n" + self.dakaSongs_list + "\n\n" + "------\n" + "#### 今日一句\n- " + one + "\n\n") ''' - 任务池 + 打印日志 ''' - def taskPool(): - - config = init() - check() # 每天对api做一次检查 - if config['peopleSwitch'] is True: - logger.info('多人开关已打开,即将执行进行多人任务') - account = loadJson("account.json") - for man in account: - logger.info('账号: ' + man['account'] + ' 开始执行\n========================================') - task = Task(man['account'], man['password'], man['sckey']) - task.start() - time.sleep(10) - logger.info('所有账号已全部完成任务,服务进入休眠中,等待明天重新启动') - else : - logger.info('账号: ' + config['uin'] + ' 开始执行\n========================================') - if config['md5Switch'] is True: - logger.info('MD5开关已打开,即将开始为你加密,密码不会上传至服务器,请知悉') - config['pwd'] = md5(config['pwd']) - task = Task(config['uin'], config['pwd'], config['sckey'], config['countrycode']) - task.start() + def log(self, text): + time_stamp = datetime.datetime.now() + print(time_stamp.strftime('%Y.%m.%d-%H:%M:%S') + ' ' + str(text)) + self.time =time_stamp.strftime('%H:%M:%S') + self.list.append("- [" + self.time + "] " + str(text) + "\n\n") ''' - 程序的入口 + 开始执行 ''' - def main(event,content): - taskPool() + def start(self): + try: + self.list = [] + self.list.append("- 初始化完成\n\n") + self.login() + self.sign() + self.detail() + counter = self.listenSongs + for i in range(1,10): + self.daka() + #self.log('用户:' + self.name + ' 第' + str(i) + '次打卡成功,即将休眠10秒') + self.log('第' + str(i) + '次打卡成功,即将休眠10秒') + time.sleep(10) + self.dakanum = i + self.detail() + self.dakaSongs = self.listenSongs - counter + self.log('今日已打卡播放' + str(self.dakaSongs) + '首') + if self.dakaSongs == 300: + break + if self.listenSongs >= 20000: + self.day = 0 + else: + self.day = math.ceil((20000 - self.listenSongs)/300) + self.list.append("- 打卡结束,消息推送\n\n") + self.dakaSongs_list = ''.join(self.list) + self.server() + except: + self.log('用户任务执行中断,请检查账号密码是否正确') + else: + self.log('用户:' + self.name + ' 今日任务已完成') + + +''' +初始化:读取配置,配置文件为init.config +返回字典类型的配置对象 +''' +def init(): + global api # 初始化时设置api + config = ConfigParser() + config.read('init.config', encoding='UTF-8-sig') + uin = config['token']['account'] + pwd = config['token']['password'] + countrycode = config['token']['countrycode'] + api = config['setting']['api'] + md5Switch = config.getboolean('setting','md5Switch') + peopleSwitch = config.getboolean('setting','peopleSwitch') + sckey = config['setting']['sckey'] + logger.info('配置文件读取完毕') + conf = { + 'uin': uin, + 'pwd': pwd, + 'countrycode': countrycode, + 'api': api, + 'md5Switch': md5Switch, + 'peopleSwitch':peopleSwitch, + 'sckey':sckey + } + return conf + +''' +MD5加密 +str:待加密字符 +返回加密后的字符 +''' +def md5(str): + hl = hashlib.md5() + hl.update(str.encode(encoding='utf-8')) + return hl.hexdigest() + +''' +加载Json文件 +jsonPath:json文件的名字,例如account.json +''' +def loadJson(jsonPath): + with open(jsonPath,encoding='utf-8') as f: + account = json.load(f) + return account + +''' +检查api +''' +def check(): + url = api + '?do=check' + respones = requests.get(url) + if respones.status_code == 200: + logger.info('api测试正常') + else: + logger.error('api测试异常') + +''' +任务池 +''' +def taskPool(): + + config = init() + check() # 每天对api做一次检查 + if config['peopleSwitch'] is True: + logger.info('多人开关已打开,即将执行进行多人任务') + account = loadJson("account.json") + for man in account: + logger.info('账号: ' + man['account'] + ' 开始执行\n========================================') + task = Task(man['account'], man['password'], man['sckey']) + task.start() + time.sleep(10) + logger.info('所有账号已全部完成任务,服务进入休眠中,等待明天重新启动') + else : + logger.info('账号: ' + config['uin'] + ' 开始执行\n========================================') + if config['md5Switch'] is True: + logger.info('MD5开关已打开,即将开始为你加密,密码不会上传至服务器,请知悉') + config['pwd'] = md5(config['pwd']) + task = Task(config['uin'], config['pwd'], config['sckey'], config['countrycode']) + task.start() + +''' +程序的入口 +''' +def main(event,content): + taskPool()