提交 42fbfa0d 编写于 作者: O openeuler-ci-bot 提交者: Gitee

!12 Code specification rectification

Merge pull request !12 from cuixucui/master
......@@ -33,11 +33,11 @@
OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂商的合作,进行兼容性测试。OS 厂商制定一个测试标准,并提供测试用例,硬件厂商进行实际的测试,测试通过后,OS 厂商和硬件厂商将共同对结果负责。这是一个双赢的合作,双方都可以藉此推销自己的产品。
证目的就是保证 OS 与硬件平台的兼容性,认证仅限于基本功能验证,不包括性能测试等其它测试。
证目的就是保证 OS 与硬件平台的兼容性,认证仅限于基本功能验证,不包括性能测试等其它测试。
欧拉硬件兼容性认证测试框架有如下特点:
openEuler硬件兼容性验证测试框架有如下特点:
1. 为满足可信要求,必须使用欧拉操作系统,不能随意重编/插入内核模块。
1. 为满足可信要求,必须使用openEuler操作系统,不能随意重编/插入内核模块。
2. 通过扫描机制自适应发现硬件列表,来确定要运行的测试用例集合。
3. 面向对象抽象各种硬件类型以及测试用例类,用于扩展开发。
......@@ -141,7 +141,7 @@ OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂
dnf install oec-hardware-server-XXX.rpm
```
2. 服务端 web 展示页面部分组件系统本身不提供,需要使用 `pip3` 安装(请自行配置可用 pip 源)。
2. 服务端 web 展示页面需要的部分组件系统本身不提供,需要使用 `pip3` 安装(请自行配置可用 pip 源)。
```
pip3 install Flask Flask-bootstrap uwsgi
......@@ -276,7 +276,8 @@ OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂
## 已有测试项
1. **system**
注意:在安装openEuler系统的时候,非最小安装会引入kmod-kvdo模块,此模块会修改内核,导致system测试项"检测内核是否被修改"不过。若非有意引入,可卸载该模块。
- 检查本工具是否被修改。
- 检查 OS 版本和 kernel 版本是否匹配。
- 检查内核是否被修改/感染。
......
......@@ -24,16 +24,25 @@ except ImportError:
class Client:
def __init__(self, host, id):
"""
upload client
"""
def __init__(self, host, oec_id):
self.host = host
self.id = id
self.id = oec_id
self.form = {}
def upload(self, file, server='localhost'):
filename = os.path.basename(file)
def upload(self, files, server='localhost'):
"""
upload client request
:param files:
:param server:
:return:
"""
filename = os.path.basename(files)
try:
job = filename.split('.')[0]
with open(file, 'rb') as f:
with open(files, 'rb') as f:
filetext = base64.b64encode(f.read())
except Exception as e:
print(e)
......@@ -69,6 +78,5 @@ class Client:
if __name__ == '__main__':
c = Client(' Taishan 2280', ' Testid-123523')
import sys
file = sys.argv[1]
c.upload(file)
file_name = sys.argv[1]
c.upload(file_name)
......@@ -222,6 +222,9 @@ class Command:
class CertCommandError(Exception):
"""
Cert command error handling
"""
def __init__(self, command, message):
super(CertCommandError, self).__init__()
self.message = message
......
......@@ -17,11 +17,18 @@ import readline
class CommandUI:
"""
Command user interface selection
"""
def __init__(self, echoResponses=False):
self.echo = echoResponses
def printPipe(self, pipe):
"""
print pipe data
:param pipe:
:return:
"""
while 1:
line = pipe.readline()
if line:
......@@ -30,6 +37,12 @@ class CommandUI:
return pipe.close()
def prompt(self, question, choices=None):
"""
choice test item
:param question:
:param choices:
:return:
"""
while True:
sys.stdout.write(question)
if choices:
......@@ -45,6 +58,12 @@ class CommandUI:
sys.stdout.write("Please enter a choice\n")
def prompt_integer(self, question, choices=None):
"""
choice test item
:param question:
:param choices:
:return:
"""
while True:
sys.stdout.write(question)
if choices:
......@@ -62,6 +81,11 @@ class CommandUI:
sys.stdout.write("Please enter an integer.\n")
def prompt_confirm(self, question):
"""
Command interface displays confirmation information
:param question:
:return:
"""
YES = "y"
SAMEASYES = ["y", "yes"]
NO = "n"
......@@ -72,9 +96,16 @@ class CommandUI:
return True
if reply.lower() in SAMEASNO:
return False
sys.stdout.write("Please reply %s or %s.\n" %(YES, NO))
sys.stdout.write("Please reply %s or %s.\n" % (YES, NO))
def prompt_edit(self, label, value, choices=None):
"""
prompt choice edit
:param label:
:param value:
:param choices:
:return:
"""
if not value:
value = ""
if choices:
......
......@@ -29,8 +29,10 @@ from .reboot import Reboot
from .client import Client
class EulerCertification:
class EulerCertification():
"""
Main program of oec-hardware
"""
def __init__(self):
self.certification = None
self.test_factory = list()
......@@ -39,6 +41,10 @@ class EulerCertification:
self.client = None
def run(self):
"""
Openeuler compatibility verification
:return:
"""
print("The openEuler Hardware Compatibility Test Suite")
self.load()
certdevice = CertDevice()
......@@ -66,6 +72,10 @@ class EulerCertification:
self.save(job)
def run_rebootup(self):
"""
rebootup
:return:
"""
try:
self.load()
args = argparse.Namespace(test_factory=self.test_factory)
......@@ -76,22 +86,30 @@ class EulerCertification:
reboot.clean()
self.save(job)
return True
except (IOError, OSError, TypeError) as e:
except Exception as e:
print(e)
return False
def clean(self):
"""
clean all compatibility test file
:return:
"""
if self.ui.prompt_confirm("Are you sure to clean all compatibility test data?"):
try:
Command("rm -rf %s" % CertEnv.certificationfile).run()
Command("rm -rf %s" % CertEnv.factoryfile).run()
Command("rm -rf %s" % CertEnv.devicefile).run()
except CertCommandError as e:
except Exception as e:
print(e)
return False
return True
def load(self):
"""
load certification
:return:
"""
if not os.path.exists(CertEnv.datadirectory):
os.mkdir(CertEnv.datadirectory)
......@@ -116,6 +134,11 @@ class EulerCertification:
print("")
def save(self, job):
"""
collect Job log
:param job:
:return:
"""
doc_dir = os.path.join(CertEnv.logdirectoy, job.job_id)
if not os.path.exists(doc_dir):
return
......@@ -144,6 +167,10 @@ class EulerCertification:
os.chdir(cwd)
def submit(self):
"""
submit last result
:return:
"""
packages = list()
pattern = re.compile("^oech-[0-9]{14}-[0-9a-zA-Z]{10}.tar$")
files = []
......@@ -167,6 +194,12 @@ class EulerCertification:
os.remove(os.path.join(CertEnv.datadirectory, filename))
def upload(self, path, server):
"""
uploaded result to server
:param path:
:param server:
:return:
"""
print("Uploading...")
if not self.client:
oec_id = self.certification.get_certify()
......@@ -216,6 +249,11 @@ class EulerCertification:
return test_factory
def sort_tests(self, devices):
"""
sort tests
:param devices:
:return:
"""
sort_devices = dict()
empty_device = Device()
for device in devices:
......@@ -286,12 +324,16 @@ class EulerCertification:
try:
Command("dmidecode").get_str("IPMI Device Information", single_line=False)
sort_devices["ipmi"] = [empty_device]
except OSError as e:
except:
pass
return sort_devices
def edit_tests(self):
"""
edit test items
:return:
"""
while True:
for test in self.test_factory:
if test["name"] == "system":
......@@ -319,7 +361,7 @@ class EulerCertification:
try:
num = int(reply)
except ValueError:
except:
continue
if num > 0 and num <= len(self.test_factory):
......@@ -363,7 +405,7 @@ class EulerCertification:
def choose_tests(self):
"""
选择测试用例
choose test behavior
:return:
"""
for test in self.test_factory:
......@@ -387,6 +429,10 @@ class EulerCertification:
return self.choose_tests()
def check_result(self):
"""
check test result
:return:
"""
if len(self.test_factory) == 0:
return False
for test in self.test_factory:
......@@ -395,6 +441,11 @@ class EulerCertification:
return True
def update_factory(self, test_factory):
"""
update tese factory
:param test_factory:
:return:
"""
if not self.test_factory:
self.test_factory = test_factory
else:
......@@ -410,8 +461,13 @@ class EulerCertification:
self.test_factory.sort(key=lambda k: k["name"])
FactoryDocument(CertEnv.factoryfile, self.test_factory).save()
@staticmethod
def search_factory(obj_test, test_factory):
def search_factory(self, obj_test, test_factory):
"""
Determine whether test exists by searching test_factory
:param obj_test:
:param test_factory:
:return:
"""
for test in test_factory:
if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path:
return True
......
......@@ -15,25 +15,37 @@
from .command import Command, CertCommandError
def filter_char(str):
ascii_blacklist = map(chr, range(9) + range(11,13) + range(14,32))
def filter_char(string):
"""
fileter char
:param string:
:return:
"""
ascii_blacklist = map(chr, range(9) + range(11, 13) + range(14, 32))
filtered = u''
start = 0
for i in range(len(str)):
c = str[i]
if c in ascii_blacklist or (type(str) != unicode and ord(c) >= 128):
for i in range(len(string)):
c = string[i]
if c in ascii_blacklist or (type(string) != unicode and ord(c) >= 128):
if start < i:
filtered += str[start:i]
start = i+1
filtered += str[start:]
filtered += string[start:i]
start = i + 1
filtered += string[start:]
return filtered
class CertDevice:
"""
Certified device
"""
def __init__(self):
self.devices = None
def get_devices(self):
"""
get devices information
:return:
"""
self.devices = list()
try:
pipe = Command("udevadm info --export-db")
......@@ -49,19 +61,19 @@ class CertDevice:
self.devices.append(device)
properties = dict()
else:
property = line.split(":", 1)
if len(property) == 2:
type = property[0].strip('\ \'\n')
attribute = property[1].strip('\ \'\n')
if type == "E":
prop = line.split(":", 1)
if len(prop) == 2:
tp = prop[0].strip('\ \'\n')
attribute = prop[1].strip('\ \'\n')
if tp == "E":
keyvalue = attribute.split("=", 1)
if len(keyvalue) == 2:
properties[keyvalue[0]]= keyvalue[1]
elif type == "P":
properties[keyvalue[0]] = keyvalue[1]
elif tp == "P":
properties["INFO"] = attribute
else:
break
except OSError as e:
except Exception as e:
print("Warning: get devices fail")
print(e)
self.devices.sort(key=lambda k: k.path)
......@@ -69,6 +81,9 @@ class CertDevice:
class Device:
"""
get device properties
"""
def __init__(self, properties=None):
self.path = ""
if properties:
......@@ -77,13 +92,22 @@ class Device:
else:
self.properties = dict()
def get_property(self, property):
def get_property(self, prop):
"""
get properties
:param prop:
:return:
"""
try:
return self.properties[property]
return self.properties[prop]
except KeyError:
return ""
def get_name(self):
"""
get property value
:return:
"""
if "INTERFACE" in self.properties.keys():
return self.properties["INTERFACE"]
elif "DEVNAME" in self.properties.keys():
......
......@@ -15,14 +15,17 @@
import json
from .commandUI import CommandUI
from .command import Command, CertCommandError
from .command import Command
from .device import Device
from .sysinfo import SysInfo
from .env import CertEnv
class Document:
def __init__(self, filename, document={}):
class Document():
"""
Read and write documents
"""
def __init__(self, filename, document=''):
self.document = document
self.filename = filename
......@@ -34,7 +37,7 @@ class Document:
with open(self.filename, "w+") as save_f:
json.dump(self.document, save_f, indent=4)
save_f.close()
except (IOError, ValueError) as e:
except Exception as e:
print("Error: doc save fail.")
print(e)
return False
......@@ -46,13 +49,16 @@ class Document:
self.document = json.load(load_f)
load_f.close()
return True
except (IOError, json.decoder.JSONDecodeError):
except:
return False
class CertDocument(Document):
def __init__(self, filename, document={}):
super(CertDocument, self).__init__()
"""
get hardware and release information
"""
def __init__(self, filename, document=''):
# super(CertDocument, self).__init__(filename, document)
self.document = dict()
self.filename = filename
if not document:
......@@ -61,6 +67,10 @@ class CertDocument(Document):
self.documemt = document
def new(self):
"""
new document object
:return:
"""
try:
pipe = Command("/usr/sbin/dmidecode -t 1")
pipe.start()
......@@ -68,15 +78,15 @@ class CertDocument(Document):
while True:
line = pipe.readline()
if line:
property = line.split(":", 1)
if len(property) == 2:
key = property[0].strip()
value = property[1].strip()
property_right = line.split(":", 1)
if len(property_right) == 2:
key = property_right[0].strip()
value = property_right[1].strip()
if key in ["Manufacturer", "Product Name", "Version"]:
self.document[key] = value
else:
break
except OSError as e:
except Exception as e:
print("Error: get hardware info fail.")
print(e)
......@@ -107,8 +117,11 @@ class CertDocument(Document):
class DeviceDocument(Document):
def __init__(self, filename, devices=[]):
super(DeviceDocument, self).__init__()
"""
get device document
"""
def __init__(self, filename, devices=''):
# super(DeviceDocument, self).__init__(filename, devices)
self.filename = filename
self.document = list()
if not devices:
......@@ -119,8 +132,12 @@ class DeviceDocument(Document):
class FactoryDocument(Document):
def __init__(self, filename, factory=[]):
super(FactoryDocument, self).__init__()
"""
get factory from file or factory parameter
"""
def __init__(self, filename, factory=''):
# super(FactoryDocument, self).__init__(filename, factory)
self.document = list()
self.filename = filename
if not factory:
......@@ -135,6 +152,10 @@ class FactoryDocument(Document):
self.document.append(element)
def get_factory(self):
"""
Get factory parameter information
:return:
"""
factory = list()
for element in self.document:
test = dict()
......@@ -148,8 +169,10 @@ class FactoryDocument(Document):
class ConfigFile:
"""
Get parameters from configuration file
"""
def __init__(self, filename):
super(ConfigFile, self).__init__()
self.filename = filename
self.parameters = dict()
self.config = list()
......@@ -190,6 +213,11 @@ class ConfigFile:
return False
def remove_parameter(self, name):
"""
Update configuration information
:param name:
:return:
"""
if self.getParameter(name):
del self.parameters[name]
newconfig = list()
......@@ -206,6 +234,10 @@ class ConfigFile:
self.save()
def save(self):
"""
Save the config property value to a file
:return:
"""
fp = open(self.filename, "w")
for line in self.config:
fp.write(line)
......
......@@ -14,6 +14,9 @@
class CertEnv:
"""
Certification file path
"""
environmentfile = "/etc/oech.json"
releasefile = "/etc/os-release"
datadirectory = "/var/oech"
......
......@@ -23,12 +23,13 @@ from .env import CertEnv
from .command import Command, CertCommandError
from .commandUI import CommandUI
from .log import Logger
from .document import FactoryDocument
from .reboot import Reboot
class Job(object):
"""
Test task management
"""
def __init__(self, args=None):
"""
Creates an instance of Job class.
......@@ -50,13 +51,19 @@ class Job(object):
for parameter_name, parameter_value in self.args.test_parameters:
self.test_parameters[parameter_name] = parameter_value
@staticmethod
def discover(testname, device, subtests_filter=None):
def discover(self, testname, subtests_filter=None):
"""
discover test
:param testname:
:param subtests_filter:
:return:
"""
if not testname:
print("testname not specified, discover test failed")
return None
filename = testname + ".py"
dirpath = ''
for (dirpath, dirs, files) in os.walk(CertEnv.testdirectoy):
if filename in files:
break
......@@ -91,13 +98,18 @@ class Job(object):
return None
def create_test_suite(self, subtests_filter=None):
"""
Create test suites
:param subtests_filter:
:return:
"""
if self.test_suite:
return
self.test_suite = []
for test in self.test_factory:
if test["run"]:
testclass = self.discover(test["name"], test["device"], subtests_filter)
testclass = self.discover(test["name"], subtests_filter)
if testclass:
testcase = dict()
testcase["test"] = testclass
......@@ -114,6 +126,10 @@ class Job(object):
print("No test found")
def check_test_depends(self):
"""
Install dependency packages
:return: depending
"""
required_rpms = []
for tests in self.test_suite:
for pkg in tests["test"].requirements:
......@@ -136,11 +152,19 @@ class Job(object):
return True
def _run_test(self, testcase, subtests_filter=None):
"""
Start a testing item
:param testcase:
:param subtests_filter:
:return:
"""
name = testcase["name"]
if testcase["device"].get_name():
name = testcase["name"] + "-" + testcase["device"].get_name()
logname = name + ".log"
reboot = None
test = None
logger = None
try:
test = testcase["test"]
logger = Logger(logname, self.job_id, sys.stdout, sys.stderr)
......@@ -171,6 +195,11 @@ class Job(object):
return return_code
def run_tests(self, subtests_filter=None):
"""
Start testing
:param subtests_filter:
:return:
"""
if not len(self.test_suite):
print("No test to run.")
return
......@@ -183,6 +212,10 @@ class Job(object):
testcase["status"] = "FAIL"
def run(self):
"""
Test entrance
:return:
"""
logger = Logger("job.log", self.job_id, sys.stdout, sys.stderr)
logger.start()
self.create_test_suite(self.subtests_filter)
......@@ -195,6 +228,10 @@ class Job(object):
self.show_summary()
def show_summary(self):
"""
Command line interface display summary
:return:
"""
print("------------- Summary -------------")
for test in self.test_factory:
if test["run"]:
......@@ -208,8 +245,11 @@ class Job(object):
print("")
def save_result(self):
"""
Get test status
:return:
"""
for test in self.test_factory:
for testcase in self.test_suite:
if test["name"] == testcase["name"] and test["device"].path == testcase["device"].path:
test["status"] = testcase["status"]
......@@ -20,7 +20,9 @@ from .env import CertEnv
class Log(object):
"""
Read and write log
"""
def __init__(self, logname='oech.log', logdir='__temp__'):
if not logdir:
curtime = datetime.datetime.now().isoformat()
......@@ -37,32 +39,55 @@ class Log(object):
self.log = open(logfile, "a+")
def write(self, message):
"""
Write messages to terminals and files
:param message:
:return:
"""
self.terminal.write(message)
if self.log:
self.log.write(message)
def flush(self):
"""
Refresh buffer to terminal and file
:return:
"""
self.terminal.flush()
if self.log:
self.log.flush()
def close(self):
"""
close logfile
:return:
"""
self.log.close()
self.log = None
class Logger():
"""
Output results to file
"""
def __init__(self, logname, logdir, out, err):
self.log = Log(logname, logdir)
self.stdout = out
self.stderr = err
def start(self):
"""
Start outputing to file
:return:
"""
sys.stdout = self.log
sys.stderr = sys.stdout
def stop(self):
"""
Stop outputing to file
:return:
"""
sys.stdout.close()
sys.stdout = self.stdout
sys.stderr = self.stderr
......@@ -20,7 +20,9 @@ from .command import Command, CertCommandError
class Reboot:
"""
Special for restart tasks, so that the test can be continued after the machine is restarted
"""
def __init__(self, testname, job, rebootup):
self.testname = testname
self.rebootup = rebootup
......@@ -28,6 +30,10 @@ class Reboot:
self.reboot = dict()
def clean(self):
"""
Remove reboot file
:return:
"""
if not (self.job and self.testname):
return
......@@ -39,6 +45,10 @@ class Reboot:
Command("systemctl disable oech").run(ignore_errors=True)
def setup(self):
"""
Reboot setuping
:return:
"""
if not (self.job and self.testname):
print("Error: invalid reboot input.")
return False
......@@ -63,13 +73,17 @@ class Reboot:
try:
Command("systemctl daemon-reload").run_quiet()
Command("systemctl enable oech").run_quiet()
except OSError as e:
except:
print("Error: enable oech.service fail.")
return False
return True
def check(self):
"""
Reboot file check
:return:
"""
doc = Document(CertEnv.rebootfile)
if not doc.load():
print("Error: reboot file load fail.")
......@@ -81,7 +95,7 @@ class Reboot:
self.job.job_id = self.reboot["job_id"]
self.job.subtests_filter = self.reboot["rebootup"]
time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S")
except KeyError:
except:
print("Error: reboot file format not as expect.")
return False
......
......@@ -17,7 +17,10 @@ import re
class SysInfo:
def __init__(self, file):
"""
Get system information
"""
def __init__(self, filename):
self.product = None
self.version = None
self.update = None
......@@ -28,23 +31,28 @@ class SysInfo:
self.kerneldevel_rpm = None
self.kernel_version = None
self.debug_kernel = False
self.load(file)
self.load(filename)
def load(self, file):
def load(self, filename):
"""
Collect system information
:param filename:
:return:
"""
try:
f = open(file)
f = open(filename)
text = f.read()
f.close()
except IOError:
except:
print("Release file not found.")
return
if text:
pattern = re.compile('NAME="(\w+)"')
pattern = re.compile(r'NAME="(\w+)"')
results = pattern.findall(text)
self.product = results[0].strip() if results else ""
pattern = re.compile('VERSION="(.+)"')
pattern = re.compile(r'VERSION="(.+)"')
results = pattern.findall(text)
self.version = results[0].strip() if results else ""
......@@ -59,5 +67,9 @@ class SysInfo:
self.kernel_version = self.kernel.split('-')[0]
def get_version(self):
"""
Get system version information
:return:
"""
return self.version
......@@ -14,8 +14,10 @@
class Test:
def __init__(self, name=None):
"""
Test set template
"""
def __init__(self):
self.pri = 0
self.requirements = list()
self.reboot = False
......
......@@ -67,14 +67,14 @@ def get_results():
results[host] = {}
for oec_id in next(os.walk(dir_host))[1]:
dir_id = os.path.join(dir_host, oec_id)
results[host][id] = next(os.walk(dir_id))[1]
results[host][oec_id] = next(os.walk(dir_id))[1]
return render_template('results.html', results=results)
@app.route('/results/<host>/<oec_id>/<job>')
def get_job(host, oec_id, job):
"""
获取job信息
get job information
:param host:
:param oec_id:
:param job:
......@@ -89,7 +89,7 @@ def get_job(host, oec_id, job):
info = json.load(f)
with open(json_results, 'r') as f:
results = json.load(f)
except (IOError, json.decoder.JSONDecodeError) as e:
except Exception as e:
abort(404)
return render_template('job.html', host=host, id=oec_id, job=job, info=info, results=results)
......@@ -109,7 +109,7 @@ def get_device(host, oec_id, job, interface):
try:
with open(json_results, 'r') as f:
results = json.load(f)
except (IOError, json.decoder.JSONDecodeError) as e:
except Exception as e:
abort(404)
for testcase in results:
device = testcase.get('device')
......@@ -133,7 +133,7 @@ def get_devices(host, oec_id, job):
try:
with open(json_devices, 'r') as f:
devices = json.load(f)
except (IOError, json.decoder.JSONDecodeError) as e:
except Exception as e:
abort(404)
return render_template('devices.html', devices=devices)
......@@ -171,7 +171,7 @@ def get_log(host, oec_id, job, name):
try:
with open(logpath, 'r') as f:
log = f.read().split('\n')
except IOError as e:
except Exception as e:
abort(404)
return render_template('log.html', name=name, log=log)
......@@ -193,7 +193,7 @@ def submit(host, oec_id, job):
cert = json.load(f)
with open(tar_job, 'rb') as f:
attachment = base64.b64encode(f.read())
except (IOError, json.decoder.JSONDecodeError) as e:
except Exception as e:
print(e)
abort(500)
......@@ -245,7 +245,7 @@ def upload_job():
with open(tar_job, 'wb') as f:
f.write(base64.b64decode(filetext))
os.system("tar xf '%s' -C '%s'" % (tar_job, os.path.dirname(dir_job)))
except (IOError, OSError) as e:
except Exception as e:
print(e)
abort(400)
return render_template('upload.html', host=host, id=oec_id, job=job,
......@@ -277,7 +277,7 @@ def upload_file():
try:
with open(filepath, 'wb') as f:
f.write(base64.b64decode(filetext))
except IOError as e:
except Exception as e:
print(e)
abort(400)
return render_template('upload.html', filename=filename, filetext=filetext,
......@@ -351,7 +351,7 @@ def __get_ib_dev_port(ib_server_ip):
ibport = str(ibport)
return ibdev, ibport
except (OSError, IndexError, ValueError) as e:
except Exception as e:
print(e)
return None, None
......
......@@ -13,21 +13,21 @@
# Create: 2020-04-01
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
class AcpiTest(Test):
"""
acpi test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["acpica-tools"]
@staticmethod
def test():
def test(self):
try:
Command("acpidump").echo()
return True
except CertCommandError as e:
except Exception as e:
print(e)
return False
......@@ -24,7 +24,9 @@ from hwcompatible.command import Command, CertCommandError
class CDRomTest(Test):
"""
CDRom Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["dvd+rw-tools", "genisoimage", "wodim", "util-linux"]
......@@ -36,12 +38,21 @@ class CDRomTest(Test):
self.test_dir = "/usr/share/doc"
def setup(self, args=None):
"""
The Setup before testing
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
self.type = self.get_type(self.device)
self.get_mode(self.type)
def test(self):
"""
Test case
:return:
"""
if not (self.method and self.device and self.type):
return False
......@@ -66,8 +77,12 @@ class CDRomTest(Test):
return False
return True
@staticmethod
def get_type(device):
def get_type(self, device):
"""
Get the type of CDROM
:param device:
:return:
"""
if not device:
return None
......@@ -77,17 +92,22 @@ class CDRomTest(Test):
for bd_type in bd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in dvd_types:
if device.get_ertpropy("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in cd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
for dvd_type in dvd_types:
if device.get_ertpropy("ID_CDROM_" + dvd_type) == "1":
return dvd_type
for cd_type in cd_types:
if device.get_property("ID_CDROM_" + cd_type) == "1":
return cd_type
print("Can not find pr)oper test-type for %s." % device.get_name())
return None
def get_mode(self, device_type):
"""
Get the read-write mode of CDROM
:param device_type:
:return:
"""
if not device_type:
return
......@@ -99,6 +119,10 @@ class CDRomTest(Test):
self.method = "read_test"
def rw_test(self):
"""
RW mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
Command("umount %s" % devname).run(ignore_errors=True)
......@@ -119,7 +143,8 @@ class CDRomTest(Test):
else:
print("Blanking ...")
sys.stdout.flush()
blankCommand = Command("cdrecord -v dev=%s blank=fast" % devname).echo()
# blankCommand = Command("cdrecord -v dev=%s blank=fast" % devname).echo()
Command("cdrecord -v dev=%s blank=fast" % devname).echo()
self.reload_disc(devname)
sys.stdout.flush()
return self.write_test()
......@@ -127,6 +152,10 @@ class CDRomTest(Test):
return False
def write_test(self):
"""
Write mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
Command("umount %s" % devname).run(ignore_errors=True)
......@@ -136,15 +165,17 @@ class CDRomTest(Test):
sys.stdout.flush()
return True
else:
write_opts ="-sao"
write_opts = "-sao"
try:
command = Command("cdrecord dev=%s -checkdrive" % devname)
modes = command.get_str(regex="^Supported modes[^:]*:(?P<modes>.*$)", regex_group="modes", single_line=False, ignore_errors=True)
modes = command.get_str(regex="^Supported modes[^:]*:(?P<modes>.*$)", regex_group="modes",
single_line=False, ignore_errors=True)
if "TAO" in modes:
write_opts="-tao"
write_opts = "-tao"
if "SAO" in modes:
write_opts="-sao"
flags = command.get_str(regex="^Driver flags[^:]*:(?P<flags>.*$)", regex_group="flags", single_line=False, ignore_errors=True)
write_opts = "-sao"
flags = command.get_str(regex="^Driver flags[^:]*:(?P<flags>.*$)", regex_group="flags",
single_line=False, ignore_errors=True)
if "BURNFREE" in flags:
write_opts += " driveropts=burnfree"
except CertCommandError as e:
......@@ -153,7 +184,8 @@ class CDRomTest(Test):
size = Command("mkisofs -quiet -R -print-size %s " % self.test_dir).get_str()
blocks = int(size)
Command("mkisofs -quiet -R %s | cdrecord -v %s dev=%s fs=32M tsize=%ss -" % (self.test_dir, write_opts, devname, blocks)).echo()
Command("mkisofs -quiet -R %s | cdrecord -v %s dev=%s fs=32M tsize=%ss -" %
(self.test_dir, write_opts, devname, blocks)).echo()
self.reload_disc(devname)
sys.stdout.flush()
return True
......@@ -161,6 +193,10 @@ class CDRomTest(Test):
return False
def read_test(self):
"""
Read mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
if os.path.exists("mnt_cdrom"):
......@@ -197,8 +233,13 @@ class CDRomTest(Test):
print(e)
return False
@staticmethod
def cmp_tree(dir1, dir2):
def cmp_tree(self, dir1, dir2):
"""
Compare the differences between the two directories
:param dir1:
:param dir2:
:return:
"""
if not (dir1 and dir2):
print("Error: invalid input dir.")
return False
......@@ -210,6 +251,11 @@ class CDRomTest(Test):
return False
def reload_disc(self, device):
"""
Reloading the media
:param device:
:return:
"""
if not device:
return False
......@@ -219,17 +265,16 @@ class CDRomTest(Test):
Command("eject %s" % device).run()
print("tray ejected.")
sys.stdout.flush()
except CertCommandError as e:
except:
pass
try:
Command("eject -t %s" % device).run()
print("tray auto-closed.\n")
sys.stdout.flush()
except CertCommandError as e:
except:
print("Could not auto-close the tray, please close the tray manually.")
self.ui.prompt_confirm("Done well?")
time.sleep(20)
return True
......@@ -15,14 +15,19 @@
import os
from hwcompatible.test import Test
from hwcompatible.command import Command
clock_dir = os.path.dirname(os.path.realpath(__file__))
class ClockTest(Test):
@staticmethod
def test():
"""
Clock Test
"""
def test(self):
"""
Clock test case
:return:
"""
return 0 == os.system("cd %s; ./clock" % clock_dir)
......
......@@ -20,7 +20,8 @@ def cal():
decimal.getcontext().prec = 1000
one = decimal.Decimal(1)
for i in range(1000):
j = (i * one).sqrt()
# j = (i * one).sqrt()
(i * one).sqrt()
if __name__ == '__main__':
......
......@@ -17,7 +17,7 @@ from time import sleep
from hwcompatible.env import CertEnv
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
class CPU:
......@@ -32,10 +32,14 @@ class CPU:
self.min_freq = None
def get_info(self):
"""
Get CPU info
:return:
"""
cmd = Command("lscpu")
try:
nums = cmd.get_str(r'^CPU\S*:\s+(?P<cpus>\d+)$', 'cpus', False)
except CertCommandError as e:
except Exception as e:
print(e)
return False
self.nums = int(nums)
......@@ -44,7 +48,7 @@ class CPU:
cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq")
try:
max_freq = cmd.get_str()
except CertCommandError as e:
except Exception as e:
print(e)
return False
self.max_freq = int(max_freq)
......@@ -52,72 +56,106 @@ class CPU:
cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq")
try:
min_freq = cmd.get_str()
except CertCommandError as e:
except Exception as e:
print(e)
return False
self.min_freq = int(min_freq)
return True
@staticmethod
def set_freq(freq, cpu='all'):
def set_freq(self, freq, cpu='all'):
"""
Set CPU frequency
:param freq:
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq))
try:
cmd.run()
return cmd.returncode
except CertCommandError as e:
except Exception as e:
print(e)
return False
@staticmethod
def get_freq(cpu):
def get_freq(self, cpu):
"""
Get CPU frequency
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-info -w" % cpu)
try:
return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
except CertCommandError as e:
except Exception as e:
print(e)
return False
@staticmethod
def set_governor(governor, cpu='all'):
def set_governor(self, governor, cpu='all'):
"""
Set the frequency governor mode of CPU
:param governor:
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor))
try:
cmd.run()
return cmd.returncode
except CertCommandError as e:
except Exception as e:
print(e)
return False
@staticmethod
def get_governor(cpu):
def get_governor(self, cpu):
"""
Get cpu governor
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-info -p" % cpu)
try:
return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
except CertCommandError as e:
except Exception as e:
print(e)
return False
@staticmethod
def find_path(parent_dir, target_name):
def find_path(self, parent_dir, target_name):
"""
Find the target path from the specified directory
:param parent_dir:
:param target_name:
:return:
"""
cmd = Command("find %s -name %s" % (parent_dir, target_name))
try:
cmd.run()
return cmd.returncode
except CertCommandError as e:
except Exception as e:
print(e)
return False
class Load:
"""
Let a program run on a specific CPU
"""
def __init__(self, cpu):
self.cpu = cpu
self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".format(self.cpu, CertEnv.testdirectoy))
self.returncode = None
def run(self):
self.process.start() ## background
"""
Process started
:return:
"""
self.process.start() # background
def get_runtime(self):
"""
Get the running time of the process
:return:
"""
if not self.process:
return None
......@@ -131,6 +169,9 @@ class Load:
class CPUFreqTest(Test):
"""
CPU frequency test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ['util-linux', 'kernel-tools']
......@@ -138,6 +179,10 @@ class CPUFreqTest(Test):
self.original_governor = self.cpu.get_governor(0)
def test_userspace(self):
"""
userspace mode of testing CPU frequency
:return:
"""
target_cpu = randint(0, self.cpu.nums-1)
target_freq = randint(self.cpu.min_freq, self.cpu.max_freq)
if self.cpu.set_freq(target_freq, cpu=target_cpu) != 0:
......@@ -150,12 +195,12 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'userspace':
print("[X] The governor of CPU%s(%s) is not userspace." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
## min_freq -> max_runtime
# min_freq -> max_runtime
self.cpu.set_freq(self.cpu.min_freq)
load_list = []
runtime_list = []
......@@ -171,9 +216,9 @@ class CPUFreqTest(Test):
print("[X] Max average time is 0.")
return False
print("[.] Max average time of all CPUs userspace load test: %.2f" %
max_average_runtime)
max_average_runtime)
## max_freq -> min_runtime
# max_freq -> min_runtime
self.cpu.set_freq(self.cpu.max_freq)
load_list = []
runtime_list = []
......@@ -189,7 +234,7 @@ class CPUFreqTest(Test):
print("[X] Min average time is 0.")
return False
print("[.] Min average time of all CPUs userspace load test: %.2f" %
min_average_runtime)
min_average_runtime)
measured_speedup = 1.0 * max_average_runtime / min_average_runtime
expected_speedup = 1.0 * self.cpu.max_freq / self.cpu.min_freq
......@@ -198,14 +243,18 @@ class CPUFreqTest(Test):
max_speedup = expected_speedup + (expected_speedup - 1.0) * tolerance
if not min_speedup < measured_speedup < max_speedup:
print("[X] The speedup(%.2f) is not between %.2f and %.2f" %
(measured_speedup, min_speedup, max_speedup))
(measured_speedup, min_speedup, max_speedup))
return False
print("[.] The speedup(%.2f) is between %.2f and %.2f" %
(measured_speedup, min_speedup, max_speedup))
(measured_speedup, min_speedup, max_speedup))
return True
def test_ondemand(self):
"""
ondemand mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -220,10 +269,10 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'ondemand':
print("[X] The governor of CPU%s(%s) is not ondemand." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
load_test = Load(target_cpu)
load_test.run()
......@@ -231,25 +280,29 @@ class CPUFreqTest(Test):
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s is scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq))
(target_cpu, target_cpu_freq))
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s ondemand load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if not target_cpu_freq <= self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not less equal than %d." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s(%d) is less equal than %d." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return True
def test_conservative(self):
"""
conservative mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -264,7 +317,7 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'conservative':
print("[X] The governor of CPU%s(%s) is not conservative." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
......@@ -275,20 +328,24 @@ class CPUFreqTest(Test):
target_cpu_freq = self.cpu.get_freq(target_cpu)
if not self.cpu.min_freq < target_cpu_freq < self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not between %d~%d." %
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s(%d) is between %d~%d." %
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s conservative load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test_powersave(self):
"""
powersave mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -298,15 +355,15 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'powersave':
print("[X] The governor of CPU%s(%s) is not powersave." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.min_freq:
print("[X] The freq of CPU%s(%d) is not scaling_min_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.min_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq))
return False
print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
......@@ -314,13 +371,17 @@ class CPUFreqTest(Test):
load_test.run()
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s powersave load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test_performance(self):
"""
Performance mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('performance') != 0:
print("[X] Set governor of all CPUs to performance failed.")
return False
......@@ -330,15 +391,15 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'performance':
print("[X] The governor of CPU%s(%s) is not performance." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
......@@ -346,16 +407,20 @@ class CPUFreqTest(Test):
load_test.run()
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s performance load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test(self):
"""
Test case
:return:
"""
if not self.cpu.get_info():
print("[X] Fail to get CPU info." \
" Please check if the CPU supports cpufreq.")
print("[X] Fail to get CPU info."
" Please check if the CPU supports cpufreq.")
return False
ret = True
......
......@@ -14,14 +14,12 @@
import os
import sys
import time
import shutil
import string
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.commandUI import CommandUI
from hwcompatible.device import CertDevice, Device
from hwcompatible.device import CertDevice
class DiskTest(Test):
......@@ -32,7 +30,11 @@ class DiskTest(Test):
self.filesystems = ["ext4"]
self.ui = CommandUI()
def setup(self):
def setup(self, args=None):
"""
The Setup before testing
:return:
"""
try:
print("Disk Info:")
Command("fdisk -l").echo(ignore_errors=True)
......@@ -50,7 +52,7 @@ class DiskTest(Test):
Command("cat /proc/mdstat").echo(ignore_errors=True)
sys.stdout.flush()
print("\n")
except CertCommandError as e:
except Exception as e:
print("Warning: could not get disk info")
print(e)
......@@ -63,7 +65,7 @@ class DiskTest(Test):
self.disks.append("all")
disk = self.ui.prompt_edit("Which disk would you like to test: ", self.disks[0], self.disks)
return_code = True
if disk == "all":
if disk == "all":
for disk in self.disks[:-1]:
if not self.raw_test(disk):
return_code = False
......@@ -126,9 +128,9 @@ class DiskTest(Test):
device = "/dev/" + disk
if not os.path.exists(device):
print("Error: device %s not exists." % device)
proc_path="/sys/block/" + disk
proc_path = "/sys/block/" + disk
if not os.path.exists(proc_path):
proc_path="/sys/block/*/" + disk
proc_path = "/sys/block/*/" + disk
size = Command("cat %s/size" % proc_path).get_str()
size = int(size)/2
if size <= 0:
......@@ -160,9 +162,9 @@ class DiskTest(Test):
device = "/dev/" + disk
if not os.path.exists(device):
print("Error: device %s not exists." % device)
proc_path="/sys/block/" + disk
proc_path = "/sys/block/" + disk
if not os.path.exists(proc_path):
proc_path="/sys/block/*/" + disk
proc_path = "/sys/block/*/" + disk
size = Command("cat %s/size" % proc_path).get_str()
size = int(size)/2/2
if size <= 0:
......@@ -195,7 +197,7 @@ class DiskTest(Test):
if not self.do_fio(path, size, opts):
return_code = False
break
except CertCommandError as e:
except Exception as e:
print(e)
return_code = False
break
......@@ -205,8 +207,7 @@ class DiskTest(Test):
print("#############")
return return_code
@staticmethod
def do_fio(filepath, size, option):
def do_fio(self, filepath, size, option):
if os.path.isdir(filepath):
file_opt = "-directory=%s" % filepath
else:
......@@ -219,8 +220,5 @@ class DiskTest(Test):
return False
print("\n")
sys.stdout.flush()
bs = bs *2
bs = bs * 2
return True
......@@ -13,37 +13,49 @@
# Create: 2020-04-01
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
class IpmiTest(Test):
"""
Intelligent Platform Management Interface test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["OpenIPMI", "ipmitool"]
@staticmethod
def start_ipmi():
def start_ipmi(self):
"""
Start IPMI test
:return:
"""
try:
Command("systemctl start ipmi").run()
Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False)
except CertCommandError as e:
except:
print("ipmi service cant't be started")
return False
return True
@staticmethod
def ipmitool():
cmd_list = ["ipmitool fru","ipmitool sensor"]
def ipmitool(self):
"""
Testing with iptool tools
:return:
"""
cmd_list = ["ipmitool fru", "ipmitool sensor"]
for cmd in cmd_list:
try:
Command(cmd).echo()
except CertCommandError as e:
except:
print("%s return error." % cmd)
return False
return True
def test(self):
"""
Test case
:return:
"""
if not self.start_ipmi():
return False
if not self.ipmitool():
......@@ -54,4 +66,3 @@ class IpmiTest(Test):
if __name__ == "__main__":
i = IpmiTest()
i.test()
......@@ -23,7 +23,9 @@ from hwcompatible.command import Command, CertCommandError
class KdumpTest(Test):
"""
Kdump Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 9
......@@ -34,9 +36,13 @@ class KdumpTest(Test):
self.requirements = ["crash", "kernel-debuginfo", "kexec-tools"]
def test(self):
"""
Test case
:return:
"""
try:
Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*")
except (OSError, ValueError):
except:
print("Error: no crashkernel found.")
return False
......@@ -53,7 +59,7 @@ class KdumpTest(Test):
try:
Command("systemctl restart kdump").run()
Command("systemctl status kdump").get_str(regex="Active: active", single_line=False)
except (OSError, ValueError):
except:
print("Error: kdump service not working.")
return False
......@@ -75,11 +81,17 @@ class KdumpTest(Test):
return False
def verify_vmcore(self):
"""
Verify vmcore
:return:
"""
config = ConfigFile(self.kdump_conf)
if config.get_parameter("path"):
self.vmcore_path = config.get_parameter("path")
dir_pattern = re.compile(r"(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+(-|\.)[0-9]+(-|\.)[0-9]+)-(?P<time>[0-9]+:[0-9]+:[0-9]+)")
dir_pattern = re.compile(r'(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+[-.][0-9]+[-.][0-9]+)-'
r'(?P<time>[0-9]+:[0-9]+:[0-9]+)')
vmcore_dirs = list()
for (root, dirs, files) in os.walk(self.vmcore_path):
for eve_dir in dirs:
......@@ -96,4 +108,3 @@ class KdumpTest(Test):
print("Error: could not verify kdump image %s" % vmcore_file)
print(e)
return False
......@@ -13,16 +13,16 @@
# Create: 2020-04-01
import os
import sys
import time
import re
import string
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
class MemoryTest(Test):
"""
Memory Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["libhugetlbfs-utils"]
......@@ -36,10 +36,18 @@ class MemoryTest(Test):
self.retry_list = list()
self.test_dir = os.path.dirname(os.path.realpath(__file__))
def setup(self):
def setup(self, args=None):
"""
Initialization before test
:return:
"""
self.get_memory()
def test(self):
"""
test case
:return:
"""
if not self.memory_rw():
return False
......@@ -55,6 +63,10 @@ class MemoryTest(Test):
return True
def get_memory(self):
"""
Get memory
:return:
"""
proc_meminfo = open("/proc/meminfo", "r")
self.free_memory = 0
self.system_memory = 0
......@@ -86,6 +98,10 @@ class MemoryTest(Test):
self.free_memory = self.free_memory/1024
def memory_rw(self):
"""
Test memory request
:return:
"""
if not self.system_memory:
print("Error: get system memory fail.")
return False
......@@ -100,6 +116,10 @@ class MemoryTest(Test):
return True
def eat_memory(self):
"""
Eat memory test
:return:
"""
print("\nEat memory testing...")
print("System Memory: %u MB" % self.system_memory)
print("Free Memory: %u MB" % self.free_memory)
......@@ -124,6 +144,10 @@ class MemoryTest(Test):
return True
def hugetlb_test(self):
"""
hugetlb test
:return:
"""
print("\nHugetlb testing...")
self.get_memory()
print("HugePages Total: %u" % self.hugepage_total)
......@@ -135,10 +159,10 @@ class MemoryTest(Test):
if not self.hugepage_total:
os.system("hugeadm --create-mounts")
os.system("hugeadm --pool-pages-min %dMB:%d" %
(self.hugepage_size, self.huge_pages))
(self.hugepage_size, self.huge_pages))
elif self.hugepage_free < self.huge_pages:
os.system("hugeadm --pool-pages-min %dMB:%d" %
(self.hugepage_size, self.hugepage_total + self.huge_pages))
(self.hugepage_size, self.hugepage_total + self.huge_pages))
else:
update_hugepage = 0
......@@ -162,8 +186,11 @@ class MemoryTest(Test):
return False
return True
@staticmethod
def hot_plug_verify():
def hot_plug_verify(self):
"""
Verify hot plug
:return:
"""
kernel = Command("uname -r").read()
config_file = "/boot/config-" + kernel
if not os.path.exists(config_file):
......@@ -175,6 +202,11 @@ class MemoryTest(Test):
return True
def hotplug_memory_test(self, memory_path):
"""
Hotplug memory test
:param memory_path:
:return:
"""
print("Keep %s online before test." % memory_path)
if not self.online_memory(memory_path):
return False
......@@ -199,27 +231,39 @@ class MemoryTest(Test):
if total_mem_3 != total_mem_1:
return False
@staticmethod
def online_memory(memory_path):
def online_memory(self, memory_path):
"""
Set memory online
:param memory_path:
:return:
"""
try:
Command("echo 1 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("online")
return True
except CertCommandError as e:
except:
print("Error: fail to online %s." % memory_path)
return False
@staticmethod
def offline_memory(memory_path):
def offline_memory(self, memory_path):
"""
Set memory offline
:param memory_path:
:return:
"""
try:
Command("echo 0 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("offline")
return True
except CertCommandError as e:
except:
print("Error: fail to online %s." % memory_path)
return False
def memory_hotplug(self):
"""
Memory hotplug test
:return:
"""
print("\nMemory hotplug testing...")
if not self.hot_plug_verify():
print("Warning: memory hotplug test skipped.")
......@@ -247,7 +291,7 @@ class MemoryTest(Test):
Command("cat %s/removable" % memory_path).get_str("1")
print("%s is removable, start testing..." % os.path.basename(memory_path))
test_flag = 1
except CertCommandError as e:
except:
continue
if not self.hotplug_memory_test(memory_path):
print("%s hotplug test fail." % os.path.basename(memory_path))
......
......@@ -15,13 +15,16 @@
import os
import argparse
from hwcompatible.test import Test
from hwcompatible.env import CertEnv
from hwcompatible.document import CertDocument
from rdma import RDMATest
class EthernetTest(RDMATest):
"""
Ethernet Test
"""
def __init__(self):
RDMATest.__init__(self)
self.args = None
......@@ -32,6 +35,10 @@ class EthernetTest(RDMATest):
self.target_bandwidth_percent = 0.75
def is_RoCE(self):
"""
Judge whether ethernet is roce
:return:
"""
path_netdev = ''.join(['/sys', self.device.get_property("DEVPATH")])
path_pci = path_netdev.split('net')[0]
cmd = "ls %s | grep -q infiniband" % path_pci
......@@ -39,6 +46,11 @@ class EthernetTest(RDMATest):
return 0 == os.system(cmd)
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -50,7 +62,7 @@ class EthernetTest(RDMATest):
input = raw_input
except NameError:
from builtins import input
choice = input("[!] RoCE interface found. " \
choice = input("[!] RoCE interface found. "
"Run RDMA tests instead? [y/N] ")
if choice.lower() != "y":
return
......@@ -61,6 +73,10 @@ class EthernetTest(RDMATest):
self.test_eth_link, self.test_icmp, self.test_rdma]
def test(self):
"""
Test case
:return:
"""
for subtest in self.subtests:
if not subtest():
return False
......
......@@ -12,15 +12,13 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
import os
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command
from rdma import RDMATest
class InfiniBandTest(RDMATest):
"""
InfiniBand Test
"""
def __init__(self):
RDMATest.__init__(self)
self.link_layer = 'InfiniBand'
......@@ -30,6 +28,10 @@ class InfiniBandTest(RDMATest):
self.target_bandwidth_percent = 0.5
def test_ib_link(self):
"""
IB Link test
:return:
"""
if 'LinkUp' not in self.phys_state:
print("[X] Device is not LinkUp.")
......
......@@ -13,7 +13,6 @@
# Create: 2020-04-01
import os
import re
import time
import argparse
import base64
......@@ -26,12 +25,15 @@ except ImportError:
from urllib2 import urlopen, Request, HTTPError
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
from hwcompatible.document import CertDocument
from hwcompatible.env import CertEnv
class NetworkTest(Test):
"""
Network Test
"""
def __init__(self):
Test.__init__(self)
self.args = None
......@@ -48,8 +50,12 @@ class NetworkTest(Test):
self.target_bandwidth_percent = 0.8
self.testfile = 'testfile'
@staticmethod
def ifdown(interface):
def ifdown(self, interface):
"""
Judge whether the specified interface is closed successfully
:param interface:
:return:
"""
os.system("ip link set down %s" % interface)
for _ in range(5):
if 0 == os.system("ip link show %s | grep 'state DOWN'" % interface):
......@@ -57,8 +63,12 @@ class NetworkTest(Test):
time.sleep(1)
return False
@staticmethod
def ifup(interface):
def ifup(self, interface):
"""
Judge whether the specified interface is enabled successfully
:param interface:
:return:
"""
os.system("ip link set up %s" % interface)
for _ in range(5):
time.sleep(1)
......@@ -66,8 +76,11 @@ class NetworkTest(Test):
return True
return False
@staticmethod
def get_other_interfaces():
def get_other_interfaces(self):
"""
Get other interfaces
:return:
"""
ignore_interfaces = ['^lo', '^v', 'docker', 'br', 'bond']
cmd = "ip route show default | awk '/default/ {print $5}'"
c = Command(cmd)
......@@ -85,44 +98,64 @@ class NetworkTest(Test):
try:
c.run()
return c.output
except CertCommandError as e:
except Exception as e:
print(e)
return []
def set_other_interfaces_down(self):
"""
Judge whether the interface is closed
:return:
"""
for interface in self.other_interfaces:
if not self.ifdown(interface):
return False
return True
def set_other_interfaces_up(self):
"""
Set other interfaces to up
:return:
"""
for interface in self.other_interfaces:
## Not ifup(), as some interfaces may not be linked
# Not ifup(), as some interfaces may not be linked
os.system("ip link set up %s" % interface)
# os.system("ip link | grep -w %s" % interface)
return True
def get_speed(self):
"""
Get speed on the interface
:return:
"""
c = Command("ethtool %s" % self.interface)
pattern = r".*Speed:\s+(?P<speed>\d+)Mb/s"
try:
speed = c.get_str(pattern, 'speed', False)
return int(speed)
except CertCommandError as e:
except Exception as e:
print("[X] No speed found on the interface.")
return None
def get_interface_ip(self):
"""
Get interface ip
:return:
"""
c = Command("ip addr show %s" % self.interface)
pattern = r".*inet.? (?P<ip>.+)/.*"
try:
ip = c.get_str(pattern, 'ip', False)
return ip
except CertCommandError as e:
except Exception as e:
print("[X] No available ip on the interface.")
return None
def test_icmp(self):
"""
Test ICMP
:return:
"""
count = 500
c = Command("ping -q -c %d -i 0 %s" % (count, self.server_ip))
pattern = r".*, (?P<loss>\d+\.{0,1}\d*)% packet loss.*"
......@@ -134,12 +167,19 @@ class NetworkTest(Test):
c.print_output()
if float(loss) == 0:
return True
except CertCommandError as e:
except Exception as e:
print(e)
return False
def call_remote_server(self, cmd, act='start', ib_server_ip=''):
form = {}
"""
Call remote server
:param cmd:
:param act:
:param ib_server_ip:
:return:
"""
form = dict()
form['cmd'] = cmd
form['ib_server_ip'] = ib_server_ip
url = 'http://%s/api/%s' % (self.server_ip, act)
......@@ -151,13 +191,17 @@ class NetworkTest(Test):
try:
request = Request(url, data=data, headers=headers)
response = urlopen(request)
except HTTPError as e:
except Exception as e:
print(e)
return False
print("Status: %u %s" % (response.code, response.msg))
return int(response.code) == 200
def test_udp_latency(self):
"""
Test udp latency
:return:
"""
cmd = "qperf %s udp_lat" % self.server_ip
print(cmd)
for _ in range(self.retries):
......@@ -174,6 +218,10 @@ class NetworkTest(Test):
return False
def test_tcp_bandwidth(self):
"""
Test tcp bandwidth
:return:
"""
cmd = "qperf %s tcp_bw" % self.server_ip
print(cmd)
c = Command(cmd)
......@@ -192,17 +240,25 @@ class NetworkTest(Test):
(bandwidth, target_bandwidth))
if bandwidth > target_bandwidth:
return True
except CertCommandError as e:
except Exception as e:
print(e)
return False
def create_testfile(self):
"""
Create testfile
:return:
"""
bs = 128
count = self.speed/8
cmd = "dd if=/dev/urandom of=%s bs=%uk count=%u" % (self.testfile, bs, count)
return 0 == os.system(cmd)
def test_http_upload(self):
"""
Test http upload
:return:
"""
form = {}
size = os.path.getsize(self.testfile)
filename = os.path.basename(self.testfile)
......@@ -210,7 +266,7 @@ class NetworkTest(Test):
try:
with open(self.testfile, 'rb') as f:
filetext = base64.b64encode(f.read())
except IOError as e:
except Exception as e:
print(e)
return False
......@@ -227,7 +283,7 @@ class NetworkTest(Test):
try:
request = Request(url, data=data, headers=headers)
response = urlopen(request)
except HTTPError as e:
except Exception as e:
print(e)
return False
time_stop = time.time()
......@@ -240,13 +296,17 @@ class NetworkTest(Test):
return True
def test_http_download(self):
"""
Test http download
:return:
"""
filename = os.path.basename(self.testfile)
url = "http://%s/files/%s" % (self.server_ip, filename)
time_start = time.time()
try:
response = urlopen(url)
except HTTPError as e:
except Exception as e:
print(e)
return False
time_stop = time.time()
......@@ -258,7 +318,7 @@ class NetworkTest(Test):
try:
with open(self.testfile, 'wb') as f:
f.write(filetext)
except IOError as e:
except Exception as e:
print(e)
return False
......@@ -268,6 +328,10 @@ class NetworkTest(Test):
return True
def test_udp_tcp(self):
"""
Test udp tcp
:return:
"""
if not self.call_remote_server('qperf', 'start'):
print("[X] start qperf server failed.")
return False
......@@ -291,6 +355,10 @@ class NetworkTest(Test):
return True
def test_http(self):
"""
Test http
:return:
"""
print("[+] Creating testfile to upload...")
if not self.create_testfile():
print("[X] Create testfile failed.")
......@@ -309,6 +377,10 @@ class NetworkTest(Test):
return True
def test_eth_link(self):
"""
Test eth link
:return:
"""
self.other_interfaces = self.get_other_interfaces()
print("[+] Setting irrelevant interfaces down...")
if not self.set_other_interfaces_down():
......@@ -337,6 +409,10 @@ class NetworkTest(Test):
return True
def test_ip_info(self):
"""
Test ip info
:return:
"""
if not self.interface:
print("[X] No interface assigned.")
return False
......@@ -356,6 +432,11 @@ class NetworkTest(Test):
return True
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -364,12 +445,20 @@ class NetworkTest(Test):
self.server_ip = self.cert.get_server()
def test(self):
"""
test case
:return:
"""
for subtest in self.subtests:
if not subtest():
return False
return True
def teardown(self):
"""
Environment recovery after test
:return:
"""
print("[.] Stop all test servers...")
self.call_remote_server('all', 'stop')
......
......@@ -16,14 +16,16 @@ import os
import re
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
from hwcompatible.document import CertDocument
from hwcompatible.env import CertEnv
from network import NetworkTest
class RDMATest(NetworkTest):
"""
RDMA Test
"""
def __init__(self):
NetworkTest.__init__(self)
self.args = None
......@@ -47,6 +49,10 @@ class RDMATest(NetworkTest):
self.target_bandwidth_percent = 0.5
def get_ibstatus(self):
"""
Get ibstatus
:return:
"""
path_netdev = ''.join(['/sys', self.device.get_property("DEVPATH")])
path_pci = path_netdev.split('net')[0]
path_ibdev = 'infiniband_verbs/uverb*/ibdev'
......@@ -55,7 +61,7 @@ class RDMATest(NetworkTest):
c = Command(cmd)
try:
self.ib_device = c.read()
except CertCommandError as e:
except Exception as e:
print(e)
return False
......@@ -64,7 +70,7 @@ class RDMATest(NetworkTest):
c = Command(cmd)
try:
self.ib_port = int(c.read(), 16) + 1
except CertCommandError as e:
except Exception as e:
print(e)
return False
......@@ -87,13 +93,17 @@ class RDMATest(NetworkTest):
self.phys_state = re.search(r"phys state:\s+(.*)", info).group(1)
self.link_layer = re.search(r"link_layer:\s+(.*)", info).group(1)
self.speed = int(re.search(r"rate:\s+(\d*)", info).group(1)) * 1024
except CertCommandError as e:
except Exception as e:
print(e)
return False
return True
def test_rping(self):
"""
Test rping
:return:
"""
if not self.call_remote_server('rping', 'start', self.server_ip):
print("start rping server failed.")
return False
......@@ -107,6 +117,10 @@ class RDMATest(NetworkTest):
return False
def test_rcopy(self):
"""
Test rcopy
:return:
"""
if not self.call_remote_server('rcopy', 'start', self.server_ip):
print("start rcopy server failed.")
return False
......@@ -118,6 +132,11 @@ class RDMATest(NetworkTest):
return 0 == ret
def test_bw(self, cmd):
"""
Test bandwidth
:param cmd:
:return:
"""
if self.link_layer == 'Ethernet':
cmd = cmd + ' -R'
......@@ -130,19 +149,23 @@ class RDMATest(NetworkTest):
c = Command(cmd)
pattern = r"\s+(\d+)\s+(\d+)\s+([\.\d]+)\s+(?P<avg_bw>[\.\d]+)\s+([\.\d]+)"
try:
avg_bw = c.get_str(pattern, 'avg_bw', False) ## MB/sec
avg_bw = c.get_str(pattern, 'avg_bw', False) # MB/sec
avg_bw = float(avg_bw) * 8
tgt_bw = self.target_bandwidth_percent * self.speed
print("Current bandwidth is %.2fMb/s, target is %.2fMb/s" %
(avg_bw, tgt_bw))
return avg_bw > tgt_bw
except CertCommandError as e:
except Exception as e:
print(e)
self.call_remote_server(cmd, 'stop')
return False
def test_rdma(self):
"""
Test Remote Direct Memory Access
:return:
"""
print("[+] Testing rping...")
if not self.test_rping():
print("[X] Test rping failed.")
......@@ -176,6 +199,10 @@ class RDMATest(NetworkTest):
return True
def test_ibstatus(self):
"""
Test ibstatus
:return:
"""
if 0 != os.system("systemctl start opensm"):
print("[X] start opensm failed.")
return False
......@@ -191,6 +218,11 @@ class RDMATest(NetworkTest):
return True
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -199,6 +231,10 @@ class RDMATest(NetworkTest):
self.server_ip = self.cert.get_server()
def test(self):
"""
test case
:return:
"""
try:
input = raw_input
except NameError:
......
......@@ -16,22 +16,34 @@ import os
import sys
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
from hwcompatible.command import Command
class NvmeTest(Test):
"""
Test Non-Volatile Memory express
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["nvme-cli"]
self.args = None
self.device = None
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
Command("nvme list").echo(ignore_errors=True)
def test(self):
"""
test case
:return:
"""
disk = self.device.get_name()
if self.in_use(disk):
print("%s is in use now, skip this test." % disk)
......@@ -69,13 +81,17 @@ class NvmeTest(Test):
Command("nvme list").echo(ignore_errors=True)
return True
except CertCommandError as e:
except Exception as e:
print("Error: nvme cmd fail.")
print(e)
return False
@staticmethod
def in_use(disk):
def in_use(self, disk):
"""
Determine whether the swapon is in use
:param disk:
:return:
"""
os.system("swapon -a 2>/dev/null")
swap_file = open("/proc/swaps", "r")
swap = swap_file.read()
......@@ -99,4 +115,3 @@ class NvmeTest(Test):
return True
if os.system("pvs 2>/dev/null | grep -q '/dev/%s'" % disk) == 0:
return True
......@@ -14,11 +14,13 @@
import re
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
class PerfTest(Test):
"""
Perf Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["perf"]
......@@ -27,6 +29,10 @@ class PerfTest(Test):
self.perfReport = "perf report -i hwcompatible-perf.data --stdio"
def exec_perf(self):
"""
Execute perf command
:return:
"""
# record
print("Collecting the perf record using the command '%s'." % self.perfRecord)
perfRecordEcho = Command(self.perfRecord).read()
......@@ -47,7 +53,7 @@ class PerfTest(Test):
# report
perfReportEcho = Command(self.perfReport).read()
perfReportMacth = re.search("\s*\S+\s+(\[\S+.\S+\])\s+\S+", perfReportEcho)
perfReportMacth = re.search(r"\s*\S+\s+(\[\S+.\S+\])\s+\S+", perfReportEcho)
if not perfReportMacth:
print("Error: no samples found. Failed to fetch report because of:\n %s." % perfReportEcho)
return False
......@@ -56,6 +62,10 @@ class PerfTest(Test):
return True
def test(self):
"""
test case
:return:
"""
if not self.exec_perf():
return False
return True
......
......@@ -15,18 +15,19 @@
import os
import sys
import re
import shutil
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
from hwcompatible.sysinfo import SysInfo
from hwcompatible.env import CertEnv
from hwcompatible.document import Document
class SystemTest(Test):
"""
System Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 1
......@@ -35,10 +36,19 @@ class SystemTest(Test):
self.logdir = None
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.logdir = getattr(args, "logdir", None)
def test(self):
"""
test case
:return:
"""
os.system("uname -a")
print("")
os.system("lsmod")
......@@ -58,8 +68,11 @@ class SystemTest(Test):
return return_code
@staticmethod
def check_certrpm():
def check_certrpm(self):
"""
Check installed cert package
:return:
"""
print("\nChecking installed cert package...")
return_code = True
for cert_package in ["oec-hardware"]:
......@@ -69,12 +82,16 @@ class SystemTest(Test):
sys.stdout.flush()
if rpm_verify.output and len(rpm_verify.output) > 0:
return_code = False
except OSError as err:
except:
print("Error: files in %s have been tampered." % cert_package)
return_code = False
return return_code
def check_kernel(self):
"""
Check kernel
:return:
"""
print("\nChecking kernel...")
kernel_rpm = self.sysinfo.kernel_rpm
os_version = self.sysinfo.product + " " + self.sysinfo.get_version()
......@@ -95,7 +112,7 @@ class SystemTest(Test):
if kernel_dict.document[os_version] != self.sysinfo.kernel_version:
print("Error: kernel %s check GA status fail." % self.sysinfo.kernel_version)
return_code = False
except KeyError:
except:
print("Error: %s is not supported." % os_version)
return_code = False
......@@ -113,16 +130,16 @@ class SystemTest(Test):
print(module)
print("")
if tainted & (1<<12):
if tainted & (1 << 12):
modules = self.get_modules("O")
print("Out-of-tree modules:")
for module in modules:
print(module)
#self.abi_check(module)
# self.abi_check(module)
return_code = False
print("")
if tainted & (1<<13):
if tainted & (1 << 13):
modules = self.get_modules("E")
print("Unsigned modules:")
for module in modules:
......@@ -130,7 +147,7 @@ class SystemTest(Test):
print("")
tainted_file.close()
except (IOError, ValueError) as e:
except Exception as e:
print(e)
print("Error: could not determine if kernel is tainted.")
return_code = False
......@@ -143,15 +160,19 @@ class SystemTest(Test):
try:
params = Command("cat /proc/cmdline").get_str()
print("Boot Parameters: %s" % params)
except OSError as e:
except Exception as e:
print(e)
print("Error: could not determine boot parameters.")
return_code = False
return return_code
@staticmethod
def get_modules(sign):
def get_modules(self, sign):
"""
Get the module with signs character
:param sign:
:return:
"""
pattern = re.compile(r"^(?P<mod_name>\w+)[\s\S]+\((?P<signs>[A-Z]+)\)")
proc_modules = open("/proc/modules")
modules = list()
......@@ -164,6 +185,11 @@ class SystemTest(Test):
return modules
def abi_check(self, module):
"""
Check abi whitelist
:param module:
:return:
"""
whitelist_path = [("/lib/modules/kabi-current/kabi_whitelist_" + self.sysinfo.arch),
("/lib/modules/kabi/kabi_whitelist_" + self.sysinfo.arch),
("/usr/src/kernels/%s/kabi_whitelist" % self.sysinfo.kernel)
......@@ -210,44 +236,53 @@ class SystemTest(Test):
print("")
return True
@staticmethod
def read_abi_whitelist(whitelist):
def read_abi_whitelist(self, whitelist):
"""
Read abi whitelist
:param whitelist:
:return:
"""
symbols = list()
if not os.path.isfile(whitelist):
print("Error: Cannot read whitelist file")
return None
whitelistFile = open(whitelist,"r")
whitelistfile = open(whitelist, "r")
while True:
line = whitelistFile.readline()
line = whitelistfile.readline()
if line == "":
break
if line == "\n":
continue
line.split()
if line[0] == '[':
group=line[1:-2]
# group = line[1:-2]
continue
symbol=line.strip()
symbol = line.strip()
symbols.append(symbol)
return symbols
def read_module(self, module):
"""
Read module
:param module:
:return:
"""
symbols = list()
modulefile = self.get_modulefile(module)
module_file = self.get_modulefile(module)
if not modulefile:
if not module_file:
print("Error: Can not find module file for %s" % module)
return None
if not os.path.isfile(modulefile):
print("Error: Cannot read module file %s" % modulefile)
if not os.path.isfile(module_file):
print("Error: Cannot read module file %s" % module_file)
return None
if modulefile[-2:] == "ko":
nm = os.popen('modprobe --dump-modversions ' + modulefile)
if module_file[-2:] == "ko":
nm = os.popen('modprobe --dump-modversions ' + module_file)
else:
nm = open(modulefile,"r")
nm = open(module_file, "r")
while True:
line = nm.readline()
......@@ -258,19 +293,26 @@ class SystemTest(Test):
nm.close()
return self.readSymbols(symbols)
@staticmethod
def get_modulefile(module):
def get_modulefile(self, module):
"""
Get module file
:param module:
:return:
"""
try:
modulefile = Command("modinfo -F filename %s" % module).get_str()
if os.path.islink(modulefile):
modulefile = os.readlink(modulefile)
return modulefile
except OSError:
except:
print("Error: could no find module file for %s:" % module)
return None
@staticmethod
def check_selinux():
def check_selinux(self):
"""
check selinux
:return:
"""
print("\nChecking selinux...")
status = os.system("/usr/sbin/sestatus | grep 'SELinux status' | grep -qw 'enabled'")
mode = os.system("/usr/sbin/sestatus | grep 'Current mode' | grep -qw 'enforcing'")
......
......@@ -16,11 +16,24 @@ import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
class TapeTest(Test):
"""
Tape test
"""
def __init__(self):
Test.__init__(self)
self.args = None
self.device = None
self.tapeDevice = None
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
self.tapeDevice = self.device.get_property("DEVNAME")
......@@ -30,6 +43,10 @@ class TapeTest(Test):
print("Found the Tape Device :\n %s" % self.tapeDevice)
def test(self):
"""
test case
:return:
"""
if not self.tapeDevice:
return False
......@@ -38,19 +55,19 @@ class TapeTest(Test):
bs = 64
# rewind the tape
try:
tapeRewind = Command("mt -f %s rewind 2>/dev/null" % self.tapeDevice).read()
print("Rewind tape : \n %s" % tapeRewind)
tape_rewind = Command("mt -f %s rewind 2>/dev/null" % self.tapeDevice).read()
print("Rewind tape : \n %s" % tape_rewind)
except CertCommandError as exception:
print(exception)
return False
# Write data
try:
tapeWriteData = Command("tar -Pcb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if tapeWriteData == 0:
tapewritedata = Command("tar -Pcb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if tapewritedata == 0:
print("Write data done. Start comparing ...")
# Compare data
compareData = Command("tar -Pdb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if compareData == 0:
comparedata = Command("tar -Pdb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if comparedata == 0:
print("Tape test on device %s passed." % self.tapeDevice)
return True
else:
......
......@@ -12,24 +12,29 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
import os
import sys
import time
from hwcompatible.test import Test
from hwcompatible.commandUI import CommandUI
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
from hwcompatible.command import Command
from hwcompatible.device import CertDevice
class UsbTest(Test):
"""
Usb test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["usbutils"]
self.ui = CommandUI()
def test(self):
"""
Test case
:return:
"""
print("USB device:")
Command("lsusb -t").echo()
print("")
......@@ -93,6 +98,10 @@ class UsbTest(Test):
return True
def get_usb(self):
"""
Get usb
:return:
"""
devices = CertDevice().get_devices()
usb_devices = list()
for device in devices:
......
......@@ -15,7 +15,6 @@
import os
import sys
import time
import re
from hwcompatible.test import Test
from hwcompatible.commandUI import CommandUI
......@@ -23,7 +22,9 @@ from hwcompatible.command import Command, CertCommandError
class WatchDogTest(Test):
"""
WatchDog Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 9
......@@ -33,16 +34,21 @@ class WatchDogTest(Test):
self.test_dir = os.path.dirname(os.path.realpath(__file__))
def test(self):
"""
test case
:return:
"""
if not os.path.exists("/dev/watchdog"):
os.system("modprobe softdog")
os.chdir(self.test_dir)
try:
timeout = Command("./watchdog -g").get_str(regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",regex_group="timeout")
timeout = Command("./watchdog -g").get_str(regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",
regex_group="timeout")
timeout = int(timeout)
if timeout > self.max_timeout:
Command("./watchdog -s %d" % self.max_timeout).echo()
except Exception as e:
except CertCommandError as e:
print(e)
print("Set/get watchdog timeout failed.")
return False
......@@ -59,7 +65,10 @@ class WatchDogTest(Test):
print("")
return False
@staticmethod
def startup():
def startup(self):
"""
Initialization before test
:return:
"""
print("Recover from watchdog.")
return True
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册