提交 aef36b24 编写于 作者: B barrierye

update code & add general monitor

上级 8573e8af
...@@ -35,7 +35,7 @@ class Monitor(object): ...@@ -35,7 +35,7 @@ class Monitor(object):
self._remote_donefile_name = None self._remote_donefile_name = None
self._local_path = None self._local_path = None
self._local_model_name = None self._local_model_name = None
self._local_donefile_name = None self._local_timestamp_file = None
self._interval = interval self._interval = interval
self._remote_donefile_timestamp = None self._remote_donefile_timestamp = None
self._local_tmp_path = None self._local_tmp_path = None
...@@ -55,8 +55,8 @@ class Monitor(object): ...@@ -55,8 +55,8 @@ class Monitor(object):
def set_local_model_name(self, model_name): def set_local_model_name(self, model_name):
self._local_model_name = model_name self._local_model_name = model_name
def set_local_donefile_name(self, donefile_name): def set_local_timestamp_file(self, timestamp_file):
self._local_donefile_name = donefile_name self._local_timestamp_file = timestamp_file
def set_local_tmp_path(self, tmp_path): def set_local_tmp_path(self, tmp_path):
self._local_tmp_path = tmp_path self._local_tmp_path = tmp_path
...@@ -72,8 +72,8 @@ class Monitor(object): ...@@ -72,8 +72,8 @@ class Monitor(object):
raise Exception('local_model_name not set.') raise Exception('local_model_name not set.')
if self._local_path is None: if self._local_path is None:
raise Exception('local_path not set.') raise Exception('local_path not set.')
if self._local_donefile_name is None: if self._local_timestamp_file is None:
raise Exception('local_donefile_name not set.') raise Exception('local_timestamp_file not set.')
if self._local_tmp_path is None: if self._local_tmp_path is None:
raise Exception('local_tmp_path not set.') raise Exception('local_tmp_path not set.')
...@@ -86,7 +86,8 @@ class Monitor(object): ...@@ -86,7 +86,8 @@ class Monitor(object):
os.makedirs(self._local_tmp_path) os.makedirs(self._local_tmp_path)
while True: while True:
[flag, timestamp] = self._exist_remote_file( [flag, timestamp] = self._exist_remote_file(
self._remote_path, self._remote_donefile_name) self._remote_path, self._remote_donefile_name,
self._local_tmp_path)
if flag: if flag:
if self._remote_donefile_timestamp is None or \ if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp: timestamp != self._remote_donefile_timestamp:
...@@ -103,7 +104,7 @@ class Monitor(object): ...@@ -103,7 +104,7 @@ class Monitor(object):
).strftime('%Y-%m-%d %H:%M:%S'))) ).strftime('%Y-%m-%d %H:%M:%S')))
self._update_local_donefile(self._local_path, self._update_local_donefile(self._local_path,
self._local_model_name, self._local_model_name,
self._local_donefile_name) self._local_timestamp_file)
print('{} [INFO] update local donefile'.format( print('{} [INFO] update local donefile'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else: else:
...@@ -113,7 +114,7 @@ class Monitor(object): ...@@ -113,7 +114,7 @@ class Monitor(object):
'%Y-%m-%d %H:%M:%S'), self._interval)) '%Y-%m-%d %H:%M:%S'), self._interval))
time.sleep(self._interval) time.sleep(self._interval)
def _exist_remote_file(self, path, filename): def _exist_remote_file(self, path, filename, local_tmp_path):
raise Exception('This function must be inherited.') raise Exception('This function must be inherited.')
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
...@@ -128,9 +129,9 @@ class Monitor(object): ...@@ -128,9 +129,9 @@ class Monitor(object):
raise Exception('update local model failed.') raise Exception('update local model failed.')
def _update_local_donefile(self, local_path, local_model_name, def _update_local_donefile(self, local_path, local_model_name,
local_donefile_name): local_timestamp_file):
donefile_path = os.path.join(local_path, local_model_name, donefile_path = os.path.join(local_path, local_model_name,
local_donefile_name) local_timestamp_file)
cmd = 'touch {}'.format(donefile_path) cmd = 'touch {}'.format(donefile_path)
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('update local donefile failed.') raise Exception('update local donefile failed.')
...@@ -143,7 +144,7 @@ class HDFSMonitor(Monitor): ...@@ -143,7 +144,7 @@ class HDFSMonitor(Monitor):
super(HDFSMonitor, self).__init__(interval) super(HDFSMonitor, self).__init__(interval)
self._hdfs_bin_path = bin_path self._hdfs_bin_path = bin_path
def _exist_remote_file(self, path, filename): def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(path, filename)
cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path, cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path,
remote_filepath) remote_filepath)
...@@ -164,19 +165,15 @@ class HDFSMonitor(Monitor): ...@@ -164,19 +165,15 @@ class HDFSMonitor(Monitor):
class FTPMonitor(Monitor): class FTPMonitor(Monitor):
''' FTP Monitor. ''' ''' FTP Monitor. '''
def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10): def __init__(self, host, port, username="", password="", interval=10):
super(FTPMonitor, self).__init__(interval) super(FTPMonitor, self).__init__(interval)
import ftplib import ftplib
self._ftp_ip = ftp_ip
self._ftp_port = ftp_port
self._ftp = ftplib.FTP() self._ftp = ftplib.FTP()
self._connect(ftp_ip, ftp_port, username, password) self._ftp.connect(host, port)
def _connect(self, ftp_ip, ftp_port, username, password):
self._ftp.connect(ftp_ip, ftp_port)
self._ftp.login(username, password) self._ftp.login(username, password)
self._ftp_url = 'ftp://{}:{}/'.format(host, port)
def _exist_remote_file(self, path, filename): def _exist_remote_file(self, path, filename, local_tmp_path):
import ftplib import ftplib
try: try:
filepath = os.path.join(path, filename) filepath = os.path.join(path, filename)
...@@ -186,10 +183,66 @@ class FTPMonitor(Monitor): ...@@ -186,10 +183,66 @@ class FTPMonitor(Monitor):
except ftplib.error_perm: except ftplib.error_perm:
return [False, None] return [False, None]
def _download_remote_files(remote_path,
remote_dirname,
local_tmp_path,
overwrite=True):
try:
remote_dirpath = os.path.join(remote_path, remote_dirname)
self._ftp.cwd(remote_dirpath)
os.mkdir(os.path.join(local_tmp_path, remote_dirname))
except OSError:
# folder already exists at the local_tmp_path
pass
except ftplib.error_perm:
raise Exception('remote_path({}) not exist.'.format(remote_path))
filelist = [x for x in self_ftp.mlsd()]
for file in filelist:
if file[1]['type'] == 'file':
fullpath = os.path.join(local_tmp_path, remote_dirname, file[0])
if not overwrite and os.path.isfile(fullpath):
continue
else:
with open(fullpath, 'wb') as f:
self._ftp.retrbinary('RETR ' + file[0], f.write)
elif file[1]['type'] == 'dir':
self._download_remote_files(
os.path.join(remote_path, remote_dirname), file[0],
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
print('Unknown type: ' + file[1]['type'])
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
self._exist_remote_file(
remote_path, dirname, local_tmp_path, overwrite=True)
class GeneralMonitor(Monitor):
''' General Monitor. '''
def __init__(self, host, interval=10):
super(GeneralMonitor, self).__init__(interval)
self._host = host
def _get_local_file_timestamp(self, filename):
return os.path.getmtime(filename)
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
url = '{}/{}'.format(self._host, remote_filepath)
cmd = 'wget -N -P {} {}'.format(local_tmp_path, url)
if os.system(cmd) != 0:
return [False, None]
else:
timestamp = self._get_local_file_timestamp(
os.path.join(local_tmp_path, filename))
return [True, timestamp]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
filepath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
cmd = 'wget -nH -r -P {} ftp://{}:{}/{} &> /dev/null'.format( url = '{}/{}'.format(self._host, remote_dirpath)
local_tmp_path, self._ftp_ip, self._ftp_port, filepath) cmd = 'wget -nH -r -P {} {} &> /dev/null'.format(local_tmp_path, url)
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed.') raise Exception('pull remote dir failed.')
...@@ -198,7 +251,7 @@ def parse_args(): ...@@ -198,7 +251,7 @@ def parse_args():
''' parse args. ''' ''' parse args. '''
parser = argparse.ArgumentParser(description="Monitor") parser = argparse.ArgumentParser(description="Monitor")
parser.add_argument( parser.add_argument(
"--type", type=str, required=True, help="Type of remote server") "--type", type=str, default='general', help="Type of remote server")
parser.add_argument( parser.add_argument(
"--remote_path", type=str, required=True, help="Remote path") "--remote_path", type=str, required=True, help="Remote path")
parser.add_argument( parser.add_argument(
...@@ -216,26 +269,42 @@ def parse_args(): ...@@ -216,26 +269,42 @@ def parse_args():
parser.add_argument( parser.add_argument(
"--local_model_name", type=str, required=True, help="Local model name") "--local_model_name", type=str, required=True, help="Local model name")
parser.add_argument( parser.add_argument(
"--local_donefile_name", "--local_timestamp_file",
type=str, type=str,
required=True, default='fluid_time_file',
help="Local donfile name(fluid_time_file in model file)") help="Local timestamp file name(fluid_time_file in model file)")
parser.add_argument( parser.add_argument(
"--local_tmp_path", type=str, default='tmp', help="Local tmp path") "--local_tmp_path",
type=str,
default='_serving_monitor_tmp',
help="Local tmp path")
parser.add_argument( parser.add_argument(
"--interval", type=int, default=10, help="Time interval") "--interval", type=int, default=10, help="Time interval")
parser.add_argument("--ftp_ip", type=str, help="Ip the ftp")
parser.add_argument("--ftp_port", type=int, help="Port the ftp")
parser.add_argument( parser.add_argument(
"--hdfs_bin", type=str, default='hdfs', help="Hdfs binary file path") "--general_host", type=str, help="Host of general remote server")
parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path")
parser.add_argument("--ftp_host", type=str, help="Host of ftp")
parser.add_argument("--ftp_port", type=int, help="Port of ftp")
parser.add_argument(
"--ftp_username", type=str, default='', help="Username of ftp")
parser.add_argument(
"--ftp_password", type=str, default='', help="Password of ftp")
return parser.parse_args() return parser.parse_args()
def get_monitor(mtype): def get_monitor(mtype):
''' get monitor. '''
if mtype == 'ftp': if mtype == 'ftp':
return FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval) return FTPMonitor(
args.ftp_host,
args.ftp_port,
username=args.ftp_username,
password=args.ftp_password,
interval=args.interval)
elif mtype == 'hdfs': elif mtype == 'hdfs':
return HDFSMonitor(args.hdfs_bin, interval=args.interval) return HDFSMonitor(args.hdfs_bin, interval=args.interval)
elif mtype == 'general':
return GeneralMonitor(args.general_host, interval=args.interval)
else: else:
raise Exception('unsupport type.') raise Exception('unsupport type.')
...@@ -246,7 +315,7 @@ def start_monitor(monitor, args): ...@@ -246,7 +315,7 @@ def start_monitor(monitor, args):
monitor.set_remote_donefile_name(args.remote_donefile_name) monitor.set_remote_donefile_name(args.remote_donefile_name)
monitor.set_local_path(args.local_path) monitor.set_local_path(args.local_path)
monitor.set_local_model_name(args.local_model_name) monitor.set_local_model_name(args.local_model_name)
monitor.set_local_donefile_name(args.local_donefile_name) monitor.set_local_timestamp_file(args.local_timestamp_file)
monitor.set_local_tmp_path(args.local_tmp_path) monitor.set_local_tmp_path(args.local_tmp_path)
monitor.run() monitor.run()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册