From 88658ea358298c6e61e95a454284b8853a3e9484 Mon Sep 17 00:00:00 2001 From: Xu Jingxin Date: Mon, 27 Dec 2021 13:13:27 +0800 Subject: [PATCH] doc(xjx): add docs, wrap decorators for framework (#166) --- ding/framework/parallel.py | 20 ++++++++++++++++++++ ding/framework/task.py | 36 +++++++++++++++++++++++++++++++++++- ding/worker/buffer/buffer.py | 2 ++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/ding/framework/parallel.py b/ding/framework/parallel.py index ebe019a..4880582 100644 --- a/ding/framework/parallel.py +++ b/ding/framework/parallel.py @@ -207,13 +207,33 @@ now there are {} ports and {} workers".format(len(ports), n_workers) break def register_rpc(self, fn_name: str, fn: Callable) -> None: + """ + Overview: + Register an rpc on parallel instance, this function will be executed \ + when a remote process call this function via network. + Arguments: + - fn_name (:obj:`str`): Function name. + - fn (:obj:`Callable`): Function body. + """ self._rpc[fn_name] = fn def unregister_rpc(self, fn_name: str) -> None: + """ + Overview: + Unregister an rpc function. + Arguments: + - fn_name (:obj:`str`): Function name. + """ if fn_name in self._rpc: del self._rpc[fn_name] def send_rpc(self, func_name: str, *args, **kwargs) -> None: + """ + Overview: + Send an rpc via network to subscribed processes. + Arguments: + - fn_name (:obj:`str`): Function name. + """ if self.is_active: payload = {"f": func_name, "a": args, "k": kwargs} return self._sock and self._sock.send(pickle.dumps(payload, protocol=-1)) diff --git a/ding/framework/task.py b/ding/framework/task.py index ecf5787..5b11816 100644 --- a/ding/framework/task.py +++ b/ding/framework/task.py @@ -8,6 +8,7 @@ from types import GeneratorType from typing import Awaitable, Callable, Dict, Generator, Iterable, List, Optional, Set from ding.framework.context import Context from ding.framework.parallel import Parallel +from functools import wraps def enable_async(func: Callable) -> Callable: @@ -20,6 +21,7 @@ def enable_async(func: Callable) -> Callable: - runtime_handler (:obj:`Callable`): The wrap function. """ + @wraps(func) def runtime_handler(task: "Task", *args, **kwargs) -> "Task": """ Overview: @@ -115,6 +117,13 @@ class Task: return self def use_step_wrapper(self, fn: Callable) -> 'Task': + """ + Overview: + Register wrappers to task. A wrapper works like a decorator, but task will apply this \ + decorator on top of each middleware. + Arguments: + - fn (:obj:`Callable`): A wrapper is a decorator, so the first argument is a callable function. + """ self.step_wrappers.append(fn) return self @@ -227,6 +236,10 @@ class Task: self.stop() def stop(self) -> None: + """ + Overview: + Stop and cleanup every thing in the runtime of task. + """ self.emit("exit") if self._thread_pool: self._thread_pool.shutdown() @@ -284,7 +297,14 @@ be thrown after the timeout {}s is reached".format(n_timeout) t = self._loop.run_in_executor(self._thread_pool, fn, *args, **kwargs) self._async_stack.append(t) - def emit(self, event_name, *args, **kwargs): + def emit(self, event_name: str, *args, **kwargs): + """ + Overview: + Emit a event, call listeners. + Arguments: + - event_name (:obj:`str`): Event name. + - args (:obj:`any`): Rest arguments for listeners. + """ if event_name in self.event_listeners: for fn in self.event_listeners[event_name]: fn(*args, **kwargs) @@ -294,9 +314,23 @@ be thrown after the timeout {}s is reached".format(n_timeout) fn(*args, **kwargs) def on(self, event: str, fn: Callable) -> None: + """ + Overview: + Subscribe to an event, execute this function every time the event is emitted. + Arguments: + - event (:obj:`str`): Event name. + - fn (:obj:`Callable`): The function. + """ self.event_listeners[event].append(fn) def once(self, event: str, fn: Callable) -> None: + """ + Overview: + Subscribe to an event, execute this function only once when the event is emitted. + Arguments: + - event (:obj:`str`): Event name. + - fn (:obj:`Callable`): The function. + """ self.once_listeners[event].append(fn) @property diff --git a/ding/worker/buffer/buffer.py b/ding/worker/buffer/buffer.py index 1b6db8c..c0d6566 100644 --- a/ding/worker/buffer/buffer.py +++ b/ding/worker/buffer/buffer.py @@ -2,12 +2,14 @@ from abc import abstractmethod from typing import Any, List, Optional, Union, Callable import copy from dataclasses import dataclass +from functools import wraps def apply_middleware(func_name: str): def wrap_func(base_func: Callable): + @wraps(base_func) def handler(buffer, *args, **kwargs): """ Overview: -- GitLab