未验证 提交 88658ea3 编写于 作者: X Xu Jingxin 提交者: GitHub

doc(xjx): add docs, wrap decorators for framework (#166)

上级 0b71fc4e
...@@ -207,13 +207,33 @@ now there are {} ports and {} workers".format(len(ports), n_workers) ...@@ -207,13 +207,33 @@ now there are {} ports and {} workers".format(len(ports), n_workers)
break break
def register_rpc(self, fn_name: str, fn: Callable) -> None: def register_rpc(self, fn_name: str, fn: Callable) -> None:
"""
Overview:
Register an rpc on parallel instance, this function will be executed \
when a remote process call this function via network.
Arguments:
- fn_name (:obj:`str`): Function name.
- fn (:obj:`Callable`): Function body.
"""
self._rpc[fn_name] = fn self._rpc[fn_name] = fn
def unregister_rpc(self, fn_name: str) -> None: def unregister_rpc(self, fn_name: str) -> None:
"""
Overview:
Unregister an rpc function.
Arguments:
- fn_name (:obj:`str`): Function name.
"""
if fn_name in self._rpc: if fn_name in self._rpc:
del self._rpc[fn_name] del self._rpc[fn_name]
def send_rpc(self, func_name: str, *args, **kwargs) -> None: def send_rpc(self, func_name: str, *args, **kwargs) -> None:
"""
Overview:
Send an rpc via network to subscribed processes.
Arguments:
- fn_name (:obj:`str`): Function name.
"""
if self.is_active: if self.is_active:
payload = {"f": func_name, "a": args, "k": kwargs} payload = {"f": func_name, "a": args, "k": kwargs}
return self._sock and self._sock.send(pickle.dumps(payload, protocol=-1)) return self._sock and self._sock.send(pickle.dumps(payload, protocol=-1))
......
...@@ -8,6 +8,7 @@ from types import GeneratorType ...@@ -8,6 +8,7 @@ from types import GeneratorType
from typing import Awaitable, Callable, Dict, Generator, Iterable, List, Optional, Set from typing import Awaitable, Callable, Dict, Generator, Iterable, List, Optional, Set
from ding.framework.context import Context from ding.framework.context import Context
from ding.framework.parallel import Parallel from ding.framework.parallel import Parallel
from functools import wraps
def enable_async(func: Callable) -> Callable: def enable_async(func: Callable) -> Callable:
...@@ -20,6 +21,7 @@ def enable_async(func: Callable) -> Callable: ...@@ -20,6 +21,7 @@ def enable_async(func: Callable) -> Callable:
- runtime_handler (:obj:`Callable`): The wrap function. - runtime_handler (:obj:`Callable`): The wrap function.
""" """
@wraps(func)
def runtime_handler(task: "Task", *args, **kwargs) -> "Task": def runtime_handler(task: "Task", *args, **kwargs) -> "Task":
""" """
Overview: Overview:
...@@ -115,6 +117,13 @@ class Task: ...@@ -115,6 +117,13 @@ class Task:
return self return self
def use_step_wrapper(self, fn: Callable) -> 'Task': def use_step_wrapper(self, fn: Callable) -> 'Task':
"""
Overview:
Register wrappers to task. A wrapper works like a decorator, but task will apply this \
decorator on top of each middleware.
Arguments:
- fn (:obj:`Callable`): A wrapper is a decorator, so the first argument is a callable function.
"""
self.step_wrappers.append(fn) self.step_wrappers.append(fn)
return self return self
...@@ -227,6 +236,10 @@ class Task: ...@@ -227,6 +236,10 @@ class Task:
self.stop() self.stop()
def stop(self) -> None: def stop(self) -> None:
"""
Overview:
Stop and cleanup every thing in the runtime of task.
"""
self.emit("exit") self.emit("exit")
if self._thread_pool: if self._thread_pool:
self._thread_pool.shutdown() self._thread_pool.shutdown()
...@@ -284,7 +297,14 @@ be thrown after the timeout {}s is reached".format(n_timeout) ...@@ -284,7 +297,14 @@ be thrown after the timeout {}s is reached".format(n_timeout)
t = self._loop.run_in_executor(self._thread_pool, fn, *args, **kwargs) t = self._loop.run_in_executor(self._thread_pool, fn, *args, **kwargs)
self._async_stack.append(t) self._async_stack.append(t)
def emit(self, event_name, *args, **kwargs): def emit(self, event_name: str, *args, **kwargs):
"""
Overview:
Emit a event, call listeners.
Arguments:
- event_name (:obj:`str`): Event name.
- args (:obj:`any`): Rest arguments for listeners.
"""
if event_name in self.event_listeners: if event_name in self.event_listeners:
for fn in self.event_listeners[event_name]: for fn in self.event_listeners[event_name]:
fn(*args, **kwargs) fn(*args, **kwargs)
...@@ -294,9 +314,23 @@ be thrown after the timeout {}s is reached".format(n_timeout) ...@@ -294,9 +314,23 @@ be thrown after the timeout {}s is reached".format(n_timeout)
fn(*args, **kwargs) fn(*args, **kwargs)
def on(self, event: str, fn: Callable) -> None: def on(self, event: str, fn: Callable) -> None:
"""
Overview:
Subscribe to an event, execute this function every time the event is emitted.
Arguments:
- event (:obj:`str`): Event name.
- fn (:obj:`Callable`): The function.
"""
self.event_listeners[event].append(fn) self.event_listeners[event].append(fn)
def once(self, event: str, fn: Callable) -> None: def once(self, event: str, fn: Callable) -> None:
"""
Overview:
Subscribe to an event, execute this function only once when the event is emitted.
Arguments:
- event (:obj:`str`): Event name.
- fn (:obj:`Callable`): The function.
"""
self.once_listeners[event].append(fn) self.once_listeners[event].append(fn)
@property @property
......
...@@ -2,12 +2,14 @@ from abc import abstractmethod ...@@ -2,12 +2,14 @@ from abc import abstractmethod
from typing import Any, List, Optional, Union, Callable from typing import Any, List, Optional, Union, Callable
import copy import copy
from dataclasses import dataclass from dataclasses import dataclass
from functools import wraps
def apply_middleware(func_name: str): def apply_middleware(func_name: str):
def wrap_func(base_func: Callable): def wrap_func(base_func: Callable):
@wraps(base_func)
def handler(buffer, *args, **kwargs): def handler(buffer, *args, **kwargs):
""" """
Overview: Overview:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册