提交 a5d489c0 编写于 作者: W wenjun

add OpenHarmony 1.0 baseline

上级 2b471859
### 该问题是怎么引起的?
### 重现步骤
### 报错信息
### 相关的Issue
### 原因(目的、解决的问题等)
### 描述(做了什么,变更了什么)
### 测试用例(新增、改动、可能影响的功能)
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//test/xts/tools/build/suite_lite.gni")
deploy_suite("xdevice"){
suite_name = "acts"
}
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
{
"description": "Config for acts test suites",
"kits": [
{
"type": "QueryKit",
"server": "NfsServer",
"mount": [
{
"source": "resource/tools/query.bin",
"target": "/test_root/tools"
}
],
"query" : "/test_root/tools/query.bin"
}
]
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<support_device>
<device_lite>true</device_lite>
</support_device>
<device type="com" label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baund_rate>115200</baund_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baund_rate>115200</baund_rate>
</serial>
</device>
<device type="com" label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baund_rate>115200</baund_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com" label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir></dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
</user_config>
\ No newline at end of file
详见:https://gitee.com/openharmony/docs/blob/master/readme/测试子系统README.md
@echo off
set BASE_DIR=%~dp0
set PYTHON=python
set TOOLS=tools
cd /d %BASE_DIR%
(where %PYTHON% | findstr %PYTHON%) >nul 2>&1 || (
@echo "Python3.7 or higher version required!"
goto:eof
)
python -c "import sys; exit(1) if sys.version_info.major < 3 or sys.version_info.minor < 7 else exit(0)"
@if errorlevel 1 (
@echo "Python3.7 or higher version required!"
goto:eof
)
python -c "import easy_install"
@if errorlevel 1 (
@echo "Please install setuptools first!"
goto:eof
)
if not exist %TOOLS% (
@echo "no %TOOLS% directory exist"
goto:eof
)
for %%a in (%TOOLS%/*.egg) do (
python -m easy_install --user %TOOLS%/%%a
@if errorlevel 1 (
@echo "Error occurs to install %%a!"
)
)
python -m xdevice %*
#!/usr/bin/env sh
# Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
error()
{
echo "$1"
exit 1
}
PYTHON="python3"
TOOLS="tools"
flag=$(command -v $PYTHON | grep -c $PYTHON)
if [ "$flag" -eq 0 ]; then
error "Python3.7 or higher version required!"
fi
$PYTHON -c 'import sys; exit(1) if sys.version_info.major < 3 or sys.version_info.minor < 7 else exit(0)' || \
error "Python3.7 or higher version required!"
cd $(dirname "$0") || error "Failure to change direcory!"
$PYTHON -c "import easy_install" || error "Please install setuptools first!"
if [ ! -d $TOOLS ]; then
error "$TOOLS directory not exists"
fi
for f in "$TOOLS"/*.egg
do
if [ ! -e "$f" ]; then
error "Can not find xdevice package!"
fi
$PYTHON -m easy_install --user "$f" || echo "Error occurs to install $f!"
done
$PYTHON -m xdevice "$@"
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from setuptools import setup
def main():
setup(name='xdevice',
description='xdevice test framework',
url='',
package_dir={'': 'src'},
packages=['xdevice',
'xdevice._core',
'xdevice._core.build',
'xdevice._core.command',
'xdevice._core.config',
'xdevice._core.driver',
'xdevice._core.environment',
'xdevice._core.executor',
'xdevice._core.report',
'xdevice._core.testkit'
],
package_data={
'xdevice._core': [
'resource/*.txt',
'resource/config/*.xml',
'resource/template/*.html'
]
},
entry_points={
'console_scripts': [
'xdevice=xdevice.__main__:main_process',
'xdevice_report=xdevice._core.report.__main__:main_report'
]
},
zip_safe=False,
)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pkg_resources
from .variables import Variables
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.logger import platform_logger
from _core.interface import IDriver
from _core.interface import IDevice
from _core.interface import IDeviceManager
from _core.interface import IParser
from _core.interface import LifeCycle
from _core.interface import IShellReceiver
from _core.interface import ITestKit
from _core.interface import IListener
from _core.exception import ParamError
from _core.exception import DeviceError
from _core.exception import LiteDeviceError
from _core.exception import ExecuteTerminate
from _core.exception import ReportException
from _core.constants import DeviceTestType
from _core.constants import DeviceLabelType
from _core.constants import ManagerType
from _core.constants import DeviceOsType
from _core.constants import ProductForm
from _core.constants import CKit
from _core.config.config_manager import UserConfigManager
from _core.config.resource_manager import ResourceManager
from _core.executor.listener import CaseResult
from _core.executor.listener import SuiteResult
from _core.executor.listener import SuitesResult
from _core.executor.listener import StateRecorder
from _core.executor.listener import TestDescription
from _core.testkit.json_parser import JsonParser
from _core.driver.parser_lite import ShellHandler
from _core.report.encrypt import check_pub_key_exist
from _core.utils import get_file_absolute_path
from _core.utils import check_result_report
from _core.utils import get_device_log_file
from _core.utils import get_kit_instances
from _core.utils import get_config_value
from _core.utils import exec_cmd
from _core.environment.manager_env import DeviceSelectionOption
from _core.environment.manager_env import EnvironmentManager
from _core.executor.scheduler import Scheduler
from _core.report.suite_reporter import SuiteReporter
from _core.report.suite_reporter import ResultCode
from _core.command.console import Console
__all__ = [
"Variables",
"Console",
"platform_logger",
"Plugin",
"get_plugin",
"IDriver",
"IDevice",
"IDeviceManager",
"IParser",
"LifeCycle",
"IShellReceiver",
"ITestKit",
"IListener",
"ParamError",
"DeviceError",
"LiteDeviceError",
"ExecuteTerminate",
"ReportException",
"DeviceTestType",
"DeviceLabelType",
"ManagerType",
"DeviceOsType",
"ProductForm",
"CKit",
"UserConfigManager",
"ResourceManager",
"CaseResult",
"SuiteResult",
"SuitesResult",
"StateRecorder",
"TestDescription",
"Scheduler",
"SuiteReporter",
"DeviceSelectionOption",
"EnvironmentManager",
"JsonParser",
"ShellHandler",
"ResultCode",
"check_pub_key_exist",
"check_result_report",
"get_file_absolute_path",
"get_device_log_file",
"get_kit_instances",
"get_config_value",
"exec_cmd"
]
def _load_external_plugins():
plugins = [Plugin.SCHEDULER, Plugin.DRIVER, Plugin.DEVICE, Plugin.LOG,
Plugin.PARSER, Plugin.LISTENER, Plugin.TEST_KIT, Plugin.MANAGER]
for plugin_group in plugins:
for entry_point in pkg_resources.iter_entry_points(group=plugin_group):
entry_point.load()
return
_load_external_plugins()
del _load_external_plugins
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from xdevice import Console
from xdevice import platform_logger
LOG = platform_logger("Main")
def main_process(command=None):
LOG.info("*************** xDevice Test Framework Starting ***************")
if command:
args = str(command).split(" ")
args.insert(0, "xDevice")
else:
args = sys.argv
console = Console()
console.console(args)
return
if __name__ == "__main__":
main_process()
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
此差异已折叠。
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
__all__ = ["get_source_code_rootpath"]
def is_source_code_rootpath(path):
check_name_list = ["build.sh", "base", "build"]
for item in check_name_list:
check_path = os.path.join(path, item)
if not os.path.exists(check_path):
return False
return True
def get_source_code_rootpath(path):
source_code_rootpath = path
while True:
if source_code_rootpath == "":
break
if source_code_rootpath == "/" or source_code_rootpath.endswith(":\\"):
source_code_rootpath = ""
break
if is_source_code_rootpath(source_code_rootpath):
break
source_code_rootpath = os.path.dirname(source_code_rootpath)
return source_code_rootpath
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from _core.exception import ParamError
from _core.logger import platform_logger
__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")
@dataclass
class ConfigFileConst(object):
userconfig_filepath = "user_config.xml"
class UserConfigManager(object):
def __init__(self, config_file="", env=""):
from xdevice import Variables
if config_file:
pass
try:
if env:
self.config_content = ET.fromstring(env)
else:
user_path = os.path.join(Variables.exec_dir, "config")
top_user_path = os.path.join(Variables.top_dir, "config")
config_path = os.path.join(Variables.res_dir, "config")
paths = [user_path, top_user_path, config_path]
for path in paths:
if os.path.exists(os.path.abspath(os.path.join(
path, ConfigFileConst.userconfig_filepath))):
self.file_path = os.path.abspath(os.path.join(
path, ConfigFileConst.userconfig_filepath))
break
if os.path.exists(self.file_path):
tree = ET.parse(self.file_path)
self.config_content = tree.getroot()
else:
raise ParamError("config file not found")
except SyntaxError as error:
if env:
raise ParamError(
"Parse environment parameter fail! Error: %s" % error.args)
else:
raise ParamError(
"Parse %s fail! Error: %s" % (self.file_path, error.args))
def get_user_config_list(self, tag_name):
data_dic = {}
for child in self.config_content:
if tag_name == child.tag:
for sub in child:
data_dic[sub.tag] = sub.text
return data_dic
@staticmethod
def remove_strip(value):
return value.strip()
@staticmethod
def _verify_duplicate(items):
if len(set(items)) != len(items):
LOG.warning("find duplicate sn config, configuration incorrect")
return False
return True
def _handle_str(self, input_string):
config_list = map(self.remove_strip, input_string.split(';'))
config_list = [item for item in config_list if item]
if config_list:
if not self._verify_duplicate(config_list):
return []
return config_list
def get_sn_list(self):
sn_select_list = []
data_dic = {}
for node in self.config_content.findall("environment/device"):
if node.attrib["type"] != "usb-hdc":
continue
for sub in node:
data_dic[sub.tag] = sub.text if sub.text else ""
sn_config = data_dic.get("sn", "")
if sn_config:
sn_select_list = self._handle_str(sn_config)
break
return sn_select_list
def get_remote_config(self):
remote_dic = {}
data_dic = self.get_user_config_list("remote")
if "ip" in data_dic.keys() and "port" in data_dic.keys():
remote_ip = data_dic.get("ip", "")
remote_adb_port = data_dic.get("port", "")
else:
remote_ip = ""
remote_adb_port = ""
if (not remote_ip) or (not remote_adb_port):
remote_ip = ""
remote_adb_port = ""
remote_dic["ip"] = remote_ip
remote_dic["port"] = remote_adb_port
return remote_dic
def get_testcases_dir_config(self):
data_dic = self.get_user_config_list("testcases")
if "dir" in data_dic.keys():
testcase_dir = data_dic.get("dir", "")
if testcase_dir is None:
testcase_dir = ""
else:
testcase_dir = ""
return testcase_dir
def get_user_config(self, target_name, filter_name=None):
data_dic = {}
all_nodes = self.config_content.findall(target_name)
if not all_nodes:
return data_dic
for node in all_nodes:
if filter_name:
if node.get('label') != filter_name:
continue
for sub in node:
data_dic[sub.tag] = sub.text if sub.text else ""
return data_dic
def get_node_attr(self, target_name, attr_name):
nodes = self.config_content.find(target_name)
if attr_name in nodes.attrib:
return nodes.attrib.get(attr_name)
def get_com_device(self, target_name):
devices = []
for node in self.config_content.findall(target_name):
if node.attrib["type"] != "com":
continue
device = [node.attrib]
# get remote device
data_dic = {}
for sub in node:
if sub.text is not None and sub.tag != "serial":
data_dic[sub.tag] = sub.text
if data_dic:
device.append(data_dic)
devices.append(device)
continue
# get local device
for serial in node.findall("serial"):
data_dic = {}
for sub in serial:
if sub.text is None:
data_dic[sub.tag] = ""
else:
data_dic[sub.tag] = sub.text
device.append(data_dic)
devices.append(device)
return devices
def get_device(self, target_name):
data_dic = {}
for node in self.config_content.findall(target_name):
if node.attrib["type"] != "usb-hdc":
continue
for sub in node:
if sub.text is None:
data_dic[sub.tag] = ""
else:
data_dic[sub.tag] = sub.text
break
return data_dic
def get_testcases_dir(self):
from xdevice import Variables
testcases_dir = self.get_testcases_dir_config()
if testcases_dir:
if os.path.isabs(testcases_dir):
return testcases_dir
return os.path.abspath(os.path.join(Variables.exec_dir,
testcases_dir))
return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))
def get_resource_path(self):
from xdevice import Variables
data_dic = self.get_user_config_list("resource")
if "dir" in data_dic.keys():
resource_dir = data_dic.get("dir", "")
if resource_dir:
if os.path.isabs(resource_dir):
return resource_dir
return os.path.abspath(
os.path.join(Variables.exec_dir, resource_dir))
return os.path.abspath(
os.path.join(Variables.exec_dir, "resource"))
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import xml.etree.ElementTree as ElementTree
from _core.logger import platform_logger
LOG = platform_logger("ResourceManager")
DEFAULT_TIMEOUT = "300"
class ResourceManager(object):
def __init__(self):
pass
def get_resource_data(self, xml_filepath, target_name):
data_dic = {}
if os.path.exists(xml_filepath):
data_dic = self._parse_test_xml_file(xml_filepath,
target_name)
return data_dic
def _parse_test_xml_file(self, filepath, targetname):
data_dic = {}
node = self._find_node_by_target(filepath, targetname)
if node:
target_attrib_list = []
target_attrib_list.append(node.attrib)
environment_data_list = []
env_node = node.find("environment")
if env_node:
environment_data_list.append(env_node.attrib)
for element in env_node.findall("device"):
environment_data_list.append(element.attrib)
for option_element in element.findall("option"):
environment_data_list.append(option_element.attrib)
preparer_data_list = []
pre_node = node.find("preparer")
if pre_node:
preparer_data_list.append(pre_node.attrib)
for element in pre_node.findall("option"):
preparer_data_list.append(element.attrib)
cleaner_data_list = []
clr_node = node.find("cleaner")
if clr_node:
cleaner_data_list.append(clr_node.attrib)
for element in clr_node.findall("option"):
cleaner_data_list.append(element.attrib)
data_dic["nodeattrib"] = target_attrib_list
data_dic["environment"] = environment_data_list
data_dic["preparer"] = preparer_data_list
data_dic["cleaner"] = cleaner_data_list
return data_dic
@staticmethod
def _find_node_by_target(filepath, targetname):
node = None
try:
if os.path.exists(filepath):
tree = ElementTree.parse(filepath)
root = tree.getroot()
targets = root.getiterator("target")
for target in targets:
curr_dic = target.attrib
if curr_dic.get("name") == targetname:
node = target
break
except (SyntaxError, ValueError, AttributeError, TypeError) as error:
LOG.error("resource_test.xml parsing failed. %s", error.args)
return node
@staticmethod
def _get_filename_extension(filepath):
_, fullname = os.path.split(filepath)
filename, ext = os.path.splitext(fullname)
return filename, ext
@staticmethod
def process_resource_file(resource_dir, preparer_list, device):
for item in preparer_list:
if "name" not in item.keys():
continue
if item["name"] == "push":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
LOG.info("create_dir: dst = %s" % (dst))
device.execute_shell_command("mkdir -p %s" % dst)
LOG.info("push_file: src = %s, dst = %s" % (src, dst))
device.push_file(src, dst)
elif item["name"] == "pull":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
LOG.info("pull_file: src = %s, dst = %s" % (src, dst))
device.pull_file(src, dst)
elif item["name"] == "shell":
command = item["value"].strip()
LOG.info("shell = %s", command)
device.execute_shell_command(command)
else:
command = "".join((item["name"], " ", item["value"]))
command = command.strip()
LOG.info("others = %s", command)
device.execute_command(command)
@staticmethod
def _get_xml_filepath(testsuite_filepath):
xml_filepath = ""
resource_dir = os.path.join(os.path.dirname(testsuite_filepath),
"resource")
if os.path.exists(resource_dir):
xml_filepath = os.path.join(resource_dir, "resource_test.xml")
return xml_filepath
def get_resource_data_dic(self, testsuit_filepath):
resource_dir = ""
data_dic = {}
target_name, _ = self._get_filename_extension(testsuit_filepath)
xml_filepath = self._get_xml_filepath(testsuit_filepath)
if not os.path.exists(xml_filepath):
return data_dic, resource_dir
data_dic = self.get_resource_data(xml_filepath, target_name)
resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
return data_dic, resource_dir
def process_preparer_data(self, data_dic, resource_dir, device):
if "preparer" in data_dic.keys():
LOG.info("++++++++++++++preparer+++++++++++++++")
preparer_list = data_dic["preparer"]
self.process_resource_file(resource_dir, preparer_list, device)
return
def process_cleaner_data(self, data_dic, resource_dir, device):
if "cleaner" in data_dic.keys():
LOG.info("++++++++++++++cleaner+++++++++++++++")
cleaner_list = data_dic["cleaner"]
self.process_resource_file(resource_dir, cleaner_list, device)
return
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
__all__ = ["DeviceOsType", "ProductForm", "TestType", "TestExecType",
"DeviceTestType", "HostTestType", "HostDrivenTestType",
"SchedulerType", "ListenerType", "ToolCommandType",
"TEST_DRIVER_SET", "LogType", "ParserType", "CKit", "ComType",
"DeviceLabelType", "DeviceLiteKernel", "GTestConst", "ManagerType"]
@dataclass
class DeviceOsType(object):
"""
DeviceOsType enumeration
"""
default = "default"
lite = "lite"
@dataclass
class ProductForm(object):
"""
ProductForm enumeration
"""
phone = "phone"
car = "car"
television = "tv"
watch = "watch"
@dataclass
class TestType(object):
"""
TestType enumeration
"""
unittest = "unittest"
mst = "moduletest"
systemtest = "systemtest"
perf = "performance"
sec = "security"
reli = "reliability"
dst = "distributedtest"
all = "ALL"
@dataclass
class ComType(object):
"""
ComType enumeration
"""
cmd_com = "cmd"
deploy_com = "deploy"
@dataclass
class DeviceLabelType(object):
"""
DeviceLabelType enumeration
"""
wifiiot = "wifiiot"
ipcamera = "ipcamera"
watch = "watch"
phone = "phone"
@dataclass
class DeviceLiteKernel(object):
"""
Lite device os enumeration
"""
linux_kernel = "linux"
lite_kernel = "lite"
TEST_TYPE_DICT = {
"UT": TestType.unittest,
"MST": TestType.mst,
"ST": TestType.systemtest,
"PERF": TestType.perf,
"SEC": TestType.sec,
"RELI": TestType.reli,
"DST": TestType.dst,
"ALL": TestType.all,
}
@dataclass
class TestExecType(object):
"""
TestExecType enumeration according to test execution method
"""
# A test running on the device
device_test = "device"
# A test running on the host (pc)
host_test = "host"
# A test running on the host that interacts with one or more devices.
host_driven_test = "hostdriven"
@dataclass
class DeviceTestType(object):
"""
DeviceTestType enumeration
"""
cpp_test = "CppTest"
dex_test = "DexTest"
hap_test = "HapTest"
junit_test = "JUnitTest"
jsunit_test = "JSUnitTest"
ctest_lite = "CTestLite"
cpp_test_lite = "CppTestLite"
lite_cpp_test = "LiteUnitTest"
open_source_test = "OpenSourceTest"
@dataclass
class HostTestType(object):
"""
HostTestType enumeration
"""
host_gtest = "HostGTest"
host_junit_test = "HostJUnitTest"
@dataclass
class HostDrivenTestType(object):
"""
HostDrivenType enumeration
"""
device_test = "DeviceTest"
TEST_DRIVER_SET = {
DeviceTestType.cpp_test,
DeviceTestType.dex_test,
DeviceTestType.hap_test,
DeviceTestType.junit_test,
DeviceTestType.jsunit_test,
DeviceTestType.cpp_test_lite,
DeviceTestType.ctest_lite,
DeviceTestType.lite_cpp_test,
HostDrivenTestType.device_test
}
@dataclass
class SchedulerType(object):
"""
SchedulerType enumeration
"""
# default scheduler
scheduler = "Scheduler"
@dataclass
class LogType:
tool = "Tool"
device = "Device"
@dataclass
class ListenerType:
log = "Log"
report = "Report"
upload = "Upload"
collect = "Collect"
collect_lite = "CollectLite"
@dataclass
class ParserType:
ctest_lite = "CTestLite"
cpp_test_lite = "CppTestLite"
cpp_test_list_lite = "CppTestListLite"
open_source_test = "OpenSourceTest"
@dataclass
class ManagerType:
device = "device"
lite_device = "device_lite"
@dataclass
class ToolCommandType(object):
toolcmd_key_help = "help"
toolcmd_key_show = "show"
toolcmd_key_run = "run"
toolcmd_key_quit = "quit"
toolcmd_key_list = "list"
@dataclass
class CKit:
push = "PushKit"
install = "ApkInstallKit"
command = "CommandKit"
config = "ConfigKit"
wifi = "WIFIKit"
propertycheck = 'PropertyCheckKit'
sts = 'STSKit'
shell = "ShellKit"
deploy = 'DeployKit'
mount = 'MountKit'
liteuikit = 'LiteUiKit'
rootfs = "RootFsKit"
query = "QueryKit"
@dataclass
class GTestConst(object):
exec_para_filter = "--gtest_filter"
exec_para_level = "--gtest_testsize"
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import os
import sys
from _core.constants import HostDrivenTestType
from _core.constants import DeviceLabelType
from _core.driver.drivers_lite import init_remote_server
from _core.exception import DeviceError
from _core.exception import ParamError
from _core.exception import ReportException
from _core.exception import ExecuteTerminate
from _core.interface import IDriver
from _core.logger import platform_logger
from _core.plugin import Plugin
from _core.testkit.json_parser import JsonParser
from _core.utils import get_config_value
from _core.utils import get_kit_instances
from _core.utils import check_result_report
from _core.report.suite_reporter import SuiteReporter
LOG = platform_logger("DeviceTest")
def start_task(test_file, configs, device, logger):
from xdevice import Variables
# insert& devicetest path for loading devicetest module
devicetest_module = os.path.join(Variables.modules_dir, "_devicetest")
if os.path.exists(devicetest_module):
sys.path.insert(1, devicetest_module)
if configs["testcases_path"]:
sys.path.insert(1, configs["testcases_path"])
from _devicetest.devicetest.main import DeviceTest
device_test = DeviceTest(test_list=test_file, configs=configs,
devices=[device], log=logger)
device_test.run()
def set_up_env(request, source):
if not source.endswith(".json"):
source = "%s.json" % source
json_config = JsonParser(source)
test_args = get_config_value('xml-output', json_config.get_driver())
kits = get_kit_instances(json_config, request.config.resource_path,
request.config.testcases_path)
kits_copy = copy.deepcopy(kits)
from xdevice import Scheduler
for kit in kits:
if not Scheduler.is_execute:
raise ExecuteTerminate()
kit.__setup__(request.config.device, request=request)
return test_args, kits, kits_copy
def get_file_list(module_path):
file_list = []
if not file_list:
for test_file in os.listdir(module_path):
if test_file.endswith(".py") or test_file.endswith(".pyd"):
file_list.append(os.path.join(
module_path, test_file))
return file_list
else:
filter_file_list = []
for test_file in file_list:
if (test_file.endswith(".pyd") or test_file.endswith(".py")) and \
os.path.exists(os.path.join(module_path, test_file)):
filter_file_list.append(os.path.join(
module_path, test_file))
return filter_file_list
@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_test)
class DeviceTestDriver(IDriver):
"""
DeviceTest is a Test that runs a host-driven test on given devices.
"""
# test driver config
config = None
result = ""
error_message = ""
py_file = ""
def __init__(self):
self.linux_host = ""
self.linux_directory = ""
def __check_environment__(self, device_options):
pass
def __check_config__(self, config=None):
del config
self.py_file = ""
def __init_nfs_server__(self, request=None):
return init_remote_server(self, request)
def __execute__(self, request):
kits, source = self._parse_request(request)
configs = dict()
configs["testargs"] = self.config.testargs or ""
configs["testcases_path"] = self.config.testcases_path or ""
configs["request"] = request
try:
if self.config.device and self.config.device.label == \
DeviceLabelType.ipcamera:
self.__init_nfs_server__(request=request)
_, kits, kits_copy = set_up_env(request, source)
configs["linux_host"] = self.linux_host
configs["linux_directory"] = self.linux_directory
configs["kits"] = kits_copy
self.run_module(request, configs, source)
elif self.config.device and self.config.device.label == \
DeviceLabelType.watch:
self.run_module(request, configs, source)
elif self.config.device and self.config.device.label == \
DeviceLabelType.phone:
self.run_module(request, configs, source)
except (ReportException, ModuleNotFoundError, ExecuteTerminate,
SyntaxError, ValueError, AttributeError, TypeError,
KeyboardInterrupt, ParamError) as exception:
LOG.exception(exception)
self.error_message = exception
finally:
self._handle_finally(kits, request)
def _handle_finally(self, kits, request):
from xdevice import Scheduler
for kit in kits:
kit.__teardown__(request.config.device)
if self.config.device.label == \
DeviceLabelType.ipcamera or self.config.device.label == \
DeviceLabelType.watch:
self.config.device.close()
report_name = request.root.source.test_name if \
not request.root.source.test_name.startswith("{") \
else "report"
if Scheduler.mode != "decc":
self.result = check_result_report(
request.config.report_path, self.result, self.error_message,
report_name)
else:
tmp_list = copy.copy(SuiteReporter.get_report_result())
if os.path.dirname(self.result) not in \
[report_path for report_path, _ in tmp_list]:
if not self.error_message:
self.error_message = "An unknown exception occurred " \
"during the execution case"
self.result = check_result_report(
request.config.report_path, self.result,
self.error_message, report_name)
def _parse_request(self, request):
from xdevice import Variables
kits = []
self.config = request.config
self.config.device = request.config.environment.devices[0]
if not self.config.device:
raise DeviceError("no device..........................")
current_dir = request.config.resource_path if \
request.config.resource_path else Variables.exec_dir
if request.root.source.source_file.strip():
source = os.path.join(current_dir,
request.root.source.source_file.strip())
LOG.debug("Testfile FilePath: %s" % source)
else:
source = request.root.source.source_string.strip()
self.config.tmp_folder = os.path.join(self.config.report_path,
"temp")
self.config.tmp_id = str(request.uuid)
return kits, source
def run_module(self, request, configs, source):
json_config = JsonParser(source)
if request.root.source.source_file.strip():
folder_name = "task_%s_%s" % (self.config.tmp_id,
request.root.source.test_name)
tmp_sub_folder = os.path.join(self.config.tmp_folder, folder_name)
os.makedirs(tmp_sub_folder, exist_ok=True)
configs["report_path"] = tmp_sub_folder
self.result = "%s.xml" % os.path.join(tmp_sub_folder, "report")
module_path = os.path.dirname(source)
file_list = get_config_value('py_file', json_config.get_driver(),
is_list=True)
if not file_list:
file_list = get_file_list(module_path)
else:
folder_name = "task_%s_report" % self.config.tmp_id
tmp_sub_folder = os.path.join(self.config.tmp_folder,
folder_name)
self.result = "%s.xml" % os.path.join(tmp_sub_folder, "report")
json_config = JsonParser(source)
file_list = get_config_value('py_file', json_config.get_driver(),
is_list=True)
configs["test_name"] = request.root.source.test_name
configs["execute"] = get_config_value('execute',
json_config.get_driver(), False)
os.makedirs(tmp_sub_folder, exist_ok=True)
self._run_devicetest(configs, file_list)
def _run_devicetest(self, configs, test_file):
start_task(test_file, configs, self.config.device, LOG)
def __result__(self):
return self.result if os.path.exists(self.result) else ""
此差异已折叠。
此差异已折叠。
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import telnetlib
import time
import os
from _core.constants import DeviceOsType
from _core.constants import ComType
from _core.constants import DeviceLabelType
from _core.environment.dmlib_lite import LiteHelper
from _core.exception import LiteDeviceConnectError
from _core.exception import ParamError
from _core.exception import DeviceError
from _core.interface import IDevice
from _core.exception import ExecuteTerminate
from _core.logger import platform_logger
from _core.environment.manager_env import DeviceAllocationState
from _core.plugin import Plugin
from _core.utils import exec_cmd
LOG = platform_logger("DeviceLite")
TIMEOUT = 90
HDC = "litehdc.exe"
def get_hdc_path():
from xdevice import Variables
user_path = os.path.join(Variables.exec_dir, "resource/tools")
top_user_path = os.path.join(Variables.top_dir, "config")
config_path = os.path.join(Variables.res_dir, "config")
paths = [user_path, top_user_path, config_path]
file_path = ""
for path in paths:
if os.path.exists(os.path.abspath(os.path.join(
path, HDC))):
file_path = os.path.abspath(os.path.join(
path, HDC))
break
if os.path.exists(file_path):
return file_path
else:
raise ParamError("config file not found")
def parse_available_com(com_str):
com_str = com_str.replace("\r", " ")
com_list = com_str.split("\n")
for index, item in enumerate(com_list):
com_list[index] = item.strip().strip(b'\x00'.decode())
return com_list
@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
"""
Class representing an device lite device.
Each object of this class represents one device lite device in xDevice.
Attributes:
device_connect_type: A string that's the type of lite device
"""
device_os_type = DeviceOsType.lite
device_allocation_state = DeviceAllocationState.available
def __init__(self):
self.error_message = ""
self.device_sn = ""
self.label = ""
self.device_connect_type = ""
self.device_kernel = ""
self.remote_device = None
self.local_device = None
def __set_serial__(self, device=None):
for item in device:
if "ip" in item.keys():
self.device_sn = "remote_%s" % item.get("ip")
break
elif "type" in item.keys() and ComType.deploy_com == item.get(
"type"):
self.device_sn = "local_%s" % item.get("com")
break
def __get_serial__(self):
return self.device_sn
def __set_device_kernel__(self, kernel_type=""):
self.device_kernel = kernel_type
def __get_device_kernel__(self):
return self.device_kernel
def __check_watch__(self, device):
for item in device:
if "label" not in item.keys():
if "com" not in item.keys() or ("com" in item.keys() and
not item.get("com")):
self.error_message = "watch local com cannot be " \
"empty, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
else:
hdc = get_hdc_path()
result = exec_cmd([hdc])
com_list = parse_available_com(result)
if item.get("com").upper() in com_list:
return True
else:
self.error_message = "watch local com does not exist"
LOG.error(self.error_message)
raise ParamError(self.error_message)
def __check_wifiiot_config__(self, device):
com_type_set = set()
for item in device:
if "label" not in item.keys():
if "com" not in item.keys() or ("com" in item.keys() and
not item.get("com")):
self.error_message = "wifiiot local com cannot be " \
"empty, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
if "type" not in item.keys() or ("type" not in item.keys() and
not item.get("type")):
self.error_message = "wifiiot com type cannot be " \
"empty, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
else:
com_type_set.add(item.get("type"))
if len(com_type_set) < 2:
self.error_message = "wifiiot need cmd com and deploy com" \
" at the same time, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
def __check_ipcamera_local__(self, device):
for item in device:
if "label" not in item.keys():
if "com" not in item.keys() or ("com" in item.keys() and
not item.get("com")):
self.error_message = "ipcamera local com cannot be " \
"empty, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
def __check_ipcamera_remote__(self, device=None):
for item in device:
if "label" not in item.keys():
if "port" in item.keys() and item.get("port") and not item.get(
"port").isnumeric():
self.error_message = "ipcamera remote port should be " \
"a number, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
elif "port" not in item.keys():
self.error_message = "ipcamera remote port cannot be" \
" empty, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
def __check_config__(self, device=None):
self.set_connect_type(device)
if self.label == DeviceLabelType.wifiiot:
self.__check_wifiiot_config__(device)
elif self.label == DeviceLabelType.ipcamera and \
self.device_connect_type == "local":
self.__check_ipcamera_local__(device)
elif self.label == DeviceLabelType.ipcamera and \
self.device_connect_type == "remote":
self.__check_ipcamera_remote__(device)
elif self.label == DeviceLabelType.watch:
self.__check_watch__(device)
def set_connect_type(self, device):
pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
r'01]?\d\d?)$'
for item in device:
if "label" in item.keys():
self.label = item.get("label")
if "com" in item.keys():
self.device_connect_type = "local"
if "ip" in item.keys():
if re.match(pattern, item.get("ip")):
self.device_connect_type = "remote"
else:
self.error_message = "Remote device ip not in right" \
"format, please check user_config.xml"
LOG.error(self.error_message)
raise ParamError(self.error_message)
if not self.label:
self.error_message = "device label cannot be empty, " \
"please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
else:
if self.label != DeviceLabelType.wifiiot and self.label != \
DeviceLabelType.ipcamera and self.label != \
DeviceLabelType.watch:
self.error_message = "device label should be ipcamera or" \
" wifiiot, please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
if not self.device_connect_type:
self.error_message = "device com or ip cannot be empty, " \
"please check"
LOG.error(self.error_message)
raise ParamError(self.error_message)
def __init_device__(self, device=None):
if not device:
LOG.error(
"no available device, please config it in lite_config.xml.")
raise DeviceError(
"no available device, please config it in lite_config.xml.")
self.__check_config__(device)
self.__set_serial__(device)
if self.device_connect_type == "remote":
self.remote_device = RemoteController(device)
else:
self.local_device = LocalController(device)
def connect(self):
"""
connect the device
"""
if self.device_connect_type == "remote":
self.remote_device.connect()
else:
self.local_device.connect()
def execute_command_with_timeout(self, command="", case_type="",
timeout=TIMEOUT, receiver=None):
"""
Executes command on the device.
Parameters:
command: the command to execute
case_type: CTest or CppTest
receiver: parser handler
timeout: timeout for read result
"""
try:
if self.device_connect_type == "remote":
filter_result, status, error_message = \
self.remote_device.execute_command_with_timeout(
command=command,
timeout=timeout,
receiver=receiver)
else:
filter_result, status, error_message = \
self.local_device.execute_command_with_timeout(
command=command,
case_type=case_type,
timeout=timeout,
receiver=receiver)
LOG.debug("execute result:%s", filter_result)
if not status:
LOG.debug("error_message:%s", error_message)
return filter_result, status, error_message
except ExecuteTerminate:
error_message = "Execute terminate."
return "", False, error_message
def close(self):
"""
Unmount the testcase from device server and close the telnet connection
with device server or close the local serial
"""
if self.device_connect_type == "remote":
return self.remote_device.close()
else:
return self.local_device.close()
class RemoteController:
"""
Class representing an device lite remote device.
Each object of this class represents one device lite remote device
in xDevice.
"""
def __init__(self, device):
self.host = device[1].get("ip")
self.port = int(device[1].get("port"))
self.directory = device[1].get("dir")
self.telnet = None
def connect(self):
"""
connect the device server
"""
now_time = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time()))
try:
if self.telnet:
return self.telnet
self.telnet = telnetlib.Telnet(self.host, self.port,
timeout=TIMEOUT)
except Exception as err_msgs:
LOG.error('TIME: %s IP: %s STATUS: telnet failed\n error:%s' % (
now_time, self.host, str(err_msgs)))
raise LiteDeviceConnectError("connect lite_device failed")
time.sleep(2)
self.telnet.set_debuglevel(0)
return self.telnet
def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
receiver=None):
"""
Executes command on the device.
Parameters:
command: the command to execute
timeout: timeout for read result
receiver: parser handler
"""
return LiteHelper.execute_remote_cmd_with_timeout(
self.telnet, command, timeout, receiver)
def close(self):
"""
Unmount the testcase directory from device server and close the telnet
connection with device server
"""
error_message = ""
try:
if not self.telnet:
return
self.telnet.close()
self.telnet = None
except (ConnectionError, Exception):
error_message = "Remote device is disconnected abnormally"
LOG.error(error_message)
return error_message
class LocalController:
def __init__(self, device):
"""
Init Local device.
Parameters:
device: local device
"""
self.com_dict = {}
for item in device:
if "com" in item.keys():
if "type" in item.keys() and ComType.cmd_com == item.get(
"type"):
self.com_dict[ComType.cmd_com] = ComController(item)
elif "type" in item.keys() and ComType.deploy_com == item.get(
"type"):
self.com_dict[ComType.deploy_com] = ComController(item)
def connect(self, key=ComType.cmd_com):
"""
Open serial.
"""
try:
self.com_dict.get(key).connect()
except Exception:
raise LiteDeviceConnectError("connect serial failed")
def flush_input(self, key=ComType.cmd_com):
self.com_dict.get(key).flush_input()
def close(self, key=ComType.cmd_com):
"""
Close serial.
"""
if self.com_dict and self.com_dict.get(key):
self.com_dict.get(key).close()
def execute_command_with_timeout(self, **kwargs):
"""
Execute command on the serial and read all the output from the serial.
"""
args = kwargs
key = args.get("key", ComType.cmd_com)
command = args.get("command", None)
case_type = args.get("case_type", "")
receiver = args.get("receiver", None)
timeout = args.get("timeout", TIMEOUT)
return self.com_dict.get(key).execute_command_with_timeout(
command=command, case_type=case_type,
timeout=timeout, receiver=receiver)
class ComController:
def __init__(self, device):
"""
Init serial.
Parameters:
device: local com
"""
self.serial_port = device.get("com") if device.get("com") else None
self.baund_rate = int(device.get("baund_rate")) if device.get(
"baund_rate") else 115200
self.is_open = False
self.timeout = int(device.get("timeout")) if device.get(
"timeout") else TIMEOUT
self.directory = device.get("dir") if device.get("dir") else None
self.com = None
def connect(self):
"""
Open serial.
"""
try:
if not self.is_open:
import serial
self.com = serial.Serial(self.serial_port,
baudrate=self.baund_rate,
timeout=self.timeout)
self.is_open = True
except Exception:
LOG.error(
"connect %s serial failed, please make sure this port is not "
"occupied." % self.serial_port)
raise LiteDeviceConnectError("connect serial failed")
def flush_input(self):
self.com.flushInput()
def close(self):
"""
Close serial.
"""
error_message = ""
try:
if not self.com:
return
if self.is_open:
self.com.close()
self.is_open = False
except (ConnectionError, Exception):
error_message = "Local device is disconnected abnormally"
LOG.error(error_message)
return error_message
def execute_command_with_timeout(self, **kwargs):
"""
Execute command on the serial and read all the output from the serial.
"""
return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)
def execute_command(self, command):
"""
Execute command on the serial and read all the output from the serial.
"""
LiteHelper.execute_local_command(self.com, command)
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
import re
from _core.constants import DeviceTestType
from _core.exception import ExecuteTerminate
from _core.exception import LiteDeviceConnectError
from _core.exception import LiteDeviceExecuteCommandError
from _core.logger import platform_logger
__all__ = ["generate_report", "LiteHelper"]
CPP_TEST_STANDARD_SIGN = "[==========]"
CPP_TEST_END_SIGN = "Global test environment tear-down"
CPP_SYS_STANDARD_SIGN = " #"
CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
TIMEOUT = 90
CTEST_STANDARD_SIGN = "Start to run test suite"
AT_CMD_ENDS = "OK"
CTEST_END_SIGN = "Framework finished."
PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
LOG = platform_logger("DmlibLite")
def check_read_test_end(result=None, input_command=None):
if input_command not in result:
return False
index = result.find(input_command) + len(input_command)
result_output = result[index:]
if input_command.startswith("./"):
if result_output.find(CPP_TEST_STANDARD_SIGN) != -1:
if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
result_output.find(CPP_TEST_END_SIGN) != -1:
return True
if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \
("test pass" in result_output.lower() or
"test fail" in result_output.lower()):
return True
if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
LOG.error("execute file not exist, result is %s" % result_output)
raise LiteDeviceExecuteCommandError("execute file not exist")
else:
if " #" in result_output:
if input_command == "reboot" or input_command == "reset":
return False
if input_command.startswith("mount"):
if "Mount nfs finished." not in result_output:
return False
return True
return False
def generate_report(receiver, result):
if result and receiver:
if result:
receiver.__read__(result)
receiver.__done__()
def get_current_time():
current_time = time.time()
local_time = time.localtime(current_time)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
millisecond = (current_time - int(current_time)) * 1000
return "%s.%03d" % (data_head, millisecond)
class LiteHelper:
@staticmethod
def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
receiver=None):
"""
Executes command on the device.
Parameters:
telnet:
command: the command to execute
timeout: timeout for read result
receiver: parser handler
"""
from xdevice import Scheduler
time.sleep(2)
start_time = time.time()
status = True
error_message = ""
result = ""
LOG.info("execute command shell %s with timeout %s" %
(command, str(timeout)))
if not telnet:
raise LiteDeviceConnectError("remote device is not connected.")
telnet.write(command.encode('ascii') + b"\n")
expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
bytes(CPP_TEST_END_SIGN, encoding="utf8")]
while time.time()-start_time < timeout:
if not Scheduler.is_execute:
raise ExecuteTerminate()
_, _, data = telnet.expect(expect_result, timeout=1)
data = PATTERN.sub('', data.decode('gbk', 'ignore'))
result = result + data.replace("\r", "")
if check_read_test_end(result, command):
break
else:
error_message = "execute command with timeout=%s " % command
LOG.debug("error_message=%s" % error_message)
status = False
if receiver:
if command in result:
index = result.find(command) + len(command)
result_output = result[index:]
else:
result_output = result
generate_report(receiver, result_output)
return result, status, error_message
@staticmethod
def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
receiver=None):
input_command = command
error_message = ""
start_time = time.time()
result = ""
status = True
from xdevice import Scheduler
while time.time()-start_time < timeout:
if not Scheduler.is_execute:
raise ExecuteTerminate()
data = com.readline().decode('gbk', errors='ignore')
data = PATTERN.sub('', data)
result = "{}{}".format(result, data.replace("\r", ""))
if check_read_test_end(result, input_command):
break
else:
error_message = "execute command with timeout=%s " % command
LOG.debug("error_message:%s" % error_message)
status = False
if receiver:
if command in result:
index = result.find(command) + len(command)
result_output = result[index:]
else:
result_output = result
generate_report(receiver, result_output)
LOG.info('Info: execute command success')
return result, status, error_message
@staticmethod
def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
receiver=None):
result = ""
input_command = command
start = time.time()
from xdevice import Scheduler
while True:
if not Scheduler.is_execute:
raise ExecuteTerminate()
data = com.readline().decode('gbk', errors='ignore')
data = PATTERN.sub('', data)
if isinstance(input_command, list):
data = "{} {}".format(get_current_time(), data)
result = "{}{}".format(result, data.replace("\r", ""))
if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
r"Ignored", data):
start = time.time()
if (int(time.time()) - int(start)) > timeout:
break
else:
result = "{}{}".format(
result, data.replace("\r", "").replace("\n", "").strip())
if AT_CMD_ENDS in data:
return result, True, ""
if (int(time.time()) - int(start)) > timeout:
return result, False, ""
result = PATTERN.sub('', result)
if receiver:
result = "{}{}".format(
result, "{} {}".format(get_current_time(), CTEST_END_SIGN))
generate_report(receiver, result)
LOG.info('Info: execute command success')
return result, True, ""
@staticmethod
def read_local_output(com=None, command=None, case_type="",
timeout=TIMEOUT, receiver=None):
if case_type == DeviceTestType.ctest_lite:
return LiteHelper.read_local_output_ctest(com, command,
timeout, receiver)
else:
return LiteHelper.read_local_output_test(com, command,
timeout, receiver)
@staticmethod
def execute_local_cmd_with_timeout(com, **kwargs):
"""
Execute command on the serial and read all the output from the serial.
"""
args = kwargs
command = args.get("command", None)
input_command = command
case_type = args.get("case_type", "")
timeout = args.get("timeout", TIMEOUT)
receiver = args.get("receiver", None)
if not com:
raise LiteDeviceConnectError("local device is not connected.")
LOG.info("%s \n execute command shell %s with timeout %s" %
(com, command, str(timeout)))
if isinstance(command, str):
command = command.encode("utf-8")
if command[-2:] != b"\r\n":
command = command.rstrip() + b'\r\n'
com.write(command)
else:
com.write(command)
return LiteHelper.read_local_output(
com, command=input_command, case_type=case_type, timeout=timeout,
receiver=receiver)
@staticmethod
def execute_local_command(com, command):
"""
Execute command on the serial and read all the output from the serial.
"""
if not com:
raise LiteDeviceConnectError("local device is not connected.")
LOG.info(
"%s \n execute command shell %s" % (com, command))
command = command.encode("utf-8")
if command[-2:] != b"\r\n":
command = command.rstrip() + b'\r\n'
com.write(command)
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from dataclasses import dataclass
from _core.config.config_manager import UserConfigManager
from _core.exception import DeviceError
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
__all__ = ["EnvironmentManager", "DeviceSelectionOption",
"DeviceAllocationState"]
LOG = platform_logger("ManagerEnv")
class Environment(object):
"""
Environment required for each dispatch
"""
def __init__(self):
self.devices = []
self.phone = 0
self.wifiiot = 0
self.ipcamera = 0
def __get_serial__(self):
device_serials = []
for device in self.devices:
device_serials.append(convert_serial(device.__get_serial__()))
return ";".join(device_serials)
def get_devices(self):
return self.devices
def check_serial(self):
if self.__get_serial__():
return True
return False
def add_device(self, device):
if device.label == "phone":
self.phone += 1
device.device_id = "phone%s" % str(self.phone)
self.devices.append(device)
class EnvironmentManager(object):
"""
Class representing environment manager
managing the set of available devices for testing
"""
__instance = None
__init_flag = False
def __new__(cls, *args, **kwargs):
"""
Singleton instance
"""
del args, kwargs
if cls.__instance is None:
cls.__instance = super(EnvironmentManager, cls).__new__(cls)
return cls.__instance
def __init__(self, environment=""):
if EnvironmentManager.__init_flag:
return
self.manager_device = None
self.manager_lite = None
self._get_device_manager(environment)
EnvironmentManager.__init_flag = True
def _get_device_manager(self, environment=""):
try:
result = UserConfigManager(env=environment).get_user_config(
"environment/support_device")
except ParamError as error:
LOG.exception("ParamError: %s" % error.args, exc_info=False)
if not environment:
sys.exit(0)
else:
raise error
if result.get("device") == "true":
managers_device = get_plugin(plugin_type=Plugin.MANAGER,
plugin_id="device")
if managers_device:
self.manager_device = managers_device[0]
self.manager_device.init_environment(environment)
if result.get("device_lite") == "true":
managers_lite = get_plugin(plugin_type=Plugin.MANAGER,
plugin_id="device_lite")
if managers_lite:
self.manager_lite = managers_lite[0]
self.manager_lite.init_environment(environment)
def env_stop(self):
if self.manager_device:
self.manager_device.env_stop()
self.manager_device = None
if self.manager_lite:
self.manager_lite.devices_list = []
self.manager_lite = None
EnvironmentManager.__init_flag = False
def apply_environment(self, device_options):
environment = Environment()
for device_option in device_options:
device = self.apply_device(device_option)
if device is not None:
environment.add_device(device)
return environment
def release_environment(self, environment):
for device in environment.devices:
self.release_device(device)
def apply_device(self, device_option, timeout=10):
if not device_option.label or device_option.label == "phone":
device_manager = self.manager_device
else:
device_manager = self.manager_lite
if device_manager:
return device_manager.apply_device(device_option, timeout)
else:
raise DeviceError(
"%s is not supported, please check %s and "
"environment config user_config.xml" %
(str(device_option.test_driver), device_option.source_file))
def check_device_exist(self, device_options):
"""
Check if there are matched devices which can be allocated or available.
"""
for device_option in device_options:
device_exist = False
if self.manager_device:
for device in self.manager_device.devices_list:
if device_option.matches(device, False):
device_exist = True
if self.manager_lite:
for device in self.manager_lite.devices_list:
if device_option.matches(device, False):
device_exist = True
if not device_exist:
return False
return True
def release_device(self, device):
if self.manager_device and \
device in self.manager_device.devices_list:
self.manager_device.release_device(device)
elif self.manager_lite and \
device in self.manager_lite.devices_list:
self.manager_lite.release_device(device)
def list_devices(self):
LOG.info("list devices.")
if self.manager_device:
self.manager_device.list_devices()
if self.manager_lite:
self.manager_lite.list_devices()
class DeviceSelectionOption(object):
"""
Class representing device selection option
"""
def __init__(self, options, label=None, test_source=None):
self.device_sn = [x for x in options["device_sn"].split(";") if x]
self.label = label
self.test_driver = test_source.test_type
self.source_file = ""
def get_label(self):
return self.label
def matches(self, device, allocate=True):
if allocate and device.device_allocation_state != \
DeviceAllocationState.available:
return False
if not allocate and device.device_allocation_state == \
DeviceAllocationState.ignored:
return False
if len(self.device_sn) != 0 and device.device_sn not in self.device_sn:
return False
if self.label and self.label != device.label:
return False
return True
@dataclass
class DeviceAllocationState:
ignored = "Ignored"
available = "Available"
allocated = "Allocated"
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from _core.config.config_manager import UserConfigManager
from _core.constants import DeviceOsType
from _core.constants import ManagerType
from _core.environment.manager_env import DeviceAllocationState
from _core.exception import LiteDeviceConnectError
from _core.exception import ParamError
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.interface import IDeviceManager
from _core.logger import platform_logger
__all__ = ["ManagerLite"]
LOG = platform_logger("ManagerLite")
@Plugin(type=Plugin.MANAGER, id=ManagerType.lite_device)
class ManagerLite(IDeviceManager):
"""
Class representing device manager
managing the set of available devices for testing
"""
def __init__(self):
self.devices_list = []
def init_environment(self, environment=""):
device_lite = get_plugin(plugin_type=Plugin.DEVICE,
plugin_id=DeviceOsType.lite)[0]
devices = UserConfigManager(env=environment).get_com_device(
"environment/device")
for device in devices:
try:
device_lite_instance = device_lite.__class__()
device_lite_instance.__init_device__(device)
device_lite_instance.device_allocation_state = \
DeviceAllocationState.available
except (LiteDeviceConnectError, ParamError):
continue
self.devices_list.append(device_lite_instance)
def apply_device(self, device_option, timeout=10):
"""
Request a device for testing that meets certain criteria.
"""
del timeout
allocated_device = None
for device in self.devices_list:
if device_option.matches(device):
device.device_allocation_state = \
DeviceAllocationState.allocated
LOG.debug("allocate device sn: %s, type: %s" % (
device.__get_serial__(), device.__class__))
return device
time.sleep(10)
return allocated_device
@staticmethod
def release_device(device):
device.device_allocation_state = DeviceAllocationState.available
LOG.debug("free device sn: %s, type: %s" % (
device.__get_serial__(), device.__class__))
def list_devices(self):
print("lite devices:")
print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".
format("SerialPort/IP", "Baudrate/Port", "OsType", "Allocation",
"Product", "ConnectType", "ComType"))
for device in self.devices_list:
if device.device_connect_type == "remote":
print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}".format(
device.remote_device.host,
device.remote_device.port,
device.device_os_type,
device.device_allocation_state,
device.label,
device.device_connect_type))
else:
for com_controller in device.local_device.com_dict:
print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".
format(device.local_device.com_dict[
com_controller].serial_port,
device.local_device.com_dict[
com_controller].baund_rate,
device.device_os_type,
device.device_allocation_state,
device.label,
device.device_connect_type,
com_controller))
此差异已折叠。
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
sys.path.insert(0, os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
sys.path.insert(1, os.path.join(os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))))
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import time
from _core.logger import platform_logger
from _core.report.reporter_helper import ExecInfo
from _core.report.reporter_helper import ReportConstant
from _core.report.result_reporter import ResultReporter
LOG = platform_logger("ReportMain")
def main_report():
if sys.version < '3.7':
LOG.error("Please use python 3.7 or higher version to start "
"project")
return
args = sys.argv
if args is None or len(args) < 2:
report_path = input("report path >>> ")
else:
report_path = args[1]
exec_dir = os.getcwd()
if not os.path.isabs(report_path):
report_path = os.path.abspath(os.path.join(exec_dir, report_path))
if not os.path.exists(report_path):
LOG.error("report path %s not exists", report_path)
return
LOG.info("report path: %s", report_path)
task_info = ExecInfo()
task_info.platform = "None"
task_info.test_type = "Test"
task_info.device_name = "None"
task_info.test_time = time.strftime(ReportConstant.time_format,
time.localtime())
result_report = ResultReporter(report_path, task_info)
result_report.generate_reports()
if __name__ == "__main__":
main_report()
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册