提交 a20e2e3b 编写于 作者: O openeuler-ci-bot 提交者: Gitee

!11 coding specifications rectification

Merge pull request !11 from cuixucui/master
......@@ -92,28 +92,48 @@ class Command:
return
def print_output(self):
"""
结果显示
:return:
"""
if self.output:
for line in self.output:
sys.stdout.write( line )
sys.stdout.write(line)
sys.stdout.write("\n")
sys.stdout.flush()
def print_errors(self):
"""
页面显示错误信息
:return:
"""
if self.errors:
for line in self.errors:
sys.stderr.write( line )
sys.stderr.write(line)
sys.stderr.write("\n")
sys.stderr.flush()
def pid(self):
"""
获取管道pid值
:return:
"""
if self.pipe:
return self.pipe.pid
def readline(self):
"""
按行读取输出信息
:return
"""
if self.pipe:
return self.pipe.stdout.readline()
def read(self):
"""
执行命令,并读取结果
:return:
"""
self.pipe = subprocess.Popen(self.command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
......@@ -124,7 +144,7 @@ class Command:
if self.pipe:
return self.pipe.poll()
def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False, ignore_errors=False):
def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False):
self.regex = regex
self.single_line = single_line
self.regex_group = regex_group
......@@ -203,8 +223,10 @@ class Command:
class CertCommandError(Exception):
def __init__(self, command, message):
super(CertCommandError, self).__init__()
self.message = message
self.command = command
self.__message = None
def __str__(self):
return "\"%s\" %s" % (self.command.command, self.message)
......
......@@ -29,7 +29,7 @@ from .reboot import Reboot
from .client import Client
class EulerCertification():
class EulerCertification:
def __init__(self):
self.certification = None
......@@ -50,12 +50,12 @@ class EulerCertification():
print("All cases are passed, test end.")
return True
devices = certdevice.get_devices()
self.devices = DeviceDocument(CertEnv.devicefile, devices)
oec_devices = certdevice.get_devices()
self.devices = DeviceDocument(CertEnv.devicefile, oec_devices)
self.devices.save()
# test_factory format example: [{"name":"nvme", "device":device, "run":True, "status":"PASS", "reboot":False}]
test_factory = self.get_tests(devices)
test_factory = self.get_tests(oec_devices)
self.update_factory(test_factory)
if not self.choose_tests():
return True
......@@ -76,7 +76,7 @@ class EulerCertification():
reboot.clean()
self.save(job)
return True
except Exception as e:
except (IOError, OSError, TypeError) as e:
print(e)
return False
......@@ -86,7 +86,7 @@ class EulerCertification():
Command("rm -rf %s" % CertEnv.certificationfile).run()
Command("rm -rf %s" % CertEnv.factoryfile).run()
Command("rm -rf %s" % CertEnv.devicefile).run()
except Exception as e:
except CertCommandError as e:
print(e)
return False
return True
......@@ -104,10 +104,10 @@ class EulerCertification():
factory_doc = FactoryDocument(CertEnv.factoryfile)
self.test_factory = factory_doc.get_factory()
cert_id = self.certification.get_certify()
oec_id = self.certification.get_certify()
hardware_info = self.certification.get_hardware()
self.client = Client(hardware_info, cert_id)
print(" Compatibility Test ID: ".ljust(30) + cert_id)
self.client = Client(hardware_info, oec_id)
print(" Compatibility Test ID: ".ljust(30) + oec_id)
print(" Hardware Info: ".ljust(30) + hardware_info)
print(" Product URL: ".ljust(30) + self.certification.get_url())
print(" OS Info: ".ljust(30) + self.certification.get_os())
......@@ -127,7 +127,7 @@ class EulerCertification():
cwd = os.getcwd()
os.chdir(os.path.dirname(doc_dir))
dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + "-" + job.job_id
pack_name = dir_name +".tar"
pack_name = dir_name + ".tar"
cmd = Command("tar -cf %s %s" % (pack_name, dir_name))
try:
os.rename(job.job_id, dir_name)
......@@ -146,6 +146,7 @@ class EulerCertification():
def submit(self):
packages = list()
pattern = re.compile("^oech-[0-9]{14}-[0-9a-zA-Z]{10}.tar$")
files = []
for (root, dirs, files) in os.walk(CertEnv.datadirectory):
break
packages.extend(filter(pattern.search, files))
......@@ -168,12 +169,17 @@ class EulerCertification():
def upload(self, path, server):
print("Uploading...")
if not self.client:
cert_id = self.certification.get_certify()
oec_id = self.certification.get_certify()
hardware_info = self.certification.get_hardware()
self.client = Client(hardware_info, cert_id)
self.client = Client(hardware_info, oec_id)
return self.client.upload(path, server)
def get_tests(self, devices):
"""
get test items
:param devices:
:return:
"""
nodevice = ["cpufreq", "memory", "clock", "profiler", "system", "stress", "kdump", "perf", "acpi", "watchdog"]
ethernet = ["ethernet"]
infiniband = ["infiniband"]
......@@ -214,9 +220,9 @@ class EulerCertification():
empty_device = Device()
for device in devices:
if device.get_property("SUBSYSTEM") == "usb" and \
device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \
("2." in device.get_property("ID_MODEL_FROM_DATABASE") or \
"3." in device.get_property("ID_MODEL_FROM_DATABASE")):
device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \
("2." in device.get_property("ID_MODEL_FROM_DATABASE") or
"3." in device.get_property("ID_MODEL_FROM_DATABASE")):
sort_devices["usb"] = [empty_device]
continue
if device.get_property("PCI_CLASS") == "30000" or device.get_property("PCI_CLASS") == "38000":
......@@ -229,7 +235,7 @@ class EulerCertification():
sort_devices["tape"] = [device]
continue
if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \
device.get_property("ID_TYPE") == "disk":
device.get_property("ID_TYPE") == "disk":
if "nvme" in device.get_property("DEVPATH"):
sort_devices["disk"] = [empty_device]
try:
......@@ -267,9 +273,9 @@ class EulerCertification():
continue
if device.get_property("ID_CDROM") == "1":
types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", \
"BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"]
for type in types:
if device.get_property("ID_CDROM_" + type) == "1":
"BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"]
for dev_type in types:
if device.get_property("ID_CDROM_" + dev_type) == "1":
try:
sort_devices["cdrom"].extend([device])
except KeyError:
......@@ -280,7 +286,7 @@ class EulerCertification():
try:
Command("dmidecode").get_str("IPMI Device Information", single_line=False)
sort_devices["ipmi"] = [empty_device]
except:
except OSError as e:
pass
return sort_devices
......@@ -313,15 +319,19 @@ class EulerCertification():
try:
num = int(reply)
except:
except ValueError:
continue
if num > 0 and num <= len(self.test_factory):
self.test_factory[num-1]["run"] = not self.test_factory[num-1]["run"]
self.test_factory[num - 1]["run"] = not self.test_factory[num - 1]["run"]
continue
def show_tests(self):
print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \
"""
show test items
:return:
"""
print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \
+ "Status".ljust(8) + "Class".ljust(14) + "Device\033[0m")
num = 0
for test in self.test_factory:
......@@ -334,24 +344,28 @@ class EulerCertification():
status = test["status"]
device = test["device"].get_name()
run = "no"
if test["run"] == True:
if test["run"] is True:
run = "yes"
num = num + 1
if status == "PASS":
print("%-6d"%num + run.ljust(8) + "\033[0;32mPASS \033[0m" \
+ name.ljust(14) + "%s"%device)
print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" \
+ name.ljust(14) + "%s" % device)
elif status == "FAIL":
print("%-6d"%num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \
+ name.ljust(14) + "%s"%device)
print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \
+ name.ljust(14) + "%s" % device)
elif status == "Force":
print("%-6d"%num + run.ljust(8) + "\033[0;33mForce \033[0m" \
+ name.ljust(14) + "%s"%device)
print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" \
+ name.ljust(14) + "%s" % device)
else:
print("%-6d"%num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \
+ name.ljust(14) + "%s"%device)
print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \
+ name.ljust(14) + "%s" % device)
def choose_tests(self):
"""
选择测试用例
:return:
"""
for test in self.test_factory:
if test["status"] == "PASS":
test["run"] = False
......@@ -396,7 +410,8 @@ class EulerCertification():
self.test_factory.sort(key=lambda k: k["name"])
FactoryDocument(CertEnv.factoryfile, self.test_factory).save()
def search_factory(self, obj_test, test_factory):
@staticmethod
def search_factory(obj_test, test_factory):
for test in test_factory:
if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path:
return True
......
......@@ -28,6 +28,7 @@ def filter_char(str):
filtered += str[start:]
return filtered
class CertDevice:
def __init__(self):
self.devices = None
......@@ -60,12 +61,13 @@ class CertDevice:
properties["INFO"] = attribute
else:
break
except Exception as e:
except OSError as e:
print("Warning: get devices fail")
print(e)
self.devices.sort(key=lambda k: k.path)
return self.devices
class Device:
def __init__(self, properties=None):
self.path = ""
......
......@@ -21,8 +21,8 @@ from .sysinfo import SysInfo
from .env import CertEnv
class Document():
def __init__(self, filename, document=dict()):
class Document:
def __init__(self, filename, document={}):
self.document = document
self.filename = filename
......@@ -34,7 +34,7 @@ class Document():
with open(self.filename, "w+") as save_f:
json.dump(self.document, save_f, indent=4)
save_f.close()
except Exception as e:
except (IOError, ValueError) as e:
print("Error: doc save fail.")
print(e)
return False
......@@ -46,11 +46,13 @@ class Document():
self.document = json.load(load_f)
load_f.close()
return True
except:
except (IOError, json.decoder.JSONDecodeError):
return False
class CertDocument(Document):
def __init__(self, filename, document=dict()):
def __init__(self, filename, document={}):
super(CertDocument, self).__init__()
self.document = dict()
self.filename = filename
if not document:
......@@ -74,7 +76,7 @@ class CertDocument(Document):
self.document[key] = value
else:
break
except Exception as e:
except OSError as e:
print("Error: get hardware info fail.")
print(e)
......@@ -103,8 +105,10 @@ class CertDocument(Document):
def get_kernel(self):
return self.document["kernel"]
class DeviceDocument(Document):
def __init__(self, filename, devices=list()):
def __init__(self, filename, devices=[]):
super(DeviceDocument, self).__init__()
self.filename = filename
self.document = list()
if not devices:
......@@ -113,8 +117,10 @@ class DeviceDocument(Document):
for device in devices:
self.document.append(device.properties)
class FactoryDocument(Document):
def __init__(self, filename, factory=list()):
def __init__(self, filename, factory=[]):
super(FactoryDocument, self).__init__()
self.document = list()
self.filename = filename
if not factory:
......@@ -143,21 +149,22 @@ class FactoryDocument(Document):
class ConfigFile:
def __init__(self, filename):
super(ConfigFile, self).__init__()
self.filename = filename
self.parameters = dict()
self.config = list()
self.load()
def load(self):
file = open(self.filename)
self.config = file.readlines()
fp = open(self.filename)
self.config = fp.readlines()
for line in self.config:
if line.strip() and line.strip()[0] == "#":
continue
words = line.strip().split(" ")
if words[0]:
self.parameters[words[0]] = " ".join(words[1:])
file.close()
fp.close()
def get_parameter(self, name):
if self.parameters:
......@@ -199,7 +206,7 @@ class ConfigFile:
self.save()
def save(self):
file = open(self.filename, "w")
fp = open(self.filename, "w")
for line in self.config:
file.write(line)
file.close()
fp.write(line)
fp.close()
......@@ -50,7 +50,8 @@ class Job(object):
for parameter_name, parameter_value in self.args.test_parameters:
self.test_parameters[parameter_name] = parameter_value
def discover(self, testname, device, subtests_filter=None):
@staticmethod
def discover(testname, device, subtests_filter=None):
if not testname:
print("testname not specified, discover test failed")
return None
......
......@@ -50,6 +50,7 @@ class Log(object):
self.log.close()
self.log = None
class Logger():
def __init__(self, logname, logdir, out, err):
self.log = Log(logname, logdir)
......
......@@ -63,7 +63,7 @@ class Reboot:
try:
Command("systemctl daemon-reload").run_quiet()
Command("systemctl enable oech").run_quiet()
except:
except OSError as e:
print("Error: enable oech.service fail.")
return False
......@@ -81,7 +81,7 @@ class Reboot:
self.job.job_id = self.reboot["job_id"]
self.job.subtests_filter = self.reboot["rebootup"]
time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S")
except:
except KeyError:
print("Error: reboot file format not as expect.")
return False
......
......@@ -35,7 +35,7 @@ class SysInfo:
f = open(file)
text = f.read()
f.close()
except:
except IOError:
print("Release file not found.")
return
......
......@@ -65,15 +65,23 @@ def get_results():
for host in next(os.walk(dir_results))[1]:
dir_host = os.path.join(dir_results, host)
results[host] = {}
for id in next(os.walk(dir_host))[1]:
dir_id = os.path.join(dir_host, id)
for oec_id in next(os.walk(dir_host))[1]:
dir_id = os.path.join(dir_host, oec_id)
results[host][id] = next(os.walk(dir_id))[1]
return render_template('results.html', results=results)
@app.route('/results/<host>/<id>/<job>')
def get_job(host, id, job):
dir_job = os.path.join(dir_results, host, id, job)
@app.route('/results/<host>/<oec_id>/<job>')
def get_job(host, oec_id, job):
"""
获取job信息
:param host:
:param oec_id:
:param job:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
json_info = os.path.join(dir_job, 'compatibility.json')
json_results = os.path.join(dir_job, 'factory.json')
try:
......@@ -81,19 +89,27 @@ def get_job(host, id, job):
info = json.load(f)
with open(json_results, 'r') as f:
results = json.load(f)
except Exception as e:
except (IOError, json.decoder.JSONDecodeError) as e:
abort(404)
return render_template('job.html', host=host, id=id, job=job, info=info, results=results)
@app.route('/results/<host>/<id>/<job>/devices/<interface>')
def get_device(host, id, job, interface):
dir_job = os.path.join(dir_results, host, id, job)
return render_template('job.html', host=host, id=oec_id, job=job, info=info, results=results)
@app.route('/results/<host>/<oec_id>/<job>/devices/<interface>')
def get_device(host, oec_id, job, interface):
"""
获取硬件设备信息
:param host:
:param oec_id:
:param job:
:param interface:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
json_results = os.path.join(dir_job, 'factory.json')
try:
with open(json_results, 'r') as f:
results = json.load(f)
except Exception as e:
except (IOError, json.decoder.JSONDecodeError) as e:
abort(404)
for testcase in results:
device = testcase.get('device')
......@@ -103,44 +119,73 @@ def get_device(host, id, job, interface):
abort(404)
@app.route('/results/<host>/<id>/<job>/devices')
def get_devices(host, id, job):
dir_job = os.path.join(dir_results, host, id, job)
@app.route('/results/<host>/<oec_id>/<job>/devices')
def get_devices(host, oec_id, job):
"""
获取设备信息
:param host:
:param oec_id:
:param job:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
json_devices = os.path.join(dir_job, 'device.json')
try:
with open(json_devices, 'r') as f:
devices = json.load(f)
except Exception as e:
except (IOError, json.decoder.JSONDecodeError) as e:
abort(404)
return render_template('devices.html', devices=devices)
@app.route('/results/<host>/<id>/<job>/attachment')
def get_attachment(host, id, job):
dir_job = os.path.join(dir_results, host, id, job)
@app.route('/results/<host>/<oec_id>/<job>/attachment')
def get_attachment(host, oec_id, job):
"""
发送结果附件
:param host:
:param oec_id:
:param job:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
attachment = dir_job + '.tar.gz'
filedir = os.path.dirname(attachment)
filename = os.path.basename(attachment)
return send_from_directory(filedir, filename, as_attachment=True)
@app.route('/results/<host>/<id>/<job>/logs/<name>')
def get_log(host, id, job, name):
dir_job = os.path.join(dir_results, host, id, job)
@app.route('/results/<host>/<oec_id>/<job>/logs/<name>')
def get_log(host, oec_id, job, name):
"""
获取日志
:param host:
:param oec_id:
:param job:
:param name:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
logpath = os.path.join(dir_job, name + '.log')
if not os.path.exists(logpath):
logpath = os.path.join(dir_job, 'job.log')
try:
with open(logpath, 'r') as f:
log = f.read().split('\n')
except Exception as e:
except IOError as e:
abort(404)
return render_template('log.html', name=name, log=log)
@app.route('/results/<host>/<id>/<job>/submit')
def submit(host, id, job):
dir_job = os.path.join(dir_results, host, id, job)
@app.route('/results/<host>/<oec_id>/<job>/submit')
def submit(host, oec_id, job):
"""
提交测试结果
:param host:
:param oec_id:
:param job:
:return:
"""
dir_job = os.path.join(dir_results, host, oec_id, job)
tar_job = dir_job + '.tar.gz'
json_cert = os.path.join(dir_job, 'compatibility.json')
try:
......@@ -148,7 +193,7 @@ def submit(host, id, job):
cert = json.load(f)
with open(tar_job, 'rb') as f:
attachment = base64.b64encode(f.read())
except Exception as e:
except (IOError, json.decoder.JSONDecodeError) as e:
print(e)
abort(500)
......@@ -180,16 +225,19 @@ def submit(host, id, job):
@app.route('/api/job/upload', methods=['GET', 'POST'])
def upload_job():
"""
上传job
:return:
"""
host = request.values.get('host', '').strip().replace(' ', '-')
id = request.values.get('id', '').strip().replace(' ', '-')
oec_id = request.values.get('id', '').strip().replace(' ', '-')
job = request.values.get('job', '').strip().replace(' ', '-')
filetext = request.values.get('filetext', '')
if not(all([host, id, job, filetext])):
if not(all([host, oec_id, job, filetext])):
return render_template('upload.html', host=host, id=id, job=job,
filetext=filetext, ret='Failed'), 400
filetext=filetext, ret='Failed'), 400
dir_job = os.path.join(dir_results, host, id, job)
dir_job = os.path.join(dir_results, host, oec_id, job)
tar_job = dir_job + '.tar.gz'
if not os.path.exists(dir_job):
os.makedirs(dir_job)
......@@ -197,10 +245,10 @@ def upload_job():
with open(tar_job, 'wb') as f:
f.write(base64.b64decode(filetext))
os.system("tar xf '%s' -C '%s'" % (tar_job, os.path.dirname(dir_job)))
except Exception as e:
except (IOError, OSError) as e:
print(e)
abort(400)
return render_template('upload.html', host=host, id=id, job=job,
return render_template('upload.html', host=host, id=oec_id, job=job,
filetext=filetext, ret='Successful')
......@@ -229,7 +277,7 @@ def upload_file():
try:
with open(filepath, 'wb') as f:
f.write(base64.b64decode(filetext))
except Exception as e:
except IOError as e:
print(e)
abort(400)
return render_template('upload.html', filename=filename, filetext=filetext,
......@@ -303,7 +351,7 @@ def __get_ib_dev_port(ib_server_ip):
ibport = str(ibport)
return ibdev, ibport
except Exception as e:
except (OSError, IndexError, ValueError) as e:
print(e)
return None, None
......
......@@ -13,7 +13,7 @@
# Create: 2020-04-01
from hwcompatible.test import Test
from hwcompatible.command import Command
from hwcompatible.command import Command, CertCommandError
class AcpiTest(Test):
......@@ -22,11 +22,12 @@ class AcpiTest(Test):
Test.__init__(self)
self.requirements = ["acpica-tools"]
def test(self):
@staticmethod
def test():
try:
Command("acpidump").echo()
return True
except Exception as e:
except CertCommandError as e:
print(e)
return False
......@@ -31,6 +31,7 @@ class CDRomTest(Test):
self.method = None
self.device = None
self.type = None
self.args = None
self.ui = CommandUI()
self.test_dir = "/usr/share/doc"
......@@ -65,33 +66,34 @@ class CDRomTest(Test):
return False
return True
def get_type(self, device):
@staticmethod
def get_type(device):
if not device:
return None
bd_types = ["BD_RE", "BD_R", "BD"]
dvd_types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD"]
cd_types = ["CD_RW", "CD_R", "CD"]
for type in bd_types:
if device.get_property("ID_CDROM_" + type) == "1":
return type
for type in dvd_types:
if device.get_property("ID_CDROM_" + type) == "1":
return type
for type in cd_types:
if device.get_property("ID_CDROM_" + type) == "1":
return type
for bd_type in bd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in dvd_types:
if device.get_ertpropy("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in cd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
print("Can not find pr)oper test-type for %s." % device.get_name())
return None
def get_mode(self, type):
if not type:
def get_mode(self, device_type):
if not device_type:
return
if "RW" in type or "RE" in type:
if "RW" in device_type or "RE" in device_type:
self.method = "rw_test"
elif "_R" in type:
elif "_R" in device_type:
self.method = "write_test"
else:
self.method = "read_test"
......@@ -195,7 +197,8 @@ class CDRomTest(Test):
print(e)
return False
def cmp_tree(self, dir1, dir2):
@staticmethod
def cmp_tree(dir1, dir2):
if not (dir1 and dir2):
print("Error: invalid input dir.")
return False
......@@ -216,14 +219,14 @@ class CDRomTest(Test):
Command("eject %s" % device).run()
print("tray ejected.")
sys.stdout.flush()
except:
except CertCommandError as e:
pass
try:
Command("eject -t %s" % device).run()
print("tray auto-closed.\n")
sys.stdout.flush()
except:
except CertCommandError as e:
print("Could not auto-close the tray, please close the tray manually.")
self.ui.prompt_confirm("Done well?")
......
......@@ -21,7 +21,8 @@ clock_dir = os.path.dirname(os.path.realpath(__file__))
class ClockTest(Test):
def test(self):
@staticmethod
def test():
return 0 == os.system("cd %s; ./clock" % clock_dir)
......
......@@ -59,7 +59,8 @@ class CPU:
return True
def set_freq(self, freq, cpu='all'):
@staticmethod
def set_freq(freq, cpu='all'):
cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq))
try:
cmd.run()
......@@ -68,7 +69,8 @@ class CPU:
print(e)
return False
def get_freq(self, cpu):
@staticmethod
def get_freq(cpu):
cmd = Command("cpupower -c %s frequency-info -w" % cpu)
try:
return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
......@@ -76,7 +78,8 @@ class CPU:
print(e)
return False
def set_governor(self, governor, cpu='all'):
@staticmethod
def set_governor(governor, cpu='all'):
cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor))
try:
cmd.run()
......@@ -85,7 +88,8 @@ class CPU:
print(e)
return False
def get_governor(self, cpu):
@staticmethod
def get_governor(cpu):
cmd = Command("cpupower -c %s frequency-info -p" % cpu)
try:
return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
......@@ -93,7 +97,8 @@ class CPU:
print(e)
return False
def find_path(self, parent_dir, target_name):
@staticmethod
def find_path(parent_dir, target_name):
cmd = Command("find %s -name %s" % (parent_dir, target_name))
try:
cmd.run()
......@@ -262,7 +267,7 @@ class CPUFreqTest(Test):
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
load_test = Load(target_cpu)
load_test.run()
......
......@@ -32,7 +32,7 @@ class DiskTest(Test):
self.filesystems = ["ext4"]
self.ui = CommandUI()
def setup(self, args=None):
def setup(self):
try:
print("Disk Info:")
Command("fdisk -l").echo(ignore_errors=True)
......@@ -50,7 +50,7 @@ class DiskTest(Test):
Command("cat /proc/mdstat").echo(ignore_errors=True)
sys.stdout.flush()
print("\n")
except Exception as e:
except CertCommandError as e:
print("Warning: could not get disk info")
print(e)
......@@ -195,7 +195,7 @@ class DiskTest(Test):
if not self.do_fio(path, size, opts):
return_code = False
break
except Exception as e:
except CertCommandError as e:
print(e)
return_code = False
break
......@@ -205,7 +205,8 @@ class DiskTest(Test):
print("#############")
return return_code
def do_fio(self, filepath, size, option):
@staticmethod
def do_fio(filepath, size, option):
if os.path.isdir(filepath):
file_opt = "-directory=%s" % filepath
else:
......
......@@ -13,7 +13,7 @@
# Create: 2020-04-01
from hwcompatible.test import Test
from hwcompatible.command import Command
from hwcompatible.command import Command, CertCommandError
class IpmiTest(Test):
......@@ -22,21 +22,23 @@ class IpmiTest(Test):
Test.__init__(self)
self.requirements = ["OpenIPMI", "ipmitool"]
def start_ipmi(self):
@staticmethod
def start_ipmi():
try:
Command("systemctl start ipmi").run()
Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False)
except:
except CertCommandError as e:
print("ipmi service cant't be started")
return False
return True
def ipmitool(self):
@staticmethod
def ipmitool():
cmd_list = ["ipmitool fru","ipmitool sensor"]
for cmd in cmd_list:
try:
Command(cmd).echo()
except:
except CertCommandError as e:
print("%s return error." % cmd)
return False
return True
......@@ -48,6 +50,7 @@ class IpmiTest(Test):
return False
return True
if __name__ == "__main__":
i = IpmiTest()
i.test()
......
......@@ -35,8 +35,8 @@ class KdumpTest(Test):
def test(self):
try:
Command("cat /proc/cmdline").get_str("crashkernel=[^\ ]*")
except:
Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*")
except (OSError, ValueError):
print("Error: no crashkernel found.")
return False
......@@ -53,7 +53,7 @@ class KdumpTest(Test):
try:
Command("systemctl restart kdump").run()
Command("systemctl status kdump").get_str(regex="Active: active", single_line=False)
except:
except (OSError, ValueError):
print("Error: kdump service not working.")
return False
......@@ -79,12 +79,12 @@ class KdumpTest(Test):
if config.get_parameter("path"):
self.vmcore_path = config.get_parameter("path")
dir_pattern = re.compile("(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+(-|\.)[0-9]+(-|\.)[0-9]+)-(?P<time>[0-9]+:[0-9]+:[0-9]+)")
dir_pattern = re.compile(r"(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+(-|\.)[0-9]+(-|\.)[0-9]+)-(?P<time>[0-9]+:[0-9]+:[0-9]+)")
vmcore_dirs = list()
for (root, dirs, files) in os.walk(self.vmcore_path):
for dir in dirs:
if dir_pattern.search(dir):
vmcore_dirs.append(dir)
for eve_dir in dirs:
if dir_pattern.search(eve_dir):
vmcore_dirs.append(eve_dir)
vmcore_dirs.sort()
vmcore_file = os.path.join(self.vmcore_path, vmcore_dirs[-1], "vmcore")
......
......@@ -30,12 +30,13 @@ class MemoryTest(Test):
self.system_memory = 0
self.swap_memory = 0
self.huge_pages = 1000
self.hugepage_size = 0
self.hugepage_total = 0
self.hugepage_free = 0
self.retry_list = list()
self.test_dir = os.path.dirname(os.path.realpath(__file__))
def setup(self, args=None):
def setup(self):
self.get_memory()
def test(self):
......@@ -58,6 +59,7 @@ class MemoryTest(Test):
self.free_memory = 0
self.system_memory = 0
self.swap_memory = 0
self.hugepage_size = 0
self.hugepage_total = 0
self.hugepage_free = 0
while True:
......@@ -160,7 +162,8 @@ class MemoryTest(Test):
return False
return True
def hot_plug_verify(self):
@staticmethod
def hot_plug_verify():
kernel = Command("uname -r").read()
config_file = "/boot/config-" + kernel
if not os.path.exists(config_file):
......@@ -196,21 +199,23 @@ class MemoryTest(Test):
if total_mem_3 != total_mem_1:
return False
def online_memory(self, memory_path):
@staticmethod
def online_memory(memory_path):
try:
Command("echo 1 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("online")
return True
except:
except CertCommandError as e:
print("Error: fail to online %s." % memory_path)
return False
def offline_memory(self, memory_path):
@staticmethod
def offline_memory(memory_path):
try:
Command("echo 0 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("offline")
return True
except:
except CertCommandError as e:
print("Error: fail to online %s." % memory_path)
return False
......@@ -242,7 +247,7 @@ class MemoryTest(Test):
Command("cat %s/removable" % memory_path).get_str("1")
print("%s is removable, start testing..." % os.path.basename(memory_path))
test_flag = 1
except:
except CertCommandError as e:
continue
if not self.hotplug_memory_test(memory_path):
print("%s hotplug test fail." % os.path.basename(memory_path))
......
......@@ -48,7 +48,8 @@ class NetworkTest(Test):
self.target_bandwidth_percent = 0.8
self.testfile = 'testfile'
def ifdown(self, interface):
@staticmethod
def ifdown(interface):
os.system("ip link set down %s" % interface)
for _ in range(5):
if 0 == os.system("ip link show %s | grep 'state DOWN'" % interface):
......@@ -56,7 +57,8 @@ class NetworkTest(Test):
time.sleep(1)
return False
def ifup(self, interface):
@staticmethod
def ifup(interface):
os.system("ip link set up %s" % interface)
for _ in range(5):
time.sleep(1)
......@@ -64,7 +66,8 @@ class NetworkTest(Test):
return True
return False
def get_other_interfaces(self):
@staticmethod
def get_other_interfaces():
ignore_interfaces = ['^lo', '^v', 'docker', 'br', 'bond']
cmd = "ip route show default | awk '/default/ {print $5}'"
c = Command(cmd)
......
......@@ -16,7 +16,7 @@ import os
import sys
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
......@@ -69,12 +69,13 @@ class NvmeTest(Test):
Command("nvme list").echo(ignore_errors=True)
return True
except Exception as e:
except CertCommandError as e:
print("Error: nvme cmd fail.")
print(e)
return False
def in_use(self, disk):
@staticmethod
def in_use(disk):
os.system("swapon -a 2>/dev/null")
swap_file = open("/proc/swaps", "r")
swap = swap_file.read()
......
......@@ -60,6 +60,7 @@ class PerfTest(Test):
return False
return True
if __name__ == "__main__":
main = PerfTest()
main.test()
......@@ -31,6 +31,8 @@ class SystemTest(Test):
Test.__init__(self)
self.pri = 1
self.sysinfo = SysInfo(CertEnv.releasefile)
self.args = None
self.logdir = None
def setup(self, args=None):
self.args = args or argparse.Namespace()
......@@ -56,7 +58,8 @@ class SystemTest(Test):
return return_code
def check_certrpm(self):
@staticmethod
def check_certrpm():
print("\nChecking installed cert package...")
return_code = True
for cert_package in ["oec-hardware"]:
......@@ -66,7 +69,7 @@ class SystemTest(Test):
sys.stdout.flush()
if rpm_verify.output and len(rpm_verify.output) > 0:
return_code = False
except:
except OSError as err:
print("Error: files in %s have been tampered." % cert_package)
return_code = False
return return_code
......@@ -92,7 +95,7 @@ class SystemTest(Test):
if kernel_dict.document[os_version] != self.sysinfo.kernel_version:
print("Error: kernel %s check GA status fail." % self.sysinfo.kernel_version)
return_code = False
except:
except KeyError:
print("Error: %s is not supported." % os_version)
return_code = False
......@@ -127,7 +130,7 @@ class SystemTest(Test):
print("")
tainted_file.close()
except Exception as e:
except (IOError, ValueError) as e:
print(e)
print("Error: could not determine if kernel is tainted.")
return_code = False
......@@ -140,15 +143,16 @@ class SystemTest(Test):
try:
params = Command("cat /proc/cmdline").get_str()
print("Boot Parameters: %s" % params)
except Exception as e:
except OSError as e:
print(e)
print("Error: could not determine boot parameters.")
return_code = False
return return_code
def get_modules(self, sign):
pattern = re.compile("^(?P<mod_name>\w+)[\s\S]+\((?P<signs>[A-Z]+)\)")
@staticmethod
def get_modules(sign):
pattern = re.compile(r"^(?P<mod_name>\w+)[\s\S]+\((?P<signs>[A-Z]+)\)")
proc_modules = open("/proc/modules")
modules = list()
for line in proc_modules.readlines():
......@@ -161,10 +165,10 @@ class SystemTest(Test):
def abi_check(self, module):
whitelist_path = [("/lib/modules/kabi-current/kabi_whitelist_" + self.sysinfo.arch),
("/lib/modules/kabi/kabi_whitelist_" + self.sysinfo.arch),
("/usr/src/kernels/%s/kabi_whitelist" % self.sysinfo.kernel)
]
("/lib/modules/kabi/kabi_whitelist_" + self.sysinfo.arch),
("/usr/src/kernels/%s/kabi_whitelist" % self.sysinfo.kernel)
]
whitelist = ""
for whitelist in whitelist_path:
if os.path.exists(whitelist):
break
......@@ -206,7 +210,8 @@ class SystemTest(Test):
print("")
return True
def read_abi_whitelist(self, whitelist):
@staticmethod
def read_abi_whitelist(whitelist):
symbols = list()
if not os.path.isfile(whitelist):
print("Error: Cannot read whitelist file")
......@@ -253,17 +258,19 @@ class SystemTest(Test):
nm.close()
return self.readSymbols(symbols)
def get_modulefile(self, module):
@staticmethod
def get_modulefile(module):
try:
modulefile = Command("modinfo -F filename %s" % module).get_str()
if os.path.islink(modulefile):
modulefile = os.readlink(modulefile)
return modulefile
except:
except OSError:
print("Error: could no find module file for %s:" % module)
return None
def check_selinux(self):
@staticmethod
def check_selinux():
print("\nChecking selinux...")
status = os.system("/usr/sbin/sestatus | grep 'SELinux status' | grep -qw 'enabled'")
mode = os.system("/usr/sbin/sestatus | grep 'Current mode' | grep -qw 'enforcing'")
......
......@@ -96,10 +96,11 @@ class UsbTest(Test):
devices = CertDevice().get_devices()
usb_devices = list()
for device in devices:
if (device.get_property("SUBSYSTEM") != "usb" or \
device.get_property("DEVTYPE") != "usb_device" or \
device.get_property("ID_BUS") != "usb" or \
device.get_property("BUSNUM") == "" or device.get_property("DEVNUM") == ""):
if (device.get_property("SUBSYSTEM") != "usb" or
device.get_property("DEVTYPE") != "usb_device" or
device.get_property("ID_BUS") != "usb" or
device.get_property("BUSNUM") == "" or
device.get_property("DEVNUM") == ""):
continue
else:
usb_devices.append(device.path)
......
......@@ -59,6 +59,7 @@ class WatchDogTest(Test):
print("")
return False
def startup(self):
@staticmethod
def startup():
print("Recover from watchdog.")
return True
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册