diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py index 3129029d82920964994e22ef28044f464b647270..da9749722e132952e6a77ca82afae4580b427cee 100644 --- a/python/paddle/reader/decorator.py +++ b/python/paddle/reader/decorator.py @@ -17,6 +17,7 @@ import subprocess import multiprocessing import six import sys +import warnings from six.moves.queue import Queue from six.moves import zip_longest @@ -25,7 +26,9 @@ from six.moves import zip import itertools import random import zlib + import paddle.compat as cpt +from paddle.fluid.reader import QUEUE_GET_TIMEOUT __all__ = [] @@ -584,10 +587,13 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): raise NotImplementedError( "The multiprocess_reader method is not supported on windows.") + # ujson is ultra fast json encoder and decoder written in pure C with bindings for Python 3.6+. try: import ujson as json except Exception as e: - sys.stderr.write("import ujson error: " + str(e) + " use json\n") + warnings.warn( + "The `ujson` module is not found, use the `json` module, `ujson` encodes and decodes faster, " + "you can install `ujson` through `pip install ujson`.") import json assert isinstance(readers, (list, tuple)) and len(readers) > 0, ( @@ -614,11 +620,20 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): reader_num = len(readers) finish_num = 0 while finish_num < reader_num: - sample = queue.get() + try: + sample = queue.get(timeout=QUEUE_GET_TIMEOUT) + except: + logging.error( + "multiprocess_reader failed to get data from the multiprocessing.Queue." + ) + six.reraise(*sys.exc_info()) + if sample is None: finish_num += 1 elif sample == "": - raise ValueError("multiprocess reader raises an exception") + raise ValueError( + "multiprocess_reader failed to put data into the multiprocessing.Queue." + ) else: yield sample @@ -660,7 +675,9 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000): elif sample == "": conn.close() conn_to_remove.append(conn) - raise ValueError("multiprocess reader raises an exception") + raise ValueError( + "multiprocess_reader failed to send data into the multiprocessing.Pipe." + ) else: yield sample