未验证 提交 a404ad3d 编写于 作者: A Abhimanyu Deora 提交者: GitHub

Add optional exception handler to PubSubWorkerThread (#1395)

Add optional exception handler to PubSubWorkerThread
Co-authored-by: NAbhimanyu Deora <adeora@drwholdings.com>
上级 15dafb14
......@@ -732,6 +732,20 @@ subscribed to patterns or channels that don't have message handlers attached.
# when it's time to shut it down...
>>> thread.stop()
`run_in_thread` also supports an optional exception handler, which lets you
catch exceptions that occur within the worker thread and handle them
appropriately. The exception handler will take as arguments the exception
itself, the pubsub object, and the worker thread returned by `run_in_thread`.
.. code-block:: pycon
>>> p.subscribe(**{'my-channel': my_handler})
>>> def exception_handler(ex, pubsub, thread):
>>> print(ex)
>>> thread.stop()
>>> thread.join(timeout=1.0)
>>> pubsub.close()
>>> thread = p.run_in_thread(exception_handler=exception_handler)
A PubSub object adheres to the same encoding semantics as the client instance
it was created from. Any channel or pattern that's unicode will be encoded
using the `charset` specified on the client before being sent to Redis. If the
......
......@@ -3803,7 +3803,8 @@ class PubSub:
return message
def run_in_thread(self, sleep_time=0, daemon=False):
def run_in_thread(self, sleep_time=0, daemon=False,
exception_handler=None):
for channel, handler in self.channels.items():
if handler is None:
raise PubSubError("Channel: '%s' has no handler registered" %
......@@ -3813,17 +3814,24 @@ class PubSub:
raise PubSubError("Pattern: '%s' has no handler registered" %
pattern)
thread = PubSubWorkerThread(self, sleep_time, daemon=daemon)
thread = PubSubWorkerThread(
self,
sleep_time,
daemon=daemon,
exception_handler=exception_handler
)
thread.start()
return thread
class PubSubWorkerThread(threading.Thread):
def __init__(self, pubsub, sleep_time, daemon=False):
def __init__(self, pubsub, sleep_time, daemon=False,
exception_handler=None):
super().__init__()
self.daemon = daemon
self.pubsub = pubsub
self.sleep_time = sleep_time
self.exception_handler = exception_handler
self._running = threading.Event()
def run(self):
......@@ -3833,8 +3841,13 @@ class PubSubWorkerThread(threading.Thread):
pubsub = self.pubsub
sleep_time = self.sleep_time
while self._running.is_set():
pubsub.get_message(ignore_subscribe_messages=True,
timeout=sleep_time)
try:
pubsub.get_message(ignore_subscribe_messages=True,
timeout=sleep_time)
except BaseException as e:
if self.exception_handler is None:
raise
self.exception_handler(e, pubsub, self)
pubsub.close()
def stop(self):
......
import pytest
import threading
import time
from unittest import mock
import redis
from redis.exceptions import ConnectionError
......@@ -543,3 +546,24 @@ class TestPubSubTimeouts:
p.subscribe('foo')
assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
assert p.get_message(timeout=0.01) is None
class TestPubSubWorkerThread:
def test_pubsub_worker_thread_exception_handler(self, r):
event = threading.Event()
def exception_handler(ex, pubsub, thread):
thread.stop()
event.set()
p = r.pubsub()
p.subscribe(**{'foo': lambda m: m})
with mock.patch.object(p, 'get_message',
side_effect=Exception('error')):
pubsub_thread = p.run_in_thread(
exception_handler=exception_handler
)
assert event.wait(timeout=1.0)
pubsub_thread.join(timeout=1.0)
assert not pubsub_thread.is_alive()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册