Source code for redismpx.pattern

from typing import Union
from .utils import as_bytes, SubscriptionIsClosed
from .internal import ListNode

class PatternSubscription:
    """
    A PatternSubscription ties an on_message callback to one Redis Pub/Sub
    pattern. Use :func:`~redismpx.Multiplexer.new_pattern_subscription` to
    create a new PatternSubscription.

    Usage example:

    .. highlight:: python
    .. code-block:: python

        # This subscription will receive all messages sent to
        # channels that start with "red", like `redis` and `reddit`.
        pattern_sub = mpx.new_pattern_subscription("red*",
            my_on_message, my_on_disconnect, my_on_activation)

        # Once created, a PatternSubscription can only be closed.
        pattern_sub.close()
    """

    def __init__(self, multiplexer, pattern, on_message, on_disconnect,
                 on_activation) -> None:
        """Register the callbacks for ``pattern`` with ``multiplexer``.

        :param multiplexer: object that owns this subscription; it must
            expose ``subscriptions.prepend``, ``_add_pattern`` and
            ``_remove_pattern``.
        :param pattern: the Pub/Sub pattern; it is passed through
            ``as_bytes`` before being stored and registered.
        :param on_message: callback stored in the node registered for the
            pattern.
        :param on_disconnect: callback stored in the node added to the
            multiplexer's subscription list.
        :param on_activation: callback stored next to ``on_message`` in the
            node registered for the pattern.
        """
        pattern = as_bytes(pattern)

        self.channels = {}
        self.mpx = multiplexer
        self.pattern = pattern

        # One node carries the per-pattern callbacks; the same node object
        # is handed back to the multiplexer in close() to unregister it.
        self.fn_box = ListNode(on_message=on_message,
                               on_activation=on_activation)
        self.on_disconnect = on_disconnect
        self.on_activation = on_activation
        self.closed = False

        # A second node places on_disconnect in the multiplexer-wide
        # subscription list; close() unlinks it again.
        self.subNode = ListNode(on_disconnect=on_disconnect)
        self.mpx.subscriptions.prepend(self.subNode)
        self.mpx._add_pattern(pattern, self.fn_box)

    def close(self) -> None:
        """Closes the subscription.

        Unregisters the pattern callbacks from the multiplexer and removes
        this subscription's node from the multiplexer's subscription list.

        :raises SubscriptionIsClosed: if the subscription was already closed.
        """
        if self.closed:
            raise SubscriptionIsClosed("tried to use a closed PatternSubscription")

        self.mpx._remove_pattern(self.pattern, self.fn_box)
        self.subNode.remove_from_list()
        # Flag is set last, so it only flips once both removals succeeded.
        self.closed = True