未验证 提交 db5eac2d 编写于 作者: C Chen Weihang 提交者: GitHub

add timeout for queue.get (#32747)

上级 7610c2b4
...@@ -17,6 +17,7 @@ import subprocess ...@@ -17,6 +17,7 @@ import subprocess
import multiprocessing import multiprocessing
import six import six
import sys import sys
import warnings
from six.moves.queue import Queue from six.moves.queue import Queue
from six.moves import zip_longest from six.moves import zip_longest
...@@ -25,7 +26,9 @@ from six.moves import zip ...@@ -25,7 +26,9 @@ from six.moves import zip
import itertools import itertools
import random import random
import zlib import zlib
import paddle.compat as cpt import paddle.compat as cpt
from paddle.fluid.reader import QUEUE_GET_TIMEOUT
__all__ = [] __all__ = []
...@@ -584,10 +587,13 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): ...@@ -584,10 +587,13 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
raise NotImplementedError( raise NotImplementedError(
"The multiprocess_reader method is not supported on windows.") "The multiprocess_reader method is not supported on windows.")
# ujson is ultra fast json encoder and decoder written in pure C with bindings for Python 3.6+.
try: try:
import ujson as json import ujson as json
except Exception as e: except Exception as e:
sys.stderr.write("import ujson error: " + str(e) + " use json\n") warnings.warn(
"The `ujson` module is not found, use the `json` module, `ujson` encodes and decodes faster, "
"you can install `ujson` through `pip install ujson`.")
import json import json
assert isinstance(readers, (list, tuple)) and len(readers) > 0, ( assert isinstance(readers, (list, tuple)) and len(readers) > 0, (
...@@ -614,11 +620,20 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): ...@@ -614,11 +620,20 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
reader_num = len(readers) reader_num = len(readers)
finish_num = 0 finish_num = 0
while finish_num < reader_num: while finish_num < reader_num:
sample = queue.get() try:
sample = queue.get(timeout=QUEUE_GET_TIMEOUT)
except:
logging.error(
"multiprocess_reader failed to get data from the multiprocessing.Queue."
)
six.reraise(*sys.exc_info())
if sample is None: if sample is None:
finish_num += 1 finish_num += 1
elif sample == "": elif sample == "":
raise ValueError("multiprocess reader raises an exception") raise ValueError(
"multiprocess_reader failed to put data into the multiprocessing.Queue."
)
else: else:
yield sample yield sample
...@@ -660,7 +675,9 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): ...@@ -660,7 +675,9 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
elif sample == "": elif sample == "":
conn.close() conn.close()
conn_to_remove.append(conn) conn_to_remove.append(conn)
raise ValueError("multiprocess reader raises an exception") raise ValueError(
"multiprocess_reader failed to send data into the multiprocessing.Pipe."
)
else: else:
yield sample yield sample
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册