提交 14287d72 编写于 作者: L liukunmo

新增DeployToolKit Driver支持三方芯片润和L0模组XTS测试认证。

上级 afa02d16
......@@ -232,6 +232,7 @@ class CKit:
query = "QueryKit"
liteshell = "LiteShellKit"
app_install = "AppInstallKit"
deploytool = "DeployToolKit"
@dataclass
......
......@@ -46,6 +46,7 @@ from _core.logger import platform_logger
from _core.report.reporter_helper import DataHelper
from _core.testkit.json_parser import JsonParser
from _core.testkit.kit_lite import DeployKit
from _core.testkit.kit_lite import DeployToolKit
from _core.utils import get_config_value
from _core.utils import get_kit_instances
from _core.utils import check_result_report
......@@ -368,6 +369,11 @@ class CppTestDriver(IDriver):
command=command, case_type=DeviceTestType.cpp_test_lite,
timeout=timeout, receiver=handler)
self.config.command_result += result
if result.count(CPP_TEST_NFS_SIGN) >= 1:
_, _, error = self.config.device.execute_command_with_timeout(
command="ping %s" % self.linux_host,
case_type=DeviceTestType.cpp_test_lite,
timeout=5)
return error, result, handler
def _do_test_run(self, command, request):
......@@ -500,7 +506,8 @@ class CppTestDriver(IDriver):
raise TypeError(err_msg)
tests = []
execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else (
self.execute_bin + ".xml")
self.execute_bin + ".xml")
LOG.debug("run into :{}".format(is_true))
file_path = os.path.join(report_path, execute_bin_xml)
if not self.check_xml_exist(execute_bin_xml):
return tests
......@@ -632,8 +639,22 @@ class CTestDriver(IDriver):
request.root.source.source_file.strip()).split(".")[0]
else:
source = request.root.source.source_string.strip()
self._run_ctest(source=source, request=request)
json_config = JsonParser(source)
kit_instances = get_kit_instances(json_config,
request.config.resource_path,
request.config.testcases_path)
for (kit_instance, kit_info) in zip(kit_instances,
json_config.get_kits()):
if isinstance(kit_instance, DeployToolKit):
LOG.debug("run ctest third party")
self._run_ctest_third_party(source=source, request=request,
time_out=int(
kit_instance.time_out))
break
else:
LOG.debug("run ctest")
self._run_ctest(source=source, request=request)
break
except (LiteDeviceExecuteCommandError, Exception) as exception:
LOG.error(exception, error_no=getattr(exception, "error_no",
......@@ -689,6 +710,66 @@ class CTestDriver(IDriver):
self.config.device.device.com_dict.get(
ComType.deploy_com).close()
def _run_ctest_third_party(self, source=None, request=None, time_out=5):
parser_instances = []
parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
try:
if not source:
LOG.error("Error: source don't exist %s." % source,
error_no="00101")
return
version = get_test_component_version(self.config)
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.file_name
parser_instance.product_info.setdefault("Version", version)
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
while True:
input_burning = input("Please enter 'y' or 'n' after "
"the burning is complete,"
"enter 'quit' to exit:")
if input_burning.lower().strip() in ["y", "yes"]:
LOG.info("Burning succeeded.")
break
elif input_burning.lower().strip() in ["n", "no"]:
LOG.info("Burning failed.")
elif input_burning.lower().strip() == "quit":
break
else:
LOG.info("The input {} parameter is incorrect,"
"please enter 'y' or 'n' after the "
"burning is complete ,enter 'quit' "
"to exit.".format(input_burning))
LOG.info("Please press the device "
"reset button when the send commmand [] appears ")
time.sleep(3)
self.result = "%s.xml" % os.path.join(
request.config.report_path, "result", self.file_name)
self.config.device.device.com_dict.get(
ComType.deploy_com).connect()
result, _, error = self.config.device.device. \
execute_command_with_timeout(
command=[], case_type=DeviceTestType.ctest_lite,
key=ComType.deploy_com, timeout=time_out, receiver=handler)
device_log_file = get_device_log_file(request.config.report_path,
request.config.device.
__get_serial__())
device_log_file_open = \
os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
os.O_APPEND, 0o755)
with os.fdopen(device_log_file_open, "a") as file_name:
file_name.write("{}{}".format(
"\n".join(result.split("\n")[0:-1]), "\n"))
file_name.flush()
finally:
self.config.device.device.com_dict.get(
ComType.deploy_com).close()
def _reset_device(self, request, source):
json_config = JsonParser(source)
reset_cmd = []
......
......@@ -105,7 +105,7 @@ class EnvironmentManager(object):
self.managers[manager_instance.__class__.__name__] = \
manager_instance
except Exception as error:
LOG.debug(error)
LOG.debug("Env start error:%s" % error)
def env_stop(self):
for manager in self.managers.values():
......
......@@ -48,7 +48,7 @@ from _core.environment.manager_env import DeviceAllocationState
__all__ = ["DeployKit", "MountKit", "RootFsKit", "QueryKit", "LiteShellKit",
"LiteAppInstallKit"]
"LiteAppInstallKit", "DeployToolKit"]
LOG = platform_logger("KitLite")
RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, " \
......@@ -382,15 +382,15 @@ class MountKit(ITestKit):
result, status, _ = device.execute_command_with_timeout(
command="umount {}".format(mounted_dir),
timeout=2)
if result.find("Resource busy") == -1:
device.execute_command_with_timeout(command="rm -r {}".
format(mounted_dir)
, timeout=1)
if status:
break
LOG.info("umount failed,try "
"again {} time".format(mount_time))
time.sleep(1)
if result.find("Resource busy") == -1:
device.execute_command_with_timeout(command="rm -r {}".
format(mounted_dir)
, timeout=1)
def copy_file_as_temp(original_file, str_length):
......@@ -682,3 +682,30 @@ def process_product_info(message, product_info):
items = message[len("The "):].split(" is ")
product_info.setdefault(items[0].strip(),
items[1].strip().strip("[").strip("]"))
@Plugin(type=Plugin.TEST_KIT, id=CKit.deploytool)
class DeployToolKit(ITestKit):
def __init__(self):
self.config = None
self.auto_deploy = None
self.time_out = None
def __check_config__(self, config):
self.config = config
self.auto_deploy = get_config_value('auto_deploy', config, False)
self.time_out = get_config_value('timeout', config, False)
if self.auto_deploy or not self.time_out:
msg = "The config for deploytool kit is invalid " \
"with auto_deploy:{},timeout:{}".format(self.auto_deploy,
self.time_out)
LOG.error(msg, error_no="00108")
raise ParamError(msg, error_no="00108")
def __setup__(self, device, **kwargs):
args = kwargs
request = args.get("request", None)
request.confing.deploy_tool_kit = self.config
def __teardown__(self, device):
pass
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册