Commit d90be905 authored by barrierye

update doc

Parent 05782454
@@ -60,9 +60,9 @@ The graph execution engine consists of OPs and Channels, and the connected OPs s

- Whether input buffers and output buffers in Channel will increase indefinitely
  - It will not increase indefinitely. The input to the entire graph execution engine is placed inside a Channel's internal queue, directly acting as a traffic-control buffer queue for the entire service.
-  - For input buffer, adjust the number of concurrencies of OP1 and OP2 according to the amount of computation, so that the number of input buffers from each input OP is relatively balanced.
-  - For output buffer, you can use a similar process as input buffer, which adjusts the concurrency of OP3 and OP4 to control the buffer length of output buffer.
-  - Note: The length of the input buffer depends on the speed at which each item in the internal queue is ready, and the length of the output buffer depends on the speed at which downstream OPs obtain data from the output buffer.
+  - For the input buffer, adjust the concurrency of OP1 and OP2 according to their computational load, so that the number of input-buffer items coming from each input OP stays relatively balanced. (The length of the input buffer depends on the speed at which each item in the internal queue becomes ready.)
+  - For the output buffer, the same approach applies: adjust the concurrency of OP3 and OP4 to keep the buffer length under control. (The length of the output buffer depends on the speed at which downstream OPs fetch data from it.)
+  - In addition, the amount of data in the Channel will not exceed the gRPC `worker_num`, i.e. the thread pool size.

## Detailed Design
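Balancing these buffers comes down to choosing a per-OP concurrency. Below is a minimal sketch, assuming the `Op` constructor described later in this document (its `name`, `input_ops`, and `concurrency` parameters); the upstream request OP and the concurrency values are illustrative placeholders, not part of the commit:

```python
from paddle_serving_server.pipeline import Op, RequestOp

read_op = RequestOp()  # placeholder OP feeding both OP1 and OP2

# Assumption: OP1 is roughly twice as expensive as OP2, so it gets twice
# the concurrency, keeping the shared downstream input buffer balanced.
op1 = Op(name="op1", input_ops=[read_op], concurrency=4)
op2 = Op(name="op2", input_ops=[read_op], concurrency=2)
```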
@@ -103,13 +103,13 @@ The meaning of each parameter is as follows:

#### 2. General OP Secondary Development Interface

| Interface or Variable | Explain |
| :----------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | Processes the data obtained from the Channel; the processed data is used as the input of the **process** function. (This function handles a **sample**.) |
-| def process(self, feed_dict_list) | The RPC prediction process is based on the Paddle Serving Client, and the processed data will be used as the input of the **postprocess** function. (This function handles a **batch**) |
+| def process(self, feed_dict_list, typical_logid) | Performs RPC prediction through the Paddle Serving Client; the result is used as the input of the **postprocess** function. (This function handles a **batch**.) |
| def postprocess(self, input_dicts, fetch_dict) | Processes the prediction results; the processed data is put into the subsequent Channel, to be obtained by subsequent OPs. (This function handles a **sample**.) |
| def init_op(self) | Used to load resources (such as a word dictionary). |
| self.concurrency_idx | Concurrency index of the current process (not thread); different kinds of OPs are counted separately. |

In a running cycle, an OP executes three operations in order: preprocess, process, and postprocess (when the `server_endpoints` parameter is not set, the process operation is skipped). Users can override all three functions. The default implementations are as follows:
@@ -123,13 +123,17 @@ def preprocess(self, input_dicts):
    (_, input_dict), = input_dicts.items()
    return input_dict

-def process(self, feed_dict_list):
+def process(self, feed_dict_list, typical_logid):
    err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
    if err != 0:
        raise NotImplementedError(
            "{} Please override preprocess func.".format(err_info))
    call_result = self.client.predict(
-        feed=feed_dict_list, fetch=self._fetch_names)
+        feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid)
+    if isinstance(self.client, MultiLangClient):
+        if call_result is None or call_result["serving_status_code"] != 0:
+            return None
+        call_result.pop("serving_status_code")
    return call_result

def postprocess(self, input_dicts, fetch_dict):
@@ -138,7 +142,7 @@ def postprocess(self, input_dicts, fetch_dict):

The parameter of **preprocess** is the data `input_dicts` from the previous Channel. This variable (a **sample**) is a dictionary whose keys are the names of the previous OPs and whose values are the outputs of the corresponding OPs.

-The parameter of **process** is the input variable `fetch_dict_list` (a list of the return value of the preprocess function) of the Paddle Serving Client prediction interface. This variable (as a **batch**) is a list of dictionaries with feed_name as the key and the data in the ndarray format as the value.
+The parameter of **process** is the input variable `feed_dict_list` (a list of return values of the preprocess function) for the Paddle Serving Client prediction interface. This variable (a **batch**) is a list of dictionaries whose keys are feed names and whose values are data in ndarray format. `typical_logid` is the log_id that is passed through to the PaddleServingService.

The parameters of **postprocess** are `input_dicts` and `fetch_dict`. `input_dicts` is consistent with the parameter of preprocess, and `fetch_dict` (a **sample**) is one sample of the batch returned by the process function (if process is not executed, this value is the return value of preprocess).
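To make the sample/batch contract concrete, here is a hedged sketch of a user-defined OP; the toy vocabulary and the `words`/`prediction` keys are hypothetical, and only the hook signatures come from the table above:

```python
import numpy as np
from paddle_serving_server.pipeline import Op

class ExampleOp(Op):
    def init_op(self):
        # Load resources once per worker; this toy vocabulary stands in
        # for a real dictionary file.
        self.vocab = {"good": 1, "bad": 2}

    def preprocess(self, input_dicts):
        # One sample: {previous_op_name: its output dict}.
        (_, input_dict), = input_dicts.items()
        ids = np.array([self.vocab.get(w, 0)
                        for w in input_dict["words"].split()])
        # The return value becomes one element of process()'s feed_dict_list.
        return {"words": ids}

    # process() is inherited from the default shown above: it forwards the
    # batch to self.client.predict(..., log_id=typical_logid).

    def postprocess(self, input_dicts, fetch_dict):
        # One sample of the batch returned by process(); it is passed on
        # to the next Channel.
        return {"prediction": fetch_dict["prediction"]}
```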
@@ -255,7 +259,7 @@ dag:
    retry: 1 # Number of times the DAG executor retries after a failure. The default value is 1, i.e. no retry
    use_profile: false # Whether to print the profiling log on the server side. The default is false
    tracer:
-        interval_s: 600 # Monitoring interval of the Tracer, in seconds. Monitoring is disabled when the value is less than 1. The default value is 600
+        interval_s: 600 # Monitoring interval of the Tracer, in seconds. Monitoring is disabled when the value is less than 1. The default value is -1
```
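As a usage note, this file is typically consumed by `PipelineServer`; a minimal sketch, assuming the fields above are saved as `config.yml` and that `response_op` is the terminal OP of a DAG built elsewhere (as in the example below):

```python
from paddle_serving_server.pipeline import PipelineServer

server = PipelineServer()
server.set_response_op(response_op)  # terminal OP of the user-defined DAG
server.prepare_server('config.yml')  # reads the dag/tracer fields above
server.run_server()
```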
@@ -286,8 +290,6 @@ Run the following code

```python
import logging
-logging.basicConfig(level=logging.INFO)
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
@@ -295,10 +297,6 @@ from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset

-_LOGGER = logging.getLogger()

class ImdbRequestOp(RequestOp):
    def init_op(self):
        self.imdb_dataset = IMDBDataset()
...
```
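The example is truncated here. For illustration only, a request OP normally also overrides `unpack_request_package` (part of the RequestOp interface, not shown in this excerpt) to turn the gRPC request into a feed dictionary; a hedged sketch of how `ImdbRequestOp` could continue:

```python
    def unpack_request_package(self, request):
        # Hypothetical continuation: collect the "words" field from the
        # request and convert the raw text into word ids with the
        # IMDBDataset loaded in init_op().
        dictdata = {}
        for idx, key in enumerate(request.key):
            if key == "words":
                word_ids, _ = self.imdb_dataset.get_words_and_label(
                    request.value[idx])
                dictdata[key] = np.array(word_ids)
        return dictdata
```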
@@ -60,11 +60,9 @@ The Server side is built on gRPC and the graph execution engine; the relationship between the two is shown in the figure below

- Whether the input buffer and output buffer in the Channel design will grow indefinitely
  - No. The input of the entire graph execution engine is placed in the internal queue of a Channel, which directly serves as the traffic-control buffer queue for the whole service.
-  - For the input buffer, adjust the concurrency of OP1 and OP2 according to their computational load, so that the number of input-buffer items coming from each input OP stays relatively balanced.
-  - For the output buffer, a similar approach can be used: adjust the concurrency of OP3 and OP4 to keep the buffer length of the output buffer under control.
-  - Note: the length of the input buffer depends on the speed at which each item in the internal queue becomes fully ready, and the length of the output buffer depends on the speed at which downstream OPs fetch data from the output buffer.
+  - For the input buffer, adjust the concurrency of OP1 and OP2 according to their computational load, so that the number of input-buffer items coming from each input OP stays relatively balanced. (The length of the input buffer depends on the speed at which each item in the internal queue becomes fully ready.)
+  - For the output buffer, a similar approach can be used: adjust the concurrency of OP3 and OP4 to keep the buffer length under control. (The length of the output buffer depends on the speed at which downstream OPs fetch data from it.)
+  - In addition, the amount of data in the Channel will not exceed the gRPC `worker_num`, i.e. the thread pool size.
-
-## Detailed Design

### User Interface Design
@@ -103,13 +101,13 @@ def __init__(name=None,

#### 2. General OP Secondary Development Interface

| Variable or Interface | Explanation |
| :----------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | Processes the data obtained from the Channel; the processed data is used as the input of the **process** function. (This function handles a **sample**.) |
-| def process(self, feed_dict_list) | Performs RPC prediction based on the Paddle Serving Client; the processed data is used as the input of the **postprocess** function. (This function handles a **batch**.) |
+| def process(self, feed_dict_list, typical_logid) | Performs RPC prediction based on the Paddle Serving Client; the processed data is used as the input of the **postprocess** function. (This function handles a **batch**.) |
| def postprocess(self, input_dicts, fetch_dict) | Processes the prediction results; the processed data is put into the subsequent Channel, to be obtained by subsequent OPs. (This function handles a **sample**.) |
| def init_op(self) | Used to load resources (such as dictionaries). |
| self.concurrency_idx | Concurrency index of the current process (not thread); different kinds of OPs are counted separately. |

In a running cycle, an OP executes preprocess, process, and postprocess in order (when the `server_endpoints` parameter is not set, the process operation is skipped). Users can override these three functions; the default implementations are as follows:
@@ -123,13 +121,17 @@ def preprocess(self, input_dicts):
    (_, input_dict), = input_dicts.items()
    return input_dict

-def process(self, feed_dict_list):
+def process(self, feed_dict_list, typical_logid):
    err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
    if err != 0:
        raise NotImplementedError(
            "{} Please override preprocess func.".format(err_info))
    call_result = self.client.predict(
-        feed=feed_dict_list, fetch=self._fetch_names)
+        feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid)
+    if isinstance(self.client, MultiLangClient):
+        if call_result is None or call_result["serving_status_code"] != 0:
+            return None
+        call_result.pop("serving_status_code")
    return call_result

def postprocess(self, input_dicts, fetch_dict):
@@ -138,7 +140,7 @@ def postprocess(self, input_dicts, fetch_dict):

The parameter of **preprocess** is the data `input_dicts` from the previous Channel; this variable (a **sample**) is a dictionary whose keys are the names of the previous OPs and whose values are the outputs of the corresponding OPs.

-The parameter of **process** is the input variable `fetch_dict_list` (a list of return values of the preprocess function) for the Paddle Serving Client prediction interface; this variable (a **batch**) is a list of dictionaries whose keys are feed names and whose values are data in ndarray format.
+The parameter of **process** is the input variable `feed_dict_list` (a list of return values of the preprocess function) for the Paddle Serving Client prediction interface; this variable (a **batch**) is a list of dictionaries whose keys are feed names and whose values are data in ndarray format. `typical_logid` is the log_id that is passed through to the PaddleServingService.

The parameters of **postprocess** are `input_dicts` and `fetch_dict`: `input_dicts` is consistent with the parameter of preprocess, and `fetch_dict` (a **sample**) is one sample of the batch returned by the process function (if process is not executed, this value is the return value of preprocess).
@@ -255,7 +257,7 @@ dag:
    retry: 1 # Number of times the DAG Executor retries after a failure. The default is 1, i.e. no retry
    use_profile: false # Whether to print the profiling log on the Server side. The default is false
    tracer:
-        interval_s: 600 # Monitoring interval of the Tracer, in seconds. Monitoring is disabled when the value is less than 1. The default is 600
+        interval_s: 600 # Monitoring interval of the Tracer, in seconds. Monitoring is disabled when the value is less than 1. The default is -1
```
@@ -286,8 +288,6 @@ python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.

```python
import logging
-logging.basicConfig(level=logging.INFO)
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
@@ -295,9 +295,6 @@ from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset

-_LOGGER = logging.getLogger()

class ImdbRequestOp(RequestOp):
    def init_op(self):
        self.imdb_dataset = IMDBDataset()
@@ -318,7 +315,6 @@ class CombineOp(Op):
    def preprocess(self, input_data):
        combined_prediction = 0
        for op_name, data in input_data.items():
-            _LOGGER.info("{}: {}".format(op_name, data["prediction"]))
            combined_prediction += data["prediction"]
        data = {"prediction": combined_prediction / 2}
        return data
...
```
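For context, a hedged sketch of how this `CombineOp` might be wired into the DAG of the example: it has no `server_endpoints`, so only its `preprocess` runs, averaging the two upstream predictions. Only port 9393 for the bow model appears above; the cnn port and the client-config paths are placeholders:

```python
bow_op = Op(name="bow",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9393"],
            fetch_list=["prediction"],
            client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
            concurrency=1)
cnn_op = Op(name="cnn",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9292"],  # placeholder port
            fetch_list=["prediction"],
            client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
            concurrency=1)
# Merge the two model OPs; CombineOp.preprocess receives one sample per
# upstream OP and averages their "prediction" fields.
combine_op = CombineOp(name="combine", input_ops=[bow_op, cnn_op], concurrency=1)
```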