Python Topics : How to Use Publish-Subscribe Pattern
The Publish-Subscribe (Pub-Sub) Pattern
publish-subscribe pattern is a messaging architecture
publishers do not directly send messages to subscribers
messages are published to channels
no need for the publisher to know which subscribers (if any) will receive them
subscribers indicate interest in specific channels
subscribers receive only the messages that are relevant to them
promotes a clear separation of concerns: the pattern decouples message producers (publishers) from message consumers (subscribers)
decoupling enhances flexibility and scalability
easy to add new subscribers or change the way messages are handled without affecting the publishers
Components of the PubSub Pattern
component: description
subscriber protocol: the Subscriber protocol defines the interface for a subscriber
any subscriber must implement the __call__ method which takes a message as a parameter
defining the protocol this way supports both class-based and function-based subscribers
subscriber protocol ensures that any object conforming to it can be used interchangeably
class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...
channel class: the Channel class represents a communication channel
maintains a set of subscribers
provides methods to subscribe, unsubscribe, and publish messages
using a set ensures that each subscriber is unique and avoids duplicates
@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.remove(subscriber)

    def publish(self, message: str) -> None:
        for subscriber in self.subscribers:
            subscriber(message)
publisher class: the Publisher class manages multiple channels
provides methods to
  • get or create channels
  • subscribe/unsubscribe subscribers to/from channels
  • publish messages to a specific channel
  • broadcast messages to all channels
@dataclass(slots=True)
class Publisher[Message]:
    channels: dict[str, Channel[Message]] = field(default_factory=lambda: defaultdict(Channel))

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber) -> None:
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"
Putting It Together
from collections import defaultdict
from typing import Protocol
from dataclasses import field, dataclass

class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...

@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.remove(subscriber)

    def publish(self, message: str) -> None:
        for subscriber in self.subscribers:
            subscriber(message)


@dataclass(slots=True)
class Publisher[Message]:
    channels: dict[str, Channel[Message]] = field(default_factory=lambda: defaultdict(Channel))

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber) -> None:
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"
Usage Example
below
  • the email_subscriber is subscribed to both channels and receives messages published to them
  • after unsubscribing from the spam channel, messages published to spam are no longer received by email_subscriber
class EmailSubscriber:
    """Class-based subscriber that reports each message for one email address."""

    def __init__(self, email: str) -> None:
        """Remember the address that notifications are reported for."""
        self.email = email

    def __call__(self, message: str) -> None:
        """Print a line saying ``message`` is being emailed to ``self.email``."""
        line = f"Sending email to {self.email}: {message}"
        print(line)

def main() -> None:
    """Demo: subscribe one email subscriber to two channels, then drop one.

    Messages are published on both channels before and after the
    subscriber is removed from the "spam" channel.
    """
    hub = Publisher()
    inbox = EmailSubscriber('[email protected]')

    # Indexing the channel mapping fetches the channels, in this order.
    spam_channel, eggs_channel = (hub.channels[name] for name in ("spam", "eggs"))

    # Register the same subscriber on both channels.
    for channel in (spam_channel, eggs_channel):
        channel.subscribe(inbox)

    # First round: both channels deliver to the subscriber.
    spam_channel.publish('Hello, spam subscribers!')
    eggs_channel.publish('Hello, eggs subscribers!')

    # Drop the subscriber from the spam channel only.
    spam_channel.unsubscribe(inbox)

    # Second round: the same text goes to both channels; only "eggs" still
    # has the subscriber registered.
    for channel in (spam_channel, eggs_channel):
        channel.publish('Hello again, spam subscribers!')

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
output
Sending email to [email protected]: Hello, spam subscribers!
Sending email to [email protected]: Hello, eggs subscribers!
Sending email to [email protected]: Hello again, spam subscribers!
index