Source code for redismpx.multiplexer

import asyncio
import logging
import aioredis
from typing import Union, Awaitable, Callable, Optional
from .internal import Conn, List
from .utils import as_bytes, jitter_exp_backoff
from .channel import ChannelSubscription
from .pattern import PatternSubscription
from .promise import PromiseSubscription

# Signature of the `on_message` callback: called with (channel name, message
# payload), both as bytes. May be a plain function or a coroutine function.
OnMessage = Callable[[bytes, bytes], Optional[Awaitable[None]]]
# Signature of the `on_disconnect` callback: called with the exception that
# caused the disconnection. May be a plain function or a coroutine function.
OnDisconnect = Callable[[Exception], Optional[Awaitable[None]]]
# Signature of the `on_activation` callback: called with the name (bytes) of
# the channel or pattern whose subscription just became active. May be a
# plain function or a coroutine function.
OnActivation = Callable[[bytes], Optional[Awaitable[None]]]

class Multiplexer:
    """
    A Multiplexer instance corresponds to one Redis Pub/Sub connection that
    will be shared by multiple subscription instances.

    Multiplexer accepts the same connection options that you can specify with
    :func:`aioredis.create_connection`. See the documentation of
    `aio-libs/aioredis <https://github.com/aio-libs/aioredis>`_ for more
    information.

    See the code example for an easy to understand explanation of the
    expected signature for `on_message`, `on_disconnect` and `on_activation`.
    Also, note that those callbacks should not throw exceptions as any
    exception will be logged as a warning and then discarded.

    If you are making use of Python's type hints, you can import `OnMessage`,
    `OnDisconnect`, and `OnActivation` from this package.

    Usage example:

    .. highlight:: python
    .. code-block:: python

        # Pass to Multiplexer the same connection options that
        # aioredis.create_connection() would accept.
        mpx = Multiplexer('redis://localhost')

        # on_message is a callback (can be async)
        # that accepts a channel name and a message.
        async def my_on_message(channel: bytes, message: bytes):
            await websocket.send(f"ch: {channel} msg: {message}")

        # on_disconnect is a callback (can be async)
        # that accepts the error that caused the disconnection.
        def my_on_disconnect(error: Exception):
            print("oh no!")

        # on_activation is a callback (can be async)
        # that accepts the name of the channel or pattern
        # whose subscription just became active (depends
        # on whether it's attached to a ChannelSubscription
        # or a PatternSubscription).
        def my_on_activation(name: bytes):
            print("activated:", name)

        # you can also pass None in place of `on_disconnect`
        # and `on_activation` if you're not interested in
        # reacting to those events.

        # Use `mpx` to create new subscriptions.
        channel_sub = mpx.new_channel_subscription(
            my_on_message, my_on_disconnect, None)
        pattern_sub = mpx.new_pattern_subscription("hello-*",
            my_on_message, None, my_on_activation)
        promise_sub = mpx.new_promise_subscription("hello-")

    """

    def __init__(self, *args, **kwargs):
        """
        Store the connection options and start the background reader task.

        All positional and keyword arguments are forwarded to
        :func:`aioredis.create_connection`; ``connection_cls`` is always
        overridden with the package's own ``Conn`` class.

        Must be called while an event loop is running, because it creates
        an :class:`asyncio.Task`.
        """
        kwargs["connection_cls"] = Conn
        # name -> List of callback boxes registered for that channel/pattern.
        self.channels = {}
        self.patterns = {}
        # Names whose (P)SUBSCRIBE has been confirmed on the current connection.
        self.active_channels = set()
        self.active_patterns = set()
        # Iterated on disconnection to fire each item's `on_disconnect`.
        self.subscriptions = List(None)
        self.connection = None
        self.connection_options = (args, kwargs)
        self.must_exit = False
        # True until the first connection attempt succeeds.
        self.reconnecting = True
        self.connected_event = asyncio.Event()
        self.conn_reader = asyncio.create_task(self._read_messages())

    def new_channel_subscription(self,
            on_message: OnMessage,
            on_disconnect: Optional[OnDisconnect],
            on_activation: Optional[OnActivation]) -> ChannelSubscription:
        """
        Creates a new ChannelSubscription tied to the Multiplexer.

        Before disposing of a ChannelSubscription you must call its
        :func:`~redismpx.ChannelSubscription.close` method.

        The arguments `on_disconnect` and `on_activation` can be `None` if
        you're not interested in the corresponding types of event.

        :param on_message: a (async or non) function that gets called for every message received.
        :param on_disconnect: a (async or non) function that gets called when the connection is lost.
        :param on_activation: a (async or non) function that gets called when a subscription goes into effect.
        :raises Exception: if `on_message` is `None`.
        """
        if on_message is None:
            raise Exception("on_message cannot be None")
        return ChannelSubscription(self, on_message, on_disconnect, on_activation)

    def new_pattern_subscription(self,
            pattern: Union[str, bytes],
            on_message: OnMessage,
            on_disconnect: Optional[OnDisconnect],
            on_activation: Optional[OnActivation]) -> PatternSubscription:
        """
        Creates a new PatternSubscription tied to the Multiplexer.

        Before disposing of a PatternSubscription you must call its
        :func:`~redismpx.PatternSubscription.close` method.

        The arguments `on_disconnect` and `on_activation` can be `None` if
        you're not interested in the corresponding types of event.

        :param pattern: the Redis Pub/Sub pattern to subscribe to.
        :param on_message: a (async or non) function that gets called for every message received.
        :param on_disconnect: a (async or non) function that gets called when the connection is lost.
        :param on_activation: a (async or non) function that gets called when a subscription goes into effect.
        :raises Exception: if `on_message` is `None`.
        """
        if on_message is None:
            raise Exception("on_message cannot be None")
        return PatternSubscription(self, pattern, on_message, on_disconnect, on_activation)

    def new_promise_subscription(self, prefix: Union[str, bytes]) -> PromiseSubscription:
        """
        Creates a new PromiseSubscription tied to the Multiplexer.

        Before disposing of a PromiseSubscription you must call its
        :func:`~redismpx.PromiseSubscription.close` method.

        The prefix argument is used to create internally a
        PatternSubscription that will match all channels that start with
        the provided prefix.

        A Promise represents a timed, uninterrupted, single-message
        subscription to a Redis Pub/Sub channel. If network connectivity
        gets lost, thus causing an interruption, the Promise will be failed
        (unless already fulfilled). Use NewPromise from PromiseSubscription
        to create a new Promise.

        :param prefix: the prefix under which all Promises will be created.
        """
        return PromiseSubscription(self, prefix)

    def close(self):
        """
        Shut the multiplexer down: stop the reader task and close the
        connection, if one is open. No reconnection is attempted afterwards.
        """
        self.must_exit = True
        self.conn_reader.cancel()
        if self.connection:
            self.connection.close()

    async def _run_callback(self, label, callback, *args, **kwargs):
        """
        Invoke a user callback, awaiting it when it is a coroutine function.

        Any exception raised by the callback is logged as a warning and
        discarded, so a misbehaving callback cannot break the multiplexer.

        :param label: callback kind used in the log message (e.g. ``on_message``).
        :param callback: the function or coroutine function to invoke.
        """
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback(*args, **kwargs)
            else:
                callback(*args, **kwargs)
        except Exception as e:
            logging.warning(f"redismpx id({id(self)}): {label} function threw exception: {e}")

    async def _reconnect(self, cause):
        """
        Notify subscriptions of the disconnection, drop the current
        connection and restart the reader task (which reconnects).

        Does nothing when a reconnection is already in progress.

        :param cause: the exception that triggered the reconnection.
        """
        if self.reconnecting:
            return
        # Claim the reconnection before the first await: the callbacks below
        # may suspend, and a second _reconnect() running meanwhile must bail
        # out instead of tearing the connection down twice.
        self.reconnecting = True

        for s in self.subscriptions:
            if s.on_disconnect is not None:
                await self._run_callback("on_disconnect", s.on_disconnect, cause)

        logging.info(f"redismpx id({id(self)}): reconnecting because of error: {cause}")
        self.connected_event.clear()
        # Nothing is confirmed on the new connection until Redis replies again.
        self.active_channels = set()
        self.active_patterns = set()

        self.conn_reader.cancel()
        try:
            await self.conn_reader
        except (asyncio.CancelledError, Exception):
            # Expected: the reader was just cancelled, or it already failed
            # with the error that brought us here.
            pass

        if self.connection is not None:
            self.connection.close()
        self.connection = None
        self.conn_reader = asyncio.create_task(self._read_messages())

    async def _read_messages(self):
        """
        Body of the reader task: connect (retrying with jittered exponential
        backoff), resubscribe to every registered channel and pattern, then
        dispatch incoming Pub/Sub messages to the registered callbacks.

        On any error after connecting, schedules :meth:`_reconnect` unless
        the multiplexer has been closed.
        """
        logging.debug("redismpx started _read_messages")
        args, kwargs = self.connection_options

        # Keep trying to connect
        tries = 1
        while not self.must_exit:
            try:
                self.connection = await aioredis.create_connection(*args, **kwargs)
                break  # Connected: leaving the loop here is what ends the retries.
            except Exception as e:
                logging.debug("redismpx connection attempt %d failed: %s", tries, e)
                # Exp backoff + jitter
                sleep_ms = jitter_exp_backoff(8, 512, tries)
                await asyncio.sleep(sleep_ms / 1000)
                # Cap the exponent so the backoff stops growing.
                if tries < 20:
                    tries += 1

        logging.debug("redismpx connected")
        self.reconnecting = False
        self.connected_event.set()

        try:
            # Resubscribe to all channels, if any is present. Done inside the
            # try so that a failed write triggers a reconnection instead of
            # silently killing the reader task.
            if len(self.channels) > 0:
                logging.debug("redismpx resubscribing %s", self.channels.keys())
                self.connection.write_command(b"SUBSCRIBE", *self.channels.keys())

            # Resubscribe to all patterns, if any is present.
            if len(self.patterns) > 0:
                logging.debug("redismpx resubscribing %s", self.patterns.keys())
                self.connection.write_command(b"PSUBSCRIBE", *self.patterns.keys())

            async for msg in self.connection.read_message():
                kind = msg[0]

                if kind == b"message":
                    ch_name = msg[1]
                    if ch_name in self.channels:
                        for fn_box in self.channels[ch_name]:
                            await self._run_callback(
                                "on_message", fn_box.on_message, ch_name, msg[2])
                    continue

                if kind == b"pmessage":
                    # msg is (kind, pattern, channel, payload).
                    pat_name = msg[1]
                    if pat_name in self.patterns:
                        for fn_box in self.patterns[pat_name]:
                            await self._run_callback(
                                "on_message", fn_box.on_message, msg[2], msg[3])
                    continue

                # SUBSCRIPTIONS
                if kind == b"subscribe":
                    ch_name = msg[1]
                    self.active_channels.add(ch_name)
                    if ch_name in self.channels:
                        for fn_box in self.channels[ch_name]:
                            if fn_box.on_activation is None:
                                continue
                            await self._run_callback(
                                "on_activation", fn_box.on_activation, ch_name)
                    continue

                if kind == b"psubscribe":
                    pat_name = msg[1]
                    # Patterns are tracked separately from channels;
                    # _add_pattern/_remove_pattern consult active_patterns.
                    self.active_patterns.add(pat_name)
                    if pat_name in self.patterns:
                        for fn_box in self.patterns[pat_name]:
                            if fn_box.on_activation is None:
                                continue
                            await self._run_callback(
                                "on_activation", fn_box.on_activation, pat_name)
                    continue
        except Exception as e:
            if not self.must_exit:
                asyncio.create_task(self._reconnect(e))

    async def _log_exeptions(self, callback, *args, **kwargs):
        """
        Invoke `callback` (sync or async) with the given arguments, logging
        any exception it raises as a warning instead of propagating it.
        """
        await self._run_callback("callback", callback, *args, **kwargs)

    def _add_channel(self, channel, fn_box):
        """
        Register `fn_box` for `channel`, issuing SUBSCRIBE if this is the
        first registration for that channel.

        If the channel is already active, `fn_box.on_activation` (when set)
        is scheduled immediately.

        :raises Exception: if the multiplexer has been closed.
        """
        if self.must_exit:
            raise Exception("tried to use a closed multiplexer")

        # Are we already subscribed inside the multiplexer?
        if channel not in self.channels:
            try:
                # With no connection yet, the reader task subscribes to
                # everything in self.channels once it connects.
                if self.connection is not None:
                    self.connection.write_command(b"SUBSCRIBE", channel)
            except Exception as e:
                asyncio.create_task(self._reconnect(e))
            self.channels[channel] = List(fn_box)
        else:
            # We are already subscribed, check if the sub is active
            # if so, we immediately trigger on_activation
            if channel in self.active_channels:
                if fn_box.on_activation is not None:
                    asyncio.create_task(
                        self._run_callback("on_activation", fn_box.on_activation, channel))
            self.channels[channel].prepend(fn_box)

    def _remove_channel(self, channel, fn_box):
        """
        Unregister `fn_box` from `channel`, issuing UNSUBSCRIBE when it was
        the last registration for that channel.

        :raises Exception: if the multiplexer has been closed.
        """
        if self.must_exit:
            raise Exception("tried to use a closed multiplexer")

        fn_box_list = fn_box.remove_from_list()
        if fn_box_list.is_empty():
            del self.channels[channel]
            # discard, not remove: the channel may never have been confirmed,
            # or the active set may have been reset by a reconnection.
            self.active_channels.discard(channel)
            try:
                if self.connection is not None:
                    self.connection.write_command(b"UNSUBSCRIBE", channel)
            except Exception as e:
                asyncio.create_task(self._reconnect(e))

    def _add_pattern(self, pattern, fn_box):
        """
        Register `fn_box` for `pattern`, issuing PSUBSCRIBE if this is the
        first registration for that pattern.

        If the pattern is already active, `fn_box.on_activation` (when set)
        is scheduled immediately.

        :raises Exception: if the multiplexer has been closed.
        """
        if self.must_exit:
            raise Exception("tried to use a closed multiplexer")

        # Are we already subscribed inside the multiplexer?
        if pattern not in self.patterns:
            try:
                # With no connection yet, the reader task subscribes to
                # everything in self.patterns once it connects.
                if self.connection is not None:
                    self.connection.write_command(b"PSUBSCRIBE", pattern)
            except Exception as e:
                asyncio.create_task(self._reconnect(e))
            self.patterns[pattern] = List(fn_box)
        else:
            # We are already subscribed, check if the sub is active
            # if so, we immediately trigger on_activation
            if pattern in self.active_patterns:
                if fn_box.on_activation is not None:
                    asyncio.create_task(
                        self._run_callback("on_activation", fn_box.on_activation, pattern))
            self.patterns[pattern].prepend(fn_box)

    def _remove_pattern(self, pattern, fn_box):
        """
        Unregister `fn_box` from `pattern`, issuing PUNSUBSCRIBE when it was
        the last registration for that pattern.

        :raises Exception: if the multiplexer has been closed.
        """
        if self.must_exit:
            raise Exception("tried to use a closed multiplexer")

        fn_box_list = fn_box.remove_from_list()
        if fn_box_list.is_empty():
            del self.patterns[pattern]
            # discard, not remove: the pattern may never have been confirmed,
            # or the active set may have been reset by a reconnection.
            self.active_patterns.discard(pattern)
            try:
                if self.connection is not None:
                    self.connection.write_command(b"PUNSUBSCRIBE", pattern)
            except Exception as e:
                asyncio.create_task(self._reconnect(e))