tester.py 23.3 KB
Newer Older
1 2 3 4 5 6
import requests
import json
import time
import datetime
import re
from utils import common
7
import ast
8 9 10
from bson import ObjectId
from threading import Thread

11
import ssl
12

13 14
# Swap ssl's default HTTPS context factory for the unverified one. This is a
# process-wide change made at import time: stdlib code that builds its HTTPS
# context through this hook will no longer verify server certificates.
# NOTE(review): the requests calls below pass verify=False themselves, so this
# override may be redundant for them -- confirm before removing.
ssl._create_default_https_context = ssl._create_unverified_context

15

16
def async_test(f):
    """Decorator that runs ``f`` in a new thread and returns immediately.

    Each call of the decorated function starts a ``Thread`` targeting ``f``
    with the same positional and keyword arguments. The wrapper does not wait
    for the thread and returns ``None``, so the return value of ``f`` is not
    available to the caller.

    :param f: callable to run in the background.
    :return: wrapper function that starts the thread.
    """
    # Imported locally so the decorator does not depend on the module's import list.
    from functools import wraps

    @wraps(f)  # keep __name__/__doc__ of the wrapped function for logs and introspection
    def wrapper(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()

    return wrapper

23

24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
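# Base tester class: collects each test case's parameters, request and validation
# settings, then runs the test.
# 2019-01-07 09:27 -- a passing test returned True and a failing test raised an exception.
# 2019-01-11 15:03 -- a passing test now returns {'status': 'ok'} and a failing test
# returns {'status': 'failed'}.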

class tester:

    def __init__(self, test_case_list, domain, test_result_list=None, max_retries=5, global_vars=None):
        """Prepare a tester for a list of test cases.

        :param test_case_list: list of test cases to execute.
        :param domain: host stored as ``self.domain``; the execute methods use it
            when a test case has no non-blank ``domain`` of its own.
        :param test_result_list: optional initial value of ``self.test_result_list``.
        :param max_retries: retry count for the HTTP adapter mounted on the
            session; the adapter is only mounted when this is a positive int.
        :param global_vars: optional dict of global variables; a new empty dict
            is used when omitted.
        :raises ValueError: if ``test_case_list`` is not a list.
        :raises ImportError: if ``nlper`` cannot be imported from ``app``.
        """
        if not isinstance(test_case_list, list):
            raise ValueError('test_case_list must be a list!')

        try:
            # Imported here rather than at module level.
            # NOTE(review): presumably to avoid a circular import with ``app`` -- confirm.
            from app import nlper
            self.nlper = nlper
        except ImportError as e:
            raise ImportError('nlp模型导入失败!<%s>' % e) from e

        self.test_case_list = test_case_list
        self.domain = domain
        self.session = requests.Session()

        if isinstance(max_retries, int) and max_retries > 0:
            # Configure connection retries for both schemes.
            adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
            self.session.mount('https://', adapter)
            self.session.mount('http://', adapter)

        self.test_result_list = test_result_list

        # Fix: always set the attribute. Previously a caller-supplied dict was
        # dropped and ``self.global_vars`` stayed undefined.
        self.global_vars = {} if global_vars is None else global_vars

    # Runs asynchronously so the caller can immediately tell the front end whether the test run was started.
58
    @async_test
    def execute_all_test_and_send_report(self, testing_case_model, test_report_model,
                                         project_id, executor_nick_name, execution_mode):
        """Run all test cases in a background thread, then store and report the results.

        Because of ``@async_test`` the call returns right away; the work below
        happens in a separate thread.

        :param testing_case_model: model passed to ``update_case_info``.
        :param test_report_model: model passed to ``send_report``.
        :param project_id: project id forwarded to ``send_report``.
        :param executor_nick_name: executor name forwarded to ``send_report``.
        :param execution_mode: execution mode forwarded to ``send_report``.
        """
        # The per-case loop used to be duplicated here line for line; reuse the
        # synchronous runner so both entry points stay in step.
        self.test_result_list = self.execute_all_test_for_cron_and_single_test()
        self.update_case_info(testing_case_model)
        self.send_report(test_report_model, project_id, executor_nick_name, execution_mode)

    # TODO: synchronous variant so that single-endpoint debugging can return the results directly; needs refactoring.
    def execute_all_test_for_cron_and_single_test(self):
        test_results = []
        for test_case in self.test_case_list:
            test_start_time = time.time()
            test_start_datetime = datetime.datetime.utcnow()
            test_result = self.execute_single_test(test_case)
            test_end_time = time.time()
            if 'lastManualTestResult' in test_case:
                test_case.pop('lastManualTestResult')
            domain = test_case["domain"] if 'domain' in test_case and isinstance(test_case["domain"], str) and \
94
                                            not test_case["domain"].strip() == '' else self.domain
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
            if 'requestProtocol' in test_case and 'route' in test_case:
                url = '%s://%s%s' % (test_case['requestProtocol'].lower(), domain, test_case['route'])
                test_case["url"] = url
            test_result["testBaseInfo"] = test_case
            test_result["testStartTime"] = test_start_datetime
            test_result["spendingTimeInSec"] = round(test_end_time - test_start_time, 3)
            test_results.append(test_result)
        return test_results

    def execute_single_test(self, test_case):
        returned_data = dict()
        returned_data["_id"] = test_case["_id"]
        returned_data["testConclusion"] = []
        if not isinstance(test_case, dict):
            returned_data["status"] = 'failed'
            returned_data["testConclusion"].append('测试用例结构不正确! ')
            return returned_data

        def validate_test_case(test_case):
            compulsory_key_list = ['requestProtocol', 'route', 'requestMethod']
            return all([compulsory_key in test_case.keys() for compulsory_key in compulsory_key_list])

        if not validate_test_case(test_case):
            returned_data["status"] = 'failed'
            returned_data["testConclusion"].append('测试用例缺失必要参数! ')
            return returned_data

        if test_case.get('isClearCookie'):
            self.session.cookies.clear()

        session = self.session

        url = None
        method = None
        json_data = None
        headers = dict()
        check_http_code = None
        check_response_data = None
        check_response_number = None
        check_response_similarity = None
        set_global_vars = None  # for example {'status': ['status']}

        domain = test_case["domain"] if 'domain' in test_case and isinstance(test_case["domain"], str) and \
138
                                        not test_case["domain"].strip() == '' else self.domain
139 140 141 142 143
        if 'requestProtocol' in test_case and 'route' in test_case:
            test_case['route'] = \
                common.resolve_global_var(pre_resolve_var=test_case['route'], global_var_dic=self.global_vars) \
                    if isinstance(test_case['route'], str) else test_case['route']
            url = '%s://%s%s' % (test_case['requestProtocol'].lower(), domain, test_case['route'])
144

145 146
        if 'requestMethod' in test_case:
            method = test_case['requestMethod']
147 148

        if 'presendParams' in test_case and isinstance(test_case['presendParams'], dict):
149 150
            # dict 先转 str,方便全局变量替换
            test_case['presendParams'] = str(test_case['presendParams'])
151

152 153 154
            # 全局替换
            test_case['presendParams'] = common.resolve_global_var(pre_resolve_var=test_case['presendParams'],
                                                                   global_var_dic=self.global_vars)
155

156 157
            # 转回 dict
            test_case['presendParams'] = ast.literal_eval(test_case['presendParams'])
158

159 160 161 162 163 164 165
            json_data = test_case['presendParams']

        if 'headers' in test_case and not test_case['headers'] in ["", None, {}, {'': ''}]:
            if isinstance(test_case['headers'], list):
                for header in test_case['headers']:
                    if not header['name'].strip() == '':
                        headers[header['name']] = \
166 167
                            common.resolve_global_var(pre_resolve_var=header['value'], global_var_dic=self.global_vars) \
                                if isinstance(header['value'], str) else headers[header['name']]
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
            else:
                raise TypeError('headers must be list!')

        if 'setGlobalVars' in test_case and not test_case['setGlobalVars'] in [[], {}, "", None]:
            set_global_vars = test_case['setGlobalVars']

        headers = None if headers == {} else headers

        test_case['cookies'] = []
        for key, value in session.cookies.items():
            cookie_dic = dict()
            cookie_dic['name'] = key
            cookie_dic['value'] = value
            test_case['cookies'].append(cookie_dic)

        try:
184 185

            use_json_data = len(list(filter(lambda x: str(x).lower() == 'content-type' and 'json'
186 187
                                                      in headers[x], headers.keys() if headers else {}))) > 0

188 189 190 191 192 193
            if test_case['requestMethod'].lower() == 'get':
                response = session.request(url=url, method=method, params=json_data, headers=headers, verify=False)
            else:
                response = session.request(url=url, method=method, json=json_data, headers=headers,
                                           verify=False) if use_json_data \
                    else session.request(url=url, method=method, data=json_data, headers=headers, verify=False)
194

195
        except BaseException as e:
196 197 198
            returned_data["status"] = 'failed'
            returned_data["testConclusion"].append('请求失败, 错误信息: <%s> ' % e)
            return returned_data
199 200 201 202 203 204 205 206 207 208 209

        test_case['headers'] = headers  # 重新赋值生成报告时用

        response_status_code = response.status_code
        returned_data["responseHttpStatusCode"] = response_status_code
        returned_data["responseData"] = response.text

        try:
            response_json = json.loads(response.text) if isinstance(response.text, str) \
                                                         and response.text.strip() else {}
        except BaseException as e:
210 211 212 213 214 215 216 217 218

            if set_global_vars and isinstance(set_global_vars, list):
                for set_global_var in set_global_vars:
                    if isinstance(set_global_var, dict) and isinstance(set_global_var.get('name'), str):
                        name = set_global_var.get('name')
                        query = set_global_var.get('query')
                        value = common.dict_get(response.text, query)
                        self.global_vars[name] = str(value) if value else value

泰斯特Test's avatar
泰斯特Test 已提交
219 220
            if 'checkHttpCode' in test_case and not test_case['checkHttpCode'] in ["", None]:
                check_http_code = test_case['checkHttpCode']
221

泰斯特Test's avatar
泰斯特Test 已提交
222 223 224 225 226
            if check_http_code and not str(response_status_code) == str(check_http_code):
                returned_data["status"] = 'failed'
                returned_data["testConclusion"].append('响应状态码错误, 期待值: <%s>, 实际值: <%s>。\t'
                                                       % (check_http_code, response_status_code))
                return returned_data
227 228

            is_check_res_data_valid = isinstance(test_case.get('checkResponseData'), list) and \
229 230
                                      len(list(filter(lambda x: str(x.get('regex')).strip() == '',
                                                      test_case.get('checkResponseData')))) < 1
231
            is_check_res_similarity_valid = isinstance(test_case.get('checkResponseSimilarity'), list) and \
232 233
                                            len(list(filter(lambda x: isinstance(x.get('targetSimilarity'), type(None)),
                                                            test_case.get('checkResponseSimilarity')))) < 1
234
            is_check_res_number_valid = isinstance(test_case.get('checkResponseNumber'), list) and \
235 236 237 238 239 240 241 242 243
                                        len(list(filter(lambda x: str(x.get('expressions').get('expectResult')).strip()
                                                                  == '', test_case.get('checkResponseNumber')))) < 1
            # TODO 目前默认当 is_check_res_similarity_valid 和  is_check_res_number_valid 为真时,返回格式必须可转 json ,可优化
            is_test_failed = is_check_res_data_valid or is_check_res_number_valid or is_check_res_similarity_valid

            returned_data['status'] = 'failed' if is_test_failed else 'ok'

            returned_data["testConclusion"].append('服务器返回格式不是json, 错误信息: %s, 服务器返回为: %s '
                                                   % (e, response.text)) if returned_data.get('status') and \
244 245
                                                                            returned_data.get(
                                                                                'status') == 'failed' else None
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
            if returned_data['status'] == 'ok':
                returned_data["testConclusion"].append('测试通过')

            return returned_data

        if set_global_vars and isinstance(set_global_vars, list):
            for set_global_var in set_global_vars:
                if isinstance(set_global_var, dict) and isinstance(set_global_var.get('name'), str):
                    name = set_global_var.get('name')
                    query = set_global_var.get('query')
                    value = common.dict_get(response_json, query)
                    self.global_vars[name] = str(value) if value else value

        if 'checkHttpCode' in test_case and not test_case['checkHttpCode'] in ["", None]:
            check_http_code = test_case['checkHttpCode']

        if 'checkResponseData' in test_case and not test_case['checkResponseData'] in [[], {}, "", None]:
            if not isinstance(test_case['checkResponseData'], list):
                raise TypeError('checkResponseData must be list!')
            for index, crd in enumerate(test_case['checkResponseData']):
                if not isinstance(crd, dict) or 'regex' not in crd or 'query' not in crd or \
                        not isinstance(crd['regex'], str) or not isinstance(crd['query'], list):
                    raise TypeError('checkResponseData is not valid!')

                # TODO 可开启/关闭 全局替换
                test_case['checkResponseData'][index]['regex'] = \
                    common.resolve_global_var(pre_resolve_var=crd['regex'], global_var_dic=self.global_vars) if \
                        crd.get('regex') and isinstance(crd.get('regex'), str) else ''  # 警告!python判断空字符串为False

            check_response_data = test_case['checkResponseData']

        if 'checkResponseSimilarity' in test_case and not test_case['checkResponseSimilarity'] in [[], {}, "", None]:
            if not isinstance(test_case['checkResponseSimilarity'], list):
                raise TypeError('checkResponseSimilarity must be list!')
            for index, crs in enumerate(test_case['checkResponseSimilarity']):
                if not isinstance(crs, dict) or 'baseText' not in crs or 'targetSimilarity' not in crs \
                        or 'compairedText' not in crs or not isinstance(crs['baseText'], str) \
283
                        or not isinstance(crs['compairedText'], str):
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
                    raise TypeError('checkResponseSimilarity is not valid!')
                test_case['checkResponseSimilarity'][index]['baseText'] = \
                    common.resolve_global_var(pre_resolve_var=crs['baseText'], global_var_dic=self.global_vars) if \
                        crs.get('baseText') and isinstance(crs.get('baseText'), str) else ''
                test_case['checkResponseSimilarity'][index]['compairedText'] = \
                    common.resolve_global_var(pre_resolve_var=crs['compairedText'], global_var_dic=self.global_vars) if \
                        crs.get('compairedText') and isinstance(crs.get('compairedText'), str) else ''
            check_response_similarity = test_case['checkResponseSimilarity']

        if 'checkResponseNumber' in test_case and not test_case['checkResponseNumber'] in [[], {}, "", None]:
            if not isinstance(test_case['checkResponseNumber'], list):
                raise TypeError('checkResponseNumber must be list!')
            for index, crn in enumerate(test_case['checkResponseNumber']):
                if not isinstance(crn, dict) or 'expressions' not in crn or \
                        not isinstance(crn['expressions'], dict):
                    raise TypeError('checkResponseNumber is not valid!')

                test_case['checkResponseNumber'][index]['expressions']['firstArg'] = \
                    common.resolve_global_var(pre_resolve_var=crn['expressions']['firstArg'],
                                              global_var_dic=self.global_vars) if \
                        crn['expressions'].get('firstArg') and isinstance(crn['expressions'].get('firstArg'),
                                                                          str) else ''

                test_case['checkResponseNumber'][index]['expressions']['secondArg'] = \
                    common.resolve_global_var(pre_resolve_var=crn['expressions']['secondArg'],
                                              global_var_dic=self.global_vars) if \
                        crn['expressions'].get('secondArg') and isinstance(crn['expressions'].get('secondArg'),
                                                                           str) else ''

                test_case['checkResponseNumber'][index]['expressions']['expectResult'] = \
                    common.resolve_global_var(pre_resolve_var=crn['expressions']['expectResult'],
                                              global_var_dic=self.global_vars) if \
                        crn['expressions'].get('expectResult') and isinstance(crn['expressions'].get('expectResult'),
                                                                              str) else ''
            check_response_number = test_case['checkResponseNumber']

        if check_http_code and not str(response_status_code) == str(check_http_code):
            returned_data["status"] = 'failed'
            returned_data["testConclusion"].append('响应状态码错误, 期待值: <%s>, 实际值: <%s>。\t'
                                                   % (check_http_code, response_status_code))
        if check_response_data:
            try:
                for crd in check_response_data:
                    regex = crd['regex']
                    if regex.strip() == '':
                        continue
                    query = crd['query']
                    real_value = common.dict_get(response_json, query)
                    if real_value is None:
                        returned_data["status"] = 'failed'
                        returned_data["testConclusion"].append('未找到正则校验的Json值(查询语句为: %s),   服务器响应为: %s'
                                                               % (query, response_json))
                        return returned_data
                    result = re.search(regex, str(real_value))  # python 将regex字符串取了r''(原生字符串)
                    if not result:
                        returned_data["status"] = 'failed'
                        returned_data["testConclusion"].append('判断响应值错误(查询语句为: %s),    响应值应满足正则: <%s>,\
                                                                    实际值: <%s> (%s)。(正则匹配时会将数据转化成string)\t'
                                                               % (query, regex, real_value, type(real_value)))
            except BaseException as e:
                returned_data["status"] = 'failed'
                returned_data["testConclusion"].append('判断响应值时报错, 错误信息: <%s>。\t' % e)

        if check_response_number:
            try:
                for crn in check_response_number:
                    expressions = crn['expressions']
                    # print(expressions)
                    if '' in expressions.values() or None in expressions.values():
                        continue
                    expressions_str, result = common.get_numbers_compared_result(expressions)
                    if not result:
                        returned_data["status"] = 'failed'
                        returned_data["testConclusion"].append('判断数值错误(判断表达式为: %s)。\t' % expressions_str)
            except BaseException as e:
                returned_data["status"] = 'failed'
                returned_data["testConclusion"].append('判断数值时报错, 错误信息: <%s>。\t ' % e)

362
        if hasattr(self, 'nlper') and self.nlper and check_response_similarity:
363 364 365 366 367 368 369 370 371 372 373 374 375 376
            try:
                for crs in check_response_similarity:
                    base_text = crs['baseText']
                    compaired_text = crs['compairedText']
                    target_similarity = crs['targetSimilarity']
                    if base_text.strip() == '' or compaired_text.strip() == '' or \
                            not common.can_convert_to_float(target_similarity):
                        continue
                    actual_similarity = self.nlper.get_text_similarity(base_text, compaired_text)

                    if float(actual_similarity) < float(target_similarity):
                        returned_data["status"] = 'failed'
                        returned_data["testConclusion"].append('相似度校验未达标!已对比字符串: 「%s」、「%s」, 实际相似度: 「%s」 '
                                                               '预期相似度: 「%s」。\t ' % (base_text, compaired_text,
377 378
                                                                                    actual_similarity,
                                                                                    target_similarity))
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
            except BaseException as e:
                returned_data["status"] = 'failed'
                returned_data["testConclusion"].append('判断相似度时报错, 模型服务器可能已宕机/断网。具体错误信息: <%s>。\t' % e)

        if returned_data["testConclusion"] == []:
            returned_data["status"] = 'ok'
            returned_data["testConclusion"].append('测试通过')
        else:
            returned_data["status"] = 'failed'
            returned_data["testConclusion"].append('测试不通过!')
        return returned_data

    def update_case_info(self, testing_case_model):
        for index, test_result in enumerate(self.test_result_list):
            test_case_id = test_result["_id"]
            test_result = common.format_response_in_dic(test_result)
            self.test_result_list[index] = test_result
            testing_case_model.update({"_id": ObjectId(test_case_id)},
397
                                      {'$set': {'lastManualTestResult': test_result}})
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436

    def send_report(self, test_report_model, project_id, executor_nick_name, execution_mode):
        test_count = len(self.test_result_list)
        passed_count = len(
            list(filter(lambda x: x == 'ok', [test_result["status"] for test_result in self.test_result_list])))
        failed_count = len(
            list(filter(lambda x: x == 'failed', [test_result["status"] for test_result in self.test_result_list])))
        passed_rate = '%d' % round((passed_count / test_count) * 100, 2) + '%'

        if test_count > 0:
            for test_result in self.test_result_list:
                if 'testBaseInfo' in test_result and 'lastManualTestResult' in test_result['testBaseInfo']:
                    test_result['testBaseInfo'].pop('lastManualTestResult')

            raw_data = {
                "projectId": ObjectId(project_id),
                "testCount": test_count,
                "passCount": passed_count,
                "failedCount": failed_count,
                "passRate": passed_rate,
                "comeFrom": execution_mode,
                "executorNickName": executor_nick_name,
                "testDetail": self.test_result_list,
                "createAt": datetime.datetime.utcnow()
            }
            filtered_data = test_report_model.filter_field(raw_data, use_set_default=True)
            test_report_model.insert(
                filtered_data
            )


if __name__ == '__main__':
    # Nothing happens when this file is run as a script.
    pass