提交 0ffbe102 编写于 作者: X xiaohanfei

区分yaml.conf

上级 aa599d7d
......@@ -6,18 +6,17 @@ import yaml
import codecs
def _get_yaml():
def _get_yaml(ticket_config):
"""
解析yaml
:return: s 字典
"""
path = os.path.join(os.path.dirname(__file__) + '/ticket_config.yaml')
path = os.path.join(os.path.dirname(__file__) + '/'+ticket_config)
f = codecs.open(path,encoding='utf-8')
s = yaml.load(f)
f.close()
return s
# def get_set_info():
# return _get_yaml()["set"]
#
......
......@@ -25,8 +25,7 @@
set:
station_date: "2018-02-10"
station_trains:
- "T17"
- "T297"
- "G1970"
# - "G2365"
# - "G1371"
# - "G1377"
......@@ -43,13 +42,14 @@ set:
- "硬卧"
is_more_ticket: True
ticke_peoples:
- ""
- "肖汉飞"
- ""
12306count:
# - uesr: "11"
# - pwd: "649823049lilymin"
- uesr: "@qq.com"
- pwd: "QWERTY"
- uesr: "vanglis"
- pwd: "fei314159"
select_refresh_interval: 0.1
expect_refresh_interval: 0.3
......@@ -58,7 +58,7 @@ is_aotu_code: False
#enable_proxy: False
damatu:
uesr: ""
uesr: "wenxianping"
pwd: "wen1995"
......
......@@ -10,196 +10,200 @@ from PIL import Image
from damatuCode.damatuWeb import DamatuApi
from myUrllib import myurllib2
codeimg = 'https://kyfw.12306.cn/otn/passcodeNew/getPassCodeNew?module=login&rand=sjrand&%s' % random.random()
def cookietp():
stoidinput("获取Cookie")
Url = "https://kyfw.12306.cn/otn/login/init"
myurllib2.get(Url)
# for index, c in enumerate(myurllib2.cookiejar):
# stoidinput(c)
def readImg():
"""
增加手动打码,只是登录接口,完全不用担心提交订单效率
思路
1.调用PIL显示图片
2.图片位置说明,验证码图片中每个图片代表一个下标,依次类推,1,2,3,4,5,6,7,8
3.控制台输入对应下标,按照英文逗号分开,即可手动完成打码,
:return:
"""
global randCode
stoidinput("下载验证码...")
img_path = './tkcode'
result = myurllib2.get(codeimg)
try:
open(img_path, 'wb').write(result)
if _get_yaml()["is_aotu_code"]:
randCode = DamatuApi(_get_yaml()["damatu"]["uesr"], _get_yaml()["damatu"]["pwd"], img_path).main()
else:
img = Image.open('./tkcode')
img.show()
codexy()
except OSError as e:
print (e)
pass
def stoidinput(text):
"""
正常信息输出
:param text:
:return:
"""
print("\033[34m[*]\033[0m %s " % text)
def errorinput(text):
"""
错误信息输出
:param text:
:return:
"""
print("\033[32m[!]\033[0m %s " % text)
return False
def codexy():
"""
获取验证码
:return: str
"""
Ofset = input("[*] 请输入验证码: ")
select = Ofset.split(',')
global randCode
post = []
offsetsX = 0 # 选择的答案的left值,通过浏览器点击8个小图的中点得到的,这样基本没问题
offsetsY = 0 # 选择的答案的top值
for ofset in select:
if ofset == '1':
offsetsY = 46
offsetsX = 42
elif ofset == '2':
offsetsY = 46
offsetsX = 105
elif ofset == '3':
offsetsY = 45
offsetsX = 184
elif ofset == '4':
offsetsY = 48
offsetsX = 256
elif ofset == '5':
offsetsY = 36
offsetsX = 117
elif ofset == '6':
offsetsY = 112
offsetsX = 115
elif ofset == '7':
offsetsY = 114
offsetsX = 181
elif ofset == '8':
offsetsY = 111
offsetsX = 252
else:
class go_login:
def __init__(self,ticket_config = ""):
self.codeimg = 'https://kyfw.12306.cn/otn/passcodeNew/getPassCodeNew?module=login&rand=sjrand&%s' % random.random()
self.ticket_config = ticket_config
self.text = ""
self.user = _get_yaml(ticket_config)["set"]["12306count"][0]["uesr"]
self.passwd = _get_yaml(ticket_config)["set"]["12306count"][1]["pwd"]
#self.randCode = ""
def cookietp(self):
self.stoidinput("获取Cookie")
Url = "https://kyfw.12306.cn/otn/login/init"
myurllib2.get(Url)
# for index, c in enumerate(myurllib2.cookiejar):
# stoidinput(c)
def readImg(self):
"""
增加手动打码,只是登录接口,完全不用担心提交订单效率
思路
1.调用PIL显示图片
2.图片位置说明,验证码图片中每个图片代表一个下标,依次类推,1,2,3,4,5,6,7,8
3.控制台输入对应下标,按照英文逗号分开,即可手动完成打码,
:return:
"""
global randCode
self.stoidinput("下载验证码...")
img_path = './tkcode'
result = myurllib2.get(self.codeimg)
try:
open(img_path, 'wb').write(result)
if _get_yaml(self.ticket_config)["is_aotu_code"]:
randCode = DamatuApi(_get_yaml(ticket_config)["damatu"]["uesr"], _get_yaml(ticket_config)["damatu"]["pwd"], img_path).main()
else:
img = Image.open('./tkcode')
img.show()
self.codexy()
except OSError as e:
print (e)
pass
post.append(offsetsX)
post.append(offsetsY)
randCode = str(post).replace(']', '').replace('[', '').replace("'", '').replace(' ', '')
def login(user, passwd):
"""
登陆
:param user: 账户名
:param passwd: 密码
:return:
"""
login_num = 0
while True:
cookietp()
readImg()
login_num += 1
randurl = 'https://kyfw.12306.cn/otn/passcodeNew/checkRandCodeAnsyn'
logurl = 'https://kyfw.12306.cn/otn/login/loginAysnSuggest'
surl = 'https://kyfw.12306.cn/otn/login/userLogin'
randdata = {
"randCode": randCode,
"rand": "sjrand"
}
logdata = {
"loginUserDTO.user_name": user,
"userDTO.password": passwd,
"randCode": randCode
}
ldata = {
"_json_att": None
}
print(randdata)
fresult = json.loads(myurllib2.Post(randurl, randdata), encoding='utf8')
checkcode = fresult['data']['msg']
if checkcode == 'FALSE':
errorinput("验证码有误,第{}次尝试重试".format(login_num))
else:
stoidinput("验证码通过,开始登录..")
sleep(1)
try:
tresult = json.loads(myurllib2.Post(logurl, logdata), encoding='utf8')
if 'data' not in tresult:
errorinput("登录失败: %s" % tresult['messages'][0])
# elif "messages" in tresult and tresult["messages"][0].find("密码输入错误") is not -1:
# errorinput("登陆失败:{}".format(tresult["messages"][0]))
# break
elif 'messages' in tresult and tresult['messages']:
messages = tresult['messages'][0]
if messages.find("密码输入错误") is not -1:
errorinput("登陆失败:{}".format(tresult["messages"][0]))
break
def stoidinput(self,text):
"""
正常信息输出
:param text:
:return:
"""
print("\033[34m[*]\033[0m %s " % text)
def errorinput(self,text):
"""
错误信息输出
:param text:
:return:
"""
print("\033[32m[!]\033[0m %s " % text)
return False
def codexy(self):
"""
获取验证码
:return: str
"""
Ofset = input("[*] 请输入验证码: ")
select = Ofset.split(',')
#global randCode
post = []
offsetsX = 0 # 选择的答案的left值,通过浏览器点击8个小图的中点得到的,这样基本没问题
offsetsY = 0 # 选择的答案的top值
for ofset in select:
if ofset == '1':
offsetsY = 46
offsetsX = 42
elif ofset == '2':
offsetsY = 46
offsetsX = 105
elif ofset == '3':
offsetsY = 45
offsetsX = 184
elif ofset == '4':
offsetsY = 48
offsetsX = 256
elif ofset == '5':
offsetsY = 36
offsetsX = 117
elif ofset == '6':
offsetsY = 112
offsetsX = 115
elif ofset == '7':
offsetsY = 114
offsetsX = 181
elif ofset == '8':
offsetsY = 111
offsetsX = 252
else:
pass
post.append(offsetsX)
post.append(offsetsY)
self.randCode = str(post).replace(']', '').replace('[', '').replace("'", '').replace(' ', '')
return self.randCode
def login(self):
"""
登陆
:param user: 账户名
:param passwd: 密码
:return:
"""
login_num = 0
while True:
self.cookietp()
self.readImg()
login_num += 1
randurl = 'https://kyfw.12306.cn/otn/passcodeNew/checkRandCodeAnsyn'
logurl = 'https://kyfw.12306.cn/otn/login/loginAysnSuggest'
surl = 'https://kyfw.12306.cn/otn/login/userLogin'
randdata = dict(randCode=self.randCode, rand="sjrand")
logdata = {
"loginUserDTO.user_name": self.user,
"userDTO.password": self.passwd,
"randCode": self.randCode
}
ldata = {
"_json_att": None
}
print(randdata)
fresult = json.loads(myurllib2.Post(randurl, randdata), encoding='utf8')
checkcode = fresult['data']['msg']
if checkcode == 'FALSE':
self.errorinput("验证码有误,第{}次尝试重试".format(login_num))
else:
self.stoidinput("验证码通过,开始登录..")
sleep(1)
try:
tresult = json.loads(myurllib2.Post(logurl, logdata), encoding='utf8')
if 'data' not in tresult:
self.errorinput("登录失败: %s" % tresult['messages'][0])
# elif "messages" in tresult and tresult["messages"][0].find("密码输入错误") is not -1:
# errorinput("登陆失败:{}".format(tresult["messages"][0]))
# break
elif 'messages' in tresult and tresult['messages']:
messages = tresult['messages'][0]
if messages.find("密码输入错误") is not -1:
self.errorinput("登陆失败:{}".format(tresult["messages"][0]))
break
else:
self.errorinput("登录失败: %s" % tresult['messages'][0])
self.stoidinput("尝试重新登陆")
else:
errorinput("登录失败: %s" % tresult['messages'][0])
stoidinput("尝试重新登陆")
else:
stoidinput("登录成功")
myurllib2.Post(surl, ldata)
getUserinfo()
break
except ValueError as e:
errorinput(e)
sleep(1)
def getUserinfo():
"""
登录成功后,显示用户名
:return:
"""
url = 'https://kyfw.12306.cn/otn/modifyUser/initQueryUserInfo'
data = dict(_json_att=None)
result = myurllib2.Post(url, data).decode('utf-8')
userinfo = result
name = r'<input name="userDTO.loginUserDTO.user_name" style="display:none;" type="text" value="(\S+)" />'
try:
stoidinput("欢迎 %s 登录" % re.search(name, result).group(1))
except AttributeError:
pass
def main():
login(_get_yaml()["set"]["12306count"][0]["uesr"], _get_yaml()["set"]["12306count"][1]["pwd"])
def logout():
url = 'https://kyfw.12306.cn/otn/login/loginOut'
result = myurllib2.get(url)
if result:
stoidinput("已退出")
else:
errorinput("退出失败")
self.stoidinput("登录成功")
myurllib2.Post(surl, ldata)
self.getUserinfo()
break
except ValueError as e:
self.errorinput(e)
sleep(1)
def getUserinfo(self):
"""
登录成功后,显示用户名
:return:
"""
url = 'https://kyfw.12306.cn/otn/modifyUser/initQueryUserInfo'
data = dict(_json_att=None)
result = myurllib2.Post(url, data).decode('utf-8')
userinfo = result
name = r'<input name="userDTO.loginUserDTO.user_name" style="display:none;" type="text" value="(\S+)" />'
try:
self.stoidinput("欢迎 %s 登录" % re.search(name, result).group(1))
except AttributeError:
pass
def logout(self):
url = 'https://kyfw.12306.cn/otn/login/loginOut'
result = myurllib2.get(url)
if result:
self.stoidinput("已退出")
else:
self.errorinput("退出失败")
if __name__ == "__main__":
main()
go_login.login()
# logout()
\ No newline at end of file
......@@ -24,8 +24,8 @@ import codecs
class select:
def __init__(self):
self.from_station, self.to_station, self.station_date, self._station_seat, self.is_more_ticket, self.ticke_peoples, self.select_refresh_interval, self.station_trains, self.expect_refresh_interval, self.ticket_black_list_time = self.get_ticket_info()
def __init__(self,ticket_config):
self.from_station, self.to_station, self.station_date, self._station_seat, self.is_more_ticket, self.ticke_peoples, self.select_refresh_interval, self.station_trains, self.expect_refresh_interval, self.ticket_black_list_time = self.get_ticket_info(ticket_config)
self.order_request_params = {} # 订单提交时的参数
self.ticketInfoForPassengerForm = {} # 初始化当前页面参数
self.current_seats = {} # 席别信息
......@@ -34,13 +34,14 @@ class select:
self.user_info = ""
self.secretStr = ""
self.ticket_black_list = dict()
self.ticket_config = ticket_config
def get_ticket_info(self):
def get_ticket_info(self,ticket_config):
"""
获取配置信息
:return:
"""
ticket_info_config = _get_yaml()
ticket_info_config = _get_yaml(ticket_config)
from_station = ticket_info_config["set"]["from_station"]
to_station = ticket_info_config["set"]["to_station"]
station_date = ticket_info_config["set"]["station_date"]
......
# -*- coding=utf-8 -*-
from init import login, select_ticket_info
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=1)
def run():
login.main()
select_ticket_info.select().main()
def run(ticket_config):
login.go_login(ticket_config).login()
select_ticket_info.select(ticket_config).main()
run()
\ No newline at end of file
f1 = pool.submit(run("ticket_config.yaml"))
无法预览此类型文件
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册