提交 a18286fd 编写于 作者: L liwh9

add fuzzy query for Name Field

上级 a8e15214
......@@ -14,7 +14,7 @@ from flask_login import login_required
@app.route('/api/project/<project_id>/caseSuiteList', methods=['GET', 'POST'])
@login_required
def case_suite_list(project_id):
total_num, case_suites = common.get_total_num_and_arranged_data(CaseSuite, request.args)
total_num, case_suites = common.get_total_num_and_arranged_data(CaseSuite, request.args, fuzzy_fields=['name'])
return jsonify({'status': 'ok', 'data': {'totalNum': total_num, 'rows': case_suites}})
......@@ -95,13 +95,11 @@ def update_case_suite(project_id, case_suite_id):
filtered_data = CaseSuite.filter_field(request.get_json())
for key, value in filtered_data.items():
CaseSuite.update({"_id": ObjectId(case_suite_id)},
{'$set': {key: value}})
{'$set': {key: value}})
update_response = CaseSuite.update({"_id": ObjectId(case_suite_id)},
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}},)
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}}, )
if update_response["n"] == 0:
return jsonify({'status': 'failed', 'data': '未找到相应更新数据!'})
return jsonify({'status': 'ok', 'data': '更新成功'})
except BaseException as e:
return jsonify({'status': 'failed', 'data': '更新失败: %s' % e})
......@@ -12,7 +12,7 @@ import datetime
@app.route('/api/project/projectList', methods=['GET'])
@login_required
def project_list():
total_num, projects = common.get_total_num_and_arranged_data(Project, request.args)
total_num, projects = common.get_total_num_and_arranged_data(Project, request.args, fuzzy_fields=['name'])
return jsonify({'status': 'ok', 'data': {'totalNum': total_num, 'rows': projects}})
......@@ -39,11 +39,9 @@ def update_project(project_id):
Project.update({"_id": ObjectId(project_id)},
{'$set': {key: value}})
update_response = Project.update({"_id": ObjectId(project_id)},
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}},)
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}}, )
if update_response["n"] == 0:
return jsonify({'status': 'failed', 'data': '未找到相应更新数据!'})
return jsonify({'status': 'ok', 'data': '更新成功'})
except BaseException as e:
return jsonify({'status': 'failed', 'data': '更新失败: %s' % e})
......@@ -21,7 +21,7 @@ import datetime
@app.route('/api/project/<project_id>/caseSuiteList/<case_suite_id>/caseList', methods=['GET'])
@login_required
def case_list(project_id, case_suite_id):
total_num, testing_cases = common.get_total_num_and_arranged_data(TestingCase, request.args)
total_num, testing_cases = common.get_total_num_and_arranged_data(TestingCase, request.args, fuzzy_fields=['name'])
return jsonify({'status': 'ok', 'data': {'totalNum': total_num, 'rows': testing_cases}})
......@@ -97,7 +97,7 @@ def update_case(project_id, case_suite_id, case_id):
TestingCase.update({"_id": ObjectId(case_id)},
{'$set': {key: value}})
update_response = TestingCase.update({"_id": ObjectId(case_id)},
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}})
{'$set': {'lastUpdateTime': datetime.datetime.utcnow()}})
if update_response["n"] == 0:
return jsonify({'status': 'failed', 'data': '未找到相应更新数据!'})
return jsonify({'status': 'ok', 'data': '更新成功'})
......@@ -163,7 +163,8 @@ def start_test():
return jsonify({'status': 'failed', 'data': '未在「测试用例」中找到任何「启用的」接口测试用例'})
testing_cases = copy.deepcopy(TestingCase.find({'isDeleted': {'$ne': True}, 'status': True}) # sort吃顺序
.sort([('caseSuiteId', pymongo.ASCENDING), ('createAt', pymongo.ASCENDING)])) # 再次初始化 Cursor object
.sort(
[('caseSuiteId', pymongo.ASCENDING), ('createAt', pymongo.ASCENDING)])) # 再次初始化 Cursor object
if case_id_list:
for testing_case in testing_cases:
if str(testing_case["_id"]) in case_id_list:
......@@ -194,7 +195,8 @@ def start_test():
if not is_single_test:
try:
tester.execute_all_test_and_send_report(TestingCase, TestReport, project_id, executor_nick_name, execution_mode)
tester.execute_all_test_and_send_report(TestingCase, TestReport, project_id, executor_nick_name,
execution_mode)
return jsonify({'status': 'ok', 'data': '测试已启动,稍后请留意自动化测试报告'})
except BaseException as e:
return jsonify({'status': 'failed', 'data': '测试启动失败: %s' % e})
......@@ -251,29 +253,29 @@ def single_test_result(case_id):
test_case_map = {
'caseSuiteId': '用例组_id',
'caseSuiteName': '用例组名称',
'_id': '用例_id',
'name': '用例名称',
'description': '用例描述',
'testCaseType': '用例类型',
'requestProtocol': '请求协议',
'requestMethod': '请求方法',
'domain': '请求域名',
'route': '请求路由',
'headers': '请求头部',
'presendParams': '请求参数',
'checkHttpCode': '状态码校验',
'checkResponseData': '正则校验',
'checkResponseSimilarity': '文本相似度校验',
'checkResponseNumber': '数值校验',
'setGlobalVars': '设置全局变量',
'isClearCookie': '请求前是否清除Cookie',
'createAt': '创建时间/UTC',
'creatorNickName': '创建人',
'lastUpdateTime': '最后更新时间/UTC',
'lastUpdatorNickName': '最后更新人',
}
'caseSuiteId': '用例组_id',
'caseSuiteName': '用例组名称',
'_id': '用例_id',
'name': '用例名称',
'description': '用例描述',
'testCaseType': '用例类型',
'requestProtocol': '请求协议',
'requestMethod': '请求方法',
'domain': '请求域名',
'route': '请求路由',
'headers': '请求头部',
'presendParams': '请求参数',
'checkHttpCode': '状态码校验',
'checkResponseData': '正则校验',
'checkResponseSimilarity': '文本相似度校验',
'checkResponseNumber': '数值校验',
'setGlobalVars': '设置全局变量',
'isClearCookie': '请求前是否清除Cookie',
'createAt': '创建时间/UTC',
'creatorNickName': '创建人',
'lastUpdateTime': '最后更新时间/UTC',
'lastUpdatorNickName': '最后更新人',
}
@app.route("/api/importTestCases", methods=['POST'])
......@@ -313,7 +315,8 @@ def import_test_cases():
missing_attributes = [nip for nip in non_intersection if nip in test_case_map.values()]
return jsonify({'status': 'failed', 'data': '「测试用例」Sheet 表头缺失字段: %s' % missing_attributes}) \
if missing_attributes else jsonify({'status': 'failed', 'data': '「测试用例」Sheet 表头存在多余字段: %s' %
[nip for nip in non_intersection if nip not in test_case_map.values()]})
[nip for nip in non_intersection if
nip not in test_case_map.values()]})
attributes_indexes = [test_case_attributes.index(v) for v in test_case_map.values()]
......@@ -331,8 +334,8 @@ def import_test_cases():
for j, v in enumerate(test_case_info.keys()):
test_case_info[v] = test_case_table.row_values(i + 1)[attributes_indexes[j]]
try:
is_case_exist, pre_import_case_info, is_case_suite_exist\
= get_pre_import_case_info(test_case_info, test_case_mapping=test_case_map, table_row_index=(i+2))
is_case_exist, pre_import_case_info, is_case_suite_exist \
= get_pre_import_case_info(test_case_info, test_case_mapping=test_case_map, table_row_index=(i + 2))
except BaseException as b_e:
return jsonify({'status': 'failed', 'data': '导入数据异常: %s' % b_e})
......@@ -343,7 +346,7 @@ def import_test_cases():
pre_import_case_info = TestingCase.filter_field(pre_import_case_info, use_set_default=False)
result = str(TestingCase.update({"_id": ObjectId(pre_import_case_info.get('_id'))},
{'$set': pre_import_case_info})) + \
' _id: {}'.format(pre_import_case_info.get('_id'))
' _id: {}'.format(pre_import_case_info.get('_id'))
update_count += 1
else:
try:
......@@ -361,13 +364,13 @@ def import_test_cases():
# 在用例列表内导入
else:
inserted_case_suite_id = None
case_suite_name = pre_import_case_info.get('caseSuiteName')\
case_suite_name = pre_import_case_info.get('caseSuiteName') \
if pre_import_case_info.get('caseSuiteName') else ''
if is_case_suite_exist:
if not case_suite_name == CaseSuite.find_one(
{"_id": ObjectId(pre_import_case_info.get('caseSuiteId'))})['name']:
CaseSuite.update({"_id": ObjectId(pre_import_case_info.get('caseSuiteId'))},
{'$set': {'name': case_suite_name}})
{'$set': {'name': case_suite_name}})
else:
pass
else:
......@@ -393,8 +396,8 @@ def import_test_cases():
if is_case_exist:
pre_import_case_info = TestingCase.filter_field(pre_import_case_info, use_set_default=False)
result = str(TestingCase.update({"_id": ObjectId(pre_import_case_info.get('_id'))},
{'$set': pre_import_case_info})) + ' _id: {}'\
.format(pre_import_case_info.get('_id'))
{'$set': pre_import_case_info})) + ' _id: {}' \
.format(pre_import_case_info.get('_id'))
update_count += 1
else:
......@@ -465,7 +468,7 @@ def export_test_cases():
elif isinstance(case.get(key), datetime.datetime):
case_data = str(case.get(key)).replace('.', ':', 1) \
if common.can_convert_to_str(case.get(key)) \
and str(case.get(key)).count('.') < 2 else str(case.get(key))
and str(case.get(key)).count('.') < 2 else str(case.get(key))
else:
case_data = str(case.get(key)) if case.get(key) is not None else ''
export_case.append(case_data)
......@@ -481,7 +484,7 @@ def export_test_cases():
print(e)
return _case_info
export_testing_cases = map(export_case_format, map(add_case_suite_name, TestingCase.find(query)))
export_testing_cases = map(export_case_format, map(add_case_suite_name, TestingCase.find(query)))
bytes_io = BytesIO()
workbook = xlsxwriter.Workbook(bytes_io, {'in_memory': True})
......@@ -498,9 +501,3 @@ def export_test_cases():
bytes_io.seek(0)
return send_file(bytes_io, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
......@@ -36,7 +36,8 @@ class tester:
from app import nlper
self.nlper = nlper
except ImportError as e:
raise ImportError('nlp模型导入失败!<%s>' % e)
pass
# raise ImportError('nlp模型导入失败!<%s>' % e)
self.test_case_list = test_case_list
self.domain = domain
......
......@@ -136,8 +136,16 @@ def format_js_dic_to_python_dic(query_dic):
return query_dic
def get_total_num_and_arranged_data(raw_model, query_dic):
def get_total_num_and_arranged_data(raw_model, query_dic, fuzzy_fields=None):
query_dic = query_dic.to_dict() if query_dic.to_dict() else {}
if fuzzy_fields is not None:
if not isinstance(fuzzy_fields, list):
raise TypeError('fuzzy_fields need to be list.')
for fuzzy_field in fuzzy_fields:
if not isinstance(fuzzy_field, str):
raise TypeError('fuzzy_field need to be str')
if fuzzy_field in query_dic:
query_dic[fuzzy_field] = re.compile(query_dic[fuzzy_field])
query_dic = format_js_dic_to_python_dic(query_dic)
raw_model_copy = copy.deepcopy(raw_model)
raw_model_data_copy = []
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册