The Publish-Subscribe (Pub-Sub) Pattern | ||||||||
publish-subscribe pattern is a messaging architecture publishers do not directly send messages to subscribers messages are published to channels no need for the publisher to know which subscribers (if any) will receive them subscribers indicate interest in specific channels subscribers receive only the messages that are relevant to them promotes a clear separation of concerns. pattern allows for a decoupling of message producers (publishers) and message consumers (subscribers) decoupling enhances flexibility and scalability easy to add new subscribers or change the way messages are handled without affecting the publishers |
||||||||
Components of the PubSub Pattern | ||||||||
|
||||||||
Putting It Together | ||||||||
from collections import defaultdict from typing import Protocol from dataclasses import field, dataclass class Subscriber[Message](Protocol): def __call__(self, message: Message) -> None: ... @dataclass(slots=True, repr=False, kw_only=True) class Channel[Message]: subscribers: set[Subscriber[Message]] = field(default_factory=set) def subscribe(self, subscriber: Subscriber[Message]) -> None: self.subscribers.add(subscriber) def unsubscribe(self, subscriber: Subscriber[Message]) -> None: self.subscribers.remove(subscriber) def publish(self, message: str) -> None: for subscriber in self.subscribers: subscriber(message) @dataclass(slots=True) class Publisher[Message]: channels: dict[str, Channel[Message]] = field(default_factory=lambda: defaultdict(Channel)) def publish(self, channel_name: str, message: Message) -> None: self.channels[channel_name].publish(message) def publish_all(self, message: Message) -> None: for channel in self.channels.values(): channel.publish(message) def subscribe(self, channel_name: str, subscriber: Subscriber) -> None: self.channels[channel_name].subscribe(subscriber) def subscribe_all(self, subscriber: Subscriber) -> None: for channel in self.channels.values(): channel.subscribe(subscriber) def unsubscribe(self, channel_name: str, subscriber: Subscriber) -> None: self.channels[channel_name].unsubscribe(subscriber) def unsubscribe_all(self, subscriber: Subscriber) -> None: for channel in self.channels.values(): channel.unsubscribe(subscriber) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.channels})" |
||||||||
Usage Example | ||||||||
below
class EmailSubscriber: def __init__(self, email: str): self.email = email def __call__(self, message: str): print(f"Sending email to {self.email}: {message}") def main() -> None: publisher = Publisher() email_subscriber = EmailSubscriber('[email protected]') spam = publisher.channels["spam"] eggs = publisher.channels["eggs"] # Subscribing to channels spam.subscribe(email_subscriber) eggs.subscribe(email_subscriber) # Publishing messages spam.publish('Hello, spam subscribers!') eggs.publish('Hello, eggs subscribers!') # Unsubscribe spam.unsubscribe(email_subscriber) # Publishing after unsubscription spam.publish('Hello again, spam subscribers!') eggs.publish('Hello again, spam subscribers!') if __name__ == '__main__': main()output Sending email to [email protected]: Hello, spam subscribers! Sending email to [email protected]: Hello, eggs subscribers! Sending email to [email protected]: Hello again, spam subscribers! |