提交 5936a888 编写于 作者: 泰斯特Test's avatar 泰斯特Test

[fix]修复get请求下全局变量替换参数

上级 49c6f6f5
......@@ -9,6 +9,7 @@ from bson import ObjectId
from threading import Thread
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
......@@ -66,7 +67,7 @@ class tester:
if 'lastManualTestResult' in test_case:
test_case.pop('lastManualTestResult')
domain = test_case["domain"] if 'domain' in test_case and isinstance(test_case["domain"], str) and \
not test_case["domain"].strip() == '' else self.domain
not test_case["domain"].strip() == '' else self.domain
if 'requestProtocol' in test_case and 'route' in test_case:
url = '%s://%s%s' % (test_case['requestProtocol'].lower(), domain, test_case['route'])
test_case["url"] = url
......@@ -90,7 +91,7 @@ class tester:
if 'lastManualTestResult' in test_case:
test_case.pop('lastManualTestResult')
domain = test_case["domain"] if 'domain' in test_case and isinstance(test_case["domain"], str) and \
not test_case["domain"].strip() == '' else self.domain
not test_case["domain"].strip() == '' else self.domain
if 'requestProtocol' in test_case and 'route' in test_case:
url = '%s://%s%s' % (test_case['requestProtocol'].lower(), domain, test_case['route'])
test_case["url"] = url
......@@ -134,36 +135,27 @@ class tester:
set_global_vars = None # for example {'status': ['status']}
domain = test_case["domain"] if 'domain' in test_case and isinstance(test_case["domain"], str) and \
not test_case["domain"].strip() == '' else self.domain
not test_case["domain"].strip() == '' else self.domain
if 'requestProtocol' in test_case and 'route' in test_case:
test_case['route'] = \
common.resolve_global_var(pre_resolve_var=test_case['route'], global_var_dic=self.global_vars) \
if isinstance(test_case['route'], str) else test_case['route']
url = '%s://%s%s' % (test_case['requestProtocol'].lower(), domain, test_case['route'])
if 'requestMethod' in test_case:
method = test_case['requestMethod']
if 'requestMethod' in test_case and 'presendParams' in test_case \
and test_case['requestMethod'].lower() == 'get':
url += '?'
for key, value in test_case['presendParams'].items():
if value is not None:
get_method_params_value = common.resolve_global_var(pre_resolve_var=value,
global_var_dic=self.global_vars) \
if isinstance(value, str) else value
url += '%s=%s&' % (key, get_method_params_value)
url = url[0:(len(url) - 1)]
elif 'presendParams' in test_case and isinstance(test_case['presendParams'], dict):
if 'presendParams' in test_case and isinstance(test_case['presendParams'], dict):
# dict 先转 str,方便全局变量替换
test_case['presendParams'] = str(test_case['presendParams'])
# 全局替换
test_case['presendParams'] = common.resolve_global_var(pre_resolve_var=test_case['presendParams'],
global_var_dic=self.global_vars)
# 转回 dict
test_case['presendParams'] = ast.literal_eval(test_case['presendParams'])
json_data = test_case['presendParams']
if 'headers' in test_case and not test_case['headers'] in ["", None, {}, {'': ''}]:
......@@ -171,8 +163,8 @@ class tester:
for header in test_case['headers']:
if not header['name'].strip() == '':
headers[header['name']] = \
common.resolve_global_var(pre_resolve_var=header['value'], global_var_dic=self.global_vars)\
if isinstance(header['value'], str) else headers[header['name']]
common.resolve_global_var(pre_resolve_var=header['value'], global_var_dic=self.global_vars) \
if isinstance(header['value'], str) else headers[header['name']]
else:
raise TypeError('headers must be list!')
......@@ -193,13 +185,17 @@ class tester:
use_json_data = len(list(filter(lambda x: str(x).lower() == 'content-type' and 'json'
in headers[x], headers.keys() if headers else {}))) > 0
response = session.request(url=url, method=method, json=json_data, headers=headers, verify=False) if use_json_data\
else session.request(url=url, method=method, data=json_data, headers=headers, verify=False)
if test_case['requestMethod'].lower() == 'get':
response = session.request(url=url, method=method, params=json_data, headers=headers, verify=False)
else:
response = session.request(url=url, method=method, json=json_data, headers=headers,
verify=False) if use_json_data \
else session.request(url=url, method=method, data=json_data, headers=headers, verify=False)
except BaseException as e:
returned_data["status"] = 'failed'
returned_data["testConclusion"].append('请求失败, 错误信息: <%s> ' % e)
return returned_data
returned_data["status"] = 'failed'
returned_data["testConclusion"].append('请求失败, 错误信息: <%s> ' % e)
return returned_data
test_case['headers'] = headers # 重新赋值生成报告时用
......@@ -222,20 +218,20 @@ class tester:
if 'checkHttpCode' in test_case and not test_case['checkHttpCode'] in ["", None]:
check_http_code = test_case['checkHttpCode']
if check_http_code and not str(response_status_code) == str(check_http_code):
returned_data["status"] = 'failed'
returned_data["testConclusion"].append('响应状态码错误, 期待值: <%s>, 实际值: <%s>。\t'
% (check_http_code, response_status_code))
return returned_data
is_check_res_data_valid = isinstance(test_case.get('checkResponseData'), list) and\
is_check_res_data_valid = isinstance(test_case.get('checkResponseData'), list) and \
len(list(filter(lambda x: str(x.get('regex')).strip() == '',
test_case.get('checkResponseData')))) < 1
is_check_res_similarity_valid = isinstance(test_case.get('checkResponseSimilarity'), list) and\
is_check_res_similarity_valid = isinstance(test_case.get('checkResponseSimilarity'), list) and \
len(list(filter(lambda x: isinstance(x.get('targetSimilarity'), type(None)),
test_case.get('checkResponseSimilarity')))) < 1
is_check_res_number_valid = isinstance(test_case.get('checkResponseNumber'), list) and\
is_check_res_number_valid = isinstance(test_case.get('checkResponseNumber'), list) and \
len(list(filter(lambda x: str(x.get('expressions').get('expectResult')).strip()
== '', test_case.get('checkResponseNumber')))) < 1
# TODO 目前默认当 is_check_res_similarity_valid 和 is_check_res_number_valid 为真时,返回格式必须可转 json ,可优化
......@@ -245,7 +241,8 @@ class tester:
returned_data["testConclusion"].append('服务器返回格式不是json, 错误信息: %s, 服务器返回为: %s '
% (e, response.text)) if returned_data.get('status') and \
returned_data.get('status') == 'failed' else None
returned_data.get(
'status') == 'failed' else None
if returned_data['status'] == 'ok':
returned_data["testConclusion"].append('测试通过')
......@@ -283,7 +280,7 @@ class tester:
for index, crs in enumerate(test_case['checkResponseSimilarity']):
if not isinstance(crs, dict) or 'baseText' not in crs or 'targetSimilarity' not in crs \
or 'compairedText' not in crs or not isinstance(crs['baseText'], str) \
or not isinstance(crs['compairedText'], str):
or not isinstance(crs['compairedText'], str):
raise TypeError('checkResponseSimilarity is not valid!')
test_case['checkResponseSimilarity'][index]['baseText'] = \
common.resolve_global_var(pre_resolve_var=crs['baseText'], global_var_dic=self.global_vars) if \
......@@ -377,7 +374,8 @@ class tester:
returned_data["status"] = 'failed'
returned_data["testConclusion"].append('相似度校验未达标!已对比字符串: 「%s」、「%s」, 实际相似度: 「%s」 '
'预期相似度: 「%s」。\t ' % (base_text, compaired_text,
actual_similarity, target_similarity))
actual_similarity,
target_similarity))
except BaseException as e:
returned_data["status"] = 'failed'
returned_data["testConclusion"].append('判断相似度时报错, 模型服务器可能已宕机/断网。具体错误信息: <%s>。\t' % e)
......@@ -396,7 +394,7 @@ class tester:
test_result = common.format_response_in_dic(test_result)
self.test_result_list[index] = test_result
testing_case_model.update({"_id": ObjectId(test_case_id)},
{'$set': {'lastManualTestResult': test_result}})
{'$set': {'lastManualTestResult': test_result}})
def send_report(self, test_report_model, project_id, executor_nick_name, execution_mode):
test_count = len(self.test_result_list)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册