worker.py 16.9 KB
Newer Older
F
fuyw 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import cloudpickle
import multiprocessing
import os
F
fuyw 已提交
18
import psutil
B
Bo Zhou 已提交
19
import signal
F
fuyw 已提交
20
import socket
F
fuyw 已提交
21 22
import subprocess
import sys
23
import tempfile
F
fuyw 已提交
24 25
import time
import threading
B
Bo Zhou 已提交
26
import warnings
F
fuyw 已提交
27
import zmq
F
fuyw 已提交
28
from datetime import datetime
29
import parl
B
Bo Zhou 已提交
30
from parl.utils import get_ip_address, to_byte, to_str, logger, _IS_WINDOWS, kill_process
F
fuyw 已提交
31
from parl.remote import remote_constants
B
Bo Zhou 已提交
32 33 34
from parl.remote.message import InitializedWorker
from parl.remote.status import WorkerStatus
from six.moves import queue
F
fuyw 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58


class Worker(object):
    """Worker provides the cpu computation resources for the cluster.

    A worker node is connected to the master node and will send its
    computation resources information to the master node. When a worker
    node is created, it will start `cpu_num` empty jobs and these jobs'
    ip addresses will be sent to the master node. Further, when an old
    job is killed, worker will start a new job and send the new job ip
    address to the master node.

    To start a worker, we use the following xparl command line api:

    .. code-block:: python

        xparl connect --address localhost:1234 --cpu_num 8

    Attributes:
        master_address (str): Master's ip address.
        request_master_socket (zmq.Context.socket): A socket which sends job
                                                    address to the master node.
        reply_job_socket (zmq.Context.socket): A socket which receives
                                               job_address from the job.
B
Bo Zhou 已提交
59
        kill_job_socket (zmq.Context.socket): A socket that receives commands to kill the job from jobs.
60 61
        job_buffer (queue.Queue): A buffer that stores initialized jobs for providing new jobs in a short time.

F
fuyw 已提交
62 63 64 65 66
    Args:
        master_address (str): IP address of the master node.
        cpu_num (int): Number of cpu to be used on the worker.
    """

67
    def __init__(self, master_address, cpu_num=None, log_server_port=None):
        """Connect to the master, start the log server and launch the jobs.

        Args:
            master_address (str): address of the master node; it is appended
                to ``tcp://`` when the request socket is connected.
            cpu_num (int): number of CPUs used by this worker. When None,
                ``multiprocessing.cpu_count()`` is used.
            log_server_port: port forwarded to ``_create_log_server``. When
                None, that method decides which port is used.
        """
        self.lock = threading.Lock()
        self.heartbeat_socket_initialized = threading.Event()
        self.ctx = zmq.Context.instance()
        self.master_address = master_address
        self.master_is_alive = True
        self.worker_is_alive = True
        self.worker_status = None  # initialized at `self._create_jobs`
        self._set_cpu_num(cpu_num)
        self.job_buffer = queue.Queue(maxsize=self.cpu_num)
        # `_create_sockets` sets `self.worker_ip`, which `_create_log_server`
        # reads, so the order of the next three calls matters.
        self._create_sockets()
        self.check_version()
        # create log server
        self.log_server_proc, self.log_server_address = self._create_log_server(
            port=log_server_port)

        # create a thread that waits commands from the job to kill the job.
        self.kill_job_thread = threading.Thread(target=self._reply_kill_job)
        # `Thread.setDaemon()` is deprecated; the `daemon` attribute is the
        # equivalent that works on both Python 2 and Python 3.
        self.kill_job_thread.daemon = True
        self.kill_job_thread.start()

        self._create_jobs()

        # create a thread that initializes jobs and adds them into the job_buffer
        job_thread = threading.Thread(target=self._fill_job_buffer)
        job_thread.daemon = True
        job_thread.start()
F
fuyw 已提交
94 95 96 97 98 99 100 101 102 103 104

    def _set_cpu_num(self, cpu_num=None):
        """set useable cpu number for worker"""
        if cpu_num is not None:
            assert isinstance(
                cpu_num, int
            ), "cpu_num should be INT type, please check the input type."
            self.cpu_num = cpu_num
        else:
            self.cpu_num = multiprocessing.cpu_count()

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    def check_version(self):
        """Verify that the worker's parl and python versions match the master's.

        Sends ``CHECK_VERSION_TAG`` on ``request_master_socket`` and compares
        the parl version and the python major version found in the reply with
        the local ones.

        Raises:
            AssertionError: if the parl version or the python major version
                differs from the one reported by the master.
            NotImplementedError: if the reply tag is not ``NORMAL_TAG``.
        """
        self.request_master_socket.send_multipart(
            [remote_constants.CHECK_VERSION_TAG])
        message = self.request_master_socket.recv_multipart()
        tag = message[0]
        if tag != remote_constants.NORMAL_TAG:
            raise NotImplementedError

        master_parl_version = to_str(message[1])
        master_python_version = to_str(message[2])
        worker_parl_version = parl.__version__
        worker_python_version = str(sys.version_info.major)

        versions_match = (worker_parl_version == master_parl_version
                          and worker_python_version == master_python_version)
        if not versions_match:
            # Raised explicitly (rather than through an `assert` statement) so
            # the check is still performed when Python runs with `-O`.
            raise AssertionError(
                'Version mismatch: the "master" is of version '
                '"parl={}, python={}". However, "parl={}, python={}" '
                'is provided in your environment.'.format(
                    master_parl_version, master_python_version,
                    worker_parl_version, worker_python_version))

F
fuyw 已提交
123
    def _create_sockets(self):
        """Create the three sockets a worker owns from the start.

        (1) request_master_socket: REQ socket connected to the master; sends
            job addresses to the master node.
        (2) reply_job_socket: REP socket on a random port; receives
            job_address from the job subprocesses.
        (3) kill_job_socket: REP socket on a random port; receives commands
            to kill a job from the jobs.

        The heartbeat sockets are not created here; one is created per job
        when the job starts.
        """

        def _bound_reply_socket():
            """Return a REP socket bound to a random port and its "ip:port"."""
            rep = self.ctx.socket(zmq.REP)
            rep.linger = 0
            port = rep.bind_to_random_port("tcp://*")
            return rep, "{}:{}".format(self.worker_ip, port)

        self.worker_ip = get_ip_address()

        # (1) channel to the master
        master_socket = self.ctx.socket(zmq.REQ)
        self.request_master_socket = master_socket
        master_socket.linger = 0
        # a 0.5 second receive timeout is enough to find out whether the
        # master has been started
        master_socket.setsockopt(zmq.RCVTIMEO, 500)
        master_socket.connect("tcp://" + self.master_address)

        # (2) channel on which jobs report their addresses
        self.reply_job_socket, self.reply_job_address = _bound_reply_socket()

        # (3) channel on which jobs ask to be killed
        self.kill_job_socket, self.kill_job_address = _bound_reply_socket()

    def _create_jobs(self):
        """Register with the master, start the first jobs and report them.

        Steps, in order:
          1. send ``WORKER_CONNECT_TAG``; if no reply arrives in time, log an
             error, set ``master_is_alive`` to False and return;
          2. start ``cpu_num`` jobs through ``_init_jobs``;
          3. start the thread that replies to the master's heartbeats and
             wait until it has published ``master_heartbeat_address``;
          4. send an ``InitializedWorker`` describing this worker and its
             jobs to the master and create ``self.worker_status``.
        """
        master_socket = self.request_master_socket
        try:
            master_socket.send_multipart(
                [remote_constants.WORKER_CONNECT_TAG])
            master_socket.recv_multipart()
        except zmq.error.Again:
            logger.error("Can not connect to the master, "
                         "please check if master is started.")
            self.master_is_alive = False
            return

        initialized_jobs = self._init_jobs(job_num=self.cpu_num)

        # from here on the master gets the heartbeat timeout to answer
        timeout_ms = remote_constants.HEARTBEAT_TIMEOUT_S * 1000
        master_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)

        heartbeat_target = "master {}".format(self.master_address)
        self.reply_master_hearbeat_thread = threading.Thread(
            target=self._reply_heartbeat, args=(heartbeat_target, ))
        self.reply_master_hearbeat_thread.start()
        # `master_heartbeat_address` only exists once the thread has bound
        # its socket, so block until it signals that.
        self.heartbeat_socket_initialized.wait()

        heartbeat_address = self.master_heartbeat_address
        for job in initialized_jobs:
            job.worker_address = heartbeat_address

        hostname = socket.gethostname()
        initialized_worker = InitializedWorker(
            heartbeat_address, initialized_jobs, self.cpu_num, hostname)
        payload = cloudpickle.dumps(initialized_worker)
        master_socket.send_multipart(
            [remote_constants.WORKER_INITIALIZED_TAG, payload])
        master_socket.recv_multipart()

        self.worker_status = WorkerStatus(heartbeat_address, initialized_jobs,
                                          self.cpu_num)

    def _fill_job_buffer(self):
        """An endless loop that adds initialized job into the job buffer"""
195
        initialized_jobs = []
B
Bo Zhou 已提交
196
        while self.worker_is_alive:
197
            if self.job_buffer.full() is False:
198 199 200 201 202 203 204
                job_num = self.cpu_num - self.job_buffer.qsize()
                if job_num > 0:
                    initialized_jobs = self._init_jobs(job_num=job_num)
                    for job in initialized_jobs:
                        self.job_buffer.put(job)

            time.sleep(0.02)
205
        self.exit()
F
fuyw 已提交
206

B
Bo Zhou 已提交
207 208 209 210 211 212 213 214
    def _init_jobs(self, job_num):
        """Start ``job_num`` job processes and collect their registrations.

        Each job is launched as ``job.py`` (located next to this file) with
        the current interpreter, and is given ``reply_job_address`` and
        ``log_server_address`` on its command line; its output goes to
        ``os.devnull``. The method then reads ``job_num`` messages from
        ``reply_job_socket``, answers each with ``NORMAL_TAG`` plus
        ``kill_job_address``, and starts one daemon thread per job running
        ``_create_job_monitor``.

        Args:
            job_num(int): the number of jobs to create.

        Returns:
            list: the initialized jobs unpickled from the jobs' messages.

        Raises:
            AssertionError: if no job was initialized.
        """
        job_file = __file__.replace('worker.pyc', 'job.py')
        job_file = job_file.replace('worker.py', 'job.py')
        command = [
            sys.executable, job_file, "--worker_address",
            self.reply_job_address, "--log_server_address",
            self.log_server_address
        ]

        if sys.version_info.major == 3:
            warnings.simplefilter("ignore", ResourceWarning)

        new_jobs = []
        # avoid that many jobs are killed and restarted at the same time.
        # `with` guarantees the lock is released even if starting a job or
        # reading its registration raises; otherwise every later caller that
        # needs the lock would block forever.
        with self.lock:
            # Redirect the output to DEVNULL
            with open(os.devnull, 'w') as FNULL:
                for _ in range(job_num):
                    subprocess.Popen(
                        command, stdout=FNULL, stderr=subprocess.STDOUT)

            for _ in range(job_num):
                job_message = self.reply_job_socket.recv_multipart()
                self.reply_job_socket.send_multipart(
                    [remote_constants.NORMAL_TAG,
                     to_byte(self.kill_job_address)])
                # NOTE(review): this unpickles bytes received over a socket;
                # confirm that only trusted job processes can reach
                # `reply_job_socket`.
                initialized_job = cloudpickle.loads(job_message[1])
                new_jobs.append(initialized_job)

                # a thread for sending heartbeat signals to job
                thread = threading.Thread(
                    target=self._create_job_monitor, args=(initialized_job, ))
                # `setDaemon()` is deprecated; `daemon` is the equivalent
                # attribute on both Python 2 and Python 3.
                thread.daemon = True
                thread.start()

        assert len(new_jobs) > 0, "init jobs failed"
        return new_jobs
F
fuyw 已提交
250 251

    def _kill_job(self, job_address):
        """Replace a job with a buffered one and tell the master about it.

        The job at ``job_address`` is removed from ``worker_status``. If the
        removal succeeds, jobs are taken from ``job_buffer`` until a live one
        is found; that job is added to ``worker_status`` and sent to the
        master with ``NEW_JOB_TAG`` together with the old ``job_address``.
        Nothing in this method terminates the job process itself.

        Args:
            job_address (str): address of the job to be replaced.
        """
        success = self.worker_status.remove_job(job_address)
        if not success:
            return

        while True:
            initialized_job = self.job_buffer.get()
            initialized_job.worker_address = self.master_heartbeat_address
            if initialized_job.is_alive:
                self.worker_status.add_job(initialized_job)
                # `is_alive` is re-read on purpose: the job's monitor thread
                # may flip it while the job is being registered.
                if not initialized_job.is_alive:  # make sure that the job is still alive.
                    self.worker_status.remove_job(
                        initialized_job.job_address)
                    continue
            else:
                logger.warning(
                    "[Worker] a dead job found. The job buffer will not accept this one."
                )
            if initialized_job.is_alive:
                break

        # `request_master_socket` is shared between threads, so the
        # request/reply pair runs under the lock. `with` releases the lock
        # even when the exchange raises (e.g. a receive timeout); a manual
        # acquire/release would leave it held and block later callers.
        with self.lock:
            self.request_master_socket.send_multipart([
                remote_constants.NEW_JOB_TAG,
                cloudpickle.dumps(initialized_job),
                to_byte(job_address)
            ])
            self.request_master_socket.recv_multipart()
F
fuyw 已提交
279

B
Bo Zhou 已提交
280 281
    def _create_job_monitor(self, job):
        """Ping a job periodically and replace it once it stops answering.

        Args:
            job: the initialized job to watch. Its ``worker_heartbeat_address``
                is the address pinged; its ``is_alive`` flag is set to True
                here and to False when a heartbeat times out.
        """
        # REQ socket used only for this job's heartbeats
        heartbeat = self.ctx.socket(zmq.REQ)
        heartbeat.linger = 0
        timeout_ms = remote_constants.HEARTBEAT_TIMEOUT_S * 1000
        heartbeat.setsockopt(zmq.RCVTIMEO, timeout_ms)
        heartbeat.connect("tcp://" + job.worker_heartbeat_address)

        job.is_alive = True
        while job.is_alive and self.master_is_alive and self.worker_is_alive:
            try:
                heartbeat.send_multipart([remote_constants.HEARTBEAT_TAG])
                heartbeat.recv_multipart()
                time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
            except zmq.error.Again:
                # No reply within the timeout: the job is considered dead.
                # This handler must stay ahead of the ZMQError one below,
                # because Again is a subclass of ZMQError.
                job.is_alive = False
                lost_message = "[Worker] lost connection with the job:{}".format(
                    job.job_address)
                logger.warning(lost_message)
                if self.master_is_alive and self.worker_is_alive:
                    self._kill_job(job.job_address)
            except zmq.error.ZMQError:
                break

        heartbeat.close(0)

B
Bo Zhou 已提交
310 311 312 313 314 315 316 317
    def _reply_kill_job(self):
        """Serve the jobs' kill requests on ``kill_job_socket``.

        Runs while both the worker and the master are alive. Every request
        must carry ``KILLJOB_TAG`` followed by the address of the job to
        kill; the job is handed to ``_kill_job`` and the request is answered
        with ``NORMAL_TAG``.
        """
        kill_socket = self.kill_job_socket
        kill_socket.linger = 0
        receive_timeout_ms = remote_constants.HEARTBEAT_RCVTIMEO_S * 1000
        kill_socket.setsockopt(zmq.RCVTIMEO, receive_timeout_ms)

        while self.worker_is_alive and self.master_is_alive:
            try:
                request = kill_socket.recv_multipart()
                assert request[0] == remote_constants.KILLJOB_TAG
                self._kill_job(to_str(request[1]))
                kill_socket.send_multipart([remote_constants.NORMAL_TAG])
            except zmq.error.Again:
                # A timeout only serves to re-evaluate the loop condition
                # periodically, so nothing needs to be done here.
                pass

F
fuyw 已提交
328 329 330 331 332 333
    def _get_worker_status(self):
        """Collect the memory and load figures reported in heartbeats.

        Returns:
            tuple: ``(vacant_memory, used_memory, now, load_average)`` where
            the two memory figures are in GiB rounded to two decimals,
            ``now`` is the local time formatted as ``HH:MM:SS`` and
            ``load_average`` is the first value of the load average, rounded
            to two decimals.
        """
        now = datetime.now().strftime('%H:%M:%S')

        # A float divisor keeps the division exact on Python 2 as well, where
        # `int / int` would floor the result to whole GiB.
        bytes_per_gib = float(1024**3)
        virtual_memory = psutil.virtual_memory()
        total_memory = round(virtual_memory[0] / bytes_per_gib, 2)
        used_memory = round(virtual_memory[3] / bytes_per_gib, 2)
        vacant_memory = round(total_memory - used_memory, 2)

        # Windows takes the load average from psutil, other platforms from os.
        if _IS_WINDOWS:
            load_average = round(psutil.getloadavg()[0], 2)
        else:
            load_average = round(os.getloadavg()[0], 2)
        return (vacant_memory, used_memory, now, load_average)

F
fuyw 已提交
340 341 342 343 344 345 346 347 348 349
    def _reply_heartbeat(self, target):
        """Answer the master's heartbeats and shut down when they stop.

        Binds a REP socket to a random port, publishes its address as
        ``self.master_heartbeat_address``, points the logger at a directory
        named after that address and then sets
        ``heartbeat_socket_initialized``. Every heartbeat is answered with
        the values returned by ``_get_worker_status``. If no heartbeat
        arrives within the receive timeout, ``master_is_alive`` is set to
        False, which ends the loop. After the loop the worker status is
        cleared, the log server process is killed and ``exit()`` is called.

        Args:
            target (str): label of the heartbeat peer. It is not used in the
                body of this method.
        """
        # Named `heartbeat_socket` so that the `socket` module imported by
        # this file is not shadowed inside the method.
        heartbeat_socket = self.ctx.socket(zmq.REP)
        heartbeat_socket.linger = 0
        heartbeat_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
        heartbeat_master_port = heartbeat_socket.bind_to_random_port("tcp://*")
        self.master_heartbeat_address = "{}:{}".format(self.worker_ip,
                                                       heartbeat_master_port)

        logger.set_dir(
            os.path.expanduser('~/.parl_data/worker/{}'.format(
                self.master_heartbeat_address.replace(':', '_'))))

        self.heartbeat_socket_initialized.set()
        logger.info("[Worker] Connect to the master node successfully. "
                    "({} CPUs)".format(self.cpu_num))
        while self.master_is_alive and self.worker_is_alive:
            try:
                heartbeat_socket.recv_multipart()
                vacant_memory, used_memory, now, load_average = \
                    self._get_worker_status()
                heartbeat_socket.send_multipart([
                    remote_constants.HEARTBEAT_TAG,
                    to_byte(str(vacant_memory)),
                    to_byte(str(used_memory)),
                    to_byte(now),
                    to_byte(str(load_average))
                ])
            except zmq.error.Again:
                self.master_is_alive = False
            except zmq.error.ContextTerminated:
                break
        heartbeat_socket.close(0)
        logger.warning(
            "[Worker] lost connection with the master, will exit reply heartbeat for master."
        )
        # `worker_status` stays None until `_create_jobs` has finished. If the
        # loop ends before that, skipping the clear keeps the cleanup below
        # (log server shutdown and `exit()`) from being aborted by an
        # AttributeError.
        if self.worker_status is not None:
            self.worker_status.clear()
        self.log_server_proc.kill()
        self.log_server_proc.wait()
        # exit the worker
        self.worker_is_alive = False
        self.exit()
F
fuyw 已提交
385

386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
    def _create_log_server(self, port):
        """Start the log server process and return it with its address.

        ``log_server.py`` (located next to this file) is launched with the
        current interpreter; its output is discarded.

        Args:
            port: port the log server should listen on. When None, a port
                that is free at the time of the call is chosen, so that the
                returned address names a real port.

        Returns:
            tuple: ``(log_server_proc, log_server_address)`` where
            ``log_server_proc`` is the ``subprocess.Popen`` object and
            ``log_server_address`` is ``"<worker_ip>:<port>"``.
        """
        log_server_file = __file__.replace('worker.pyc', 'log_server.py')
        log_server_file = log_server_file.replace('worker.py', 'log_server.py')

        if port is None:
            # Passing "0" would let the server pick a random port, but the
            # address returned below would then read "<ip>:0" and not match
            # the port actually used. Reserve a concrete free port instead.
            # NOTE(review): the port is released before the server binds it,
            # so another process could take it in between.
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                probe.bind(("", 0))
                port = probe.getsockname()[1]
            finally:
                probe.close()

        command = [
            sys.executable, log_server_file, "--port",
            str(port), "--log_dir", "~/.parl_data/job/", "--line_num", "500"
        ]

        if sys.version_info.major == 3:
            warnings.simplefilter("ignore", ResourceWarning)

        # The child's output is thrown away: into a temporary file on
        # Windows, into os.devnull elsewhere.
        if _IS_WINDOWS:
            FNULL = tempfile.TemporaryFile()
        else:
            FNULL = open(os.devnull, 'w')
        # `with` closes the parent's handle even if starting the process fails.
        with FNULL:
            log_server_proc = subprocess.Popen(
                command,
                stdout=FNULL,
                stderr=subprocess.STDOUT,
            )

        log_server_address = "{}:{}".format(self.worker_ip, port)
        return log_server_proc, log_server_address

F
fuyw 已提交
414
    def exit(self):
        """Mark the worker as stopped and kill the jobs it started.

        The jobs are selected with a pattern built from ``job.py`` and this
        worker's ``reply_job_address``, which is passed to ``kill_process``.
        """
        self.worker_is_alive = False
        job_pattern = 'remote/job.py.*{}'.format(self.reply_job_address)
        kill_process(job_pattern)
F
fuyw 已提交
418 419

    def run(self):
B
Bo Zhou 已提交
420
        """Keep running until it lost connection with the master.
F
fuyw 已提交
421
        """
B
Bo Zhou 已提交
422
        self.reply_master_hearbeat_thread.join()