先看代码:
# 导入未来模块以支持类型注解
from __future__ import annotations
# 导入抽象基类模块和随机数生成器
from abc import ABC, abstractmethod
from random import randrange
# 导入列表类型注解
from typing import List
# 定义观察者模式中的主体接口(Subject)
class Subject(ABC):
"""
主体接口声明一组用于管理订阅者的方法。
"""
@abstractmethod
def attach(self, observer: Observer) -> None:
"""
将观察者附加到主体。
"""
pass
@abstractmethod
def detach(self, observer: Observer) -> None:
"""
从主体中移除观察者。
"""
pass
@abstractmethod
def notify(self) -> None:
"""
通知所有观察者有关事件的信息。
"""
pass
# 定义具体主体类(ConcreteSubject)
class ConcreteSubject(Subject):
"""
具体主体拥有对所有订阅者至关重要的状态,并在状态变化时通知观察者。
"""
_state: int = None
"""
为了简化起见,主体的状态(对所有订阅者至关重要)存储在此变量中。
"""
_observers: List[Observer] = []
"""
订阅者列表。在实际应用中,订阅者列表可以更全面地存储(按事件类型分类等)。
"""
def attach(self, observer: Observer) -> None:
print("主体:已附加一个观察者。")
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
self._observers.remove(observer)
"""
订阅管理方法。
"""
def notify(self) -> None:
"""
触发每个订阅者的更新。
"""
print("主体:正在通知观察者...")
for observer in self._observers:
observer.update(self)
def some_business_logic(self) -> None:
"""
通常,订阅逻辑只是主体所能做的工作的一部分。主体通常包含一些重要的业务逻辑,
当即将发生(或已经发生)重要事情时触发通知方法。
"""
print("\n主体:我正在做一些重要的事情。")
self._state = randrange(0, 10)
print(f"主体:我的状态刚刚变更为:{self._state}")
self.notify()
# 定义观察者接口(Observer)
class Observer(ABC):
"""
观察者接口声明由主体使用的update方法。
"""
@abstractmethod
def update(self, subject: Subject) -> None:
"""
接收主体的更新。
"""
pass
# 定义具体观察者类(ConcreteObserverA 和 ConcreteObserverB)
"""
具体观察者对它们所附属的主体发出的更新做出反应。
"""
class ConcreteObserverA(Observer):
def update(self, subject: Subject) -> None:
if subject._state < 3:
print("具体观察者A:对事件作出反应")
class ConcreteObserverB(Observer):
def update(self, subject: Subject) -> None:
if subject._state == 0 or subject._state >= 2:
print("具体观察者B:对事件作出反应")
# 客户端代码
if __name__ == "__main__":
subject = ConcreteSubject()
observer_a = ConcreteObserverA()
subject.attach(observer_a)
observer_b = ConcreteObserverB()
subject.attach(observer_b)
subject.some_business_logic()
subject.some_business_logic()
subject.detach(observer_a)
subject.some_business_logic()
输出
Subject: Attached an observer.
Subject: Attached an observer.
Subject: I'm doing something important.
Subject: My state has just changed to: 0
Subject: Notifying observers...
ConcreteObserverA: Reacted to the event
ConcreteObserverB: Reacted to the event
Subject: I'm doing something important.
Subject: My state has just changed to: 5
Subject: Notifying observers...
ConcreteObserverB: Reacted to the event
Subject: I'm doing something important.
Subject: My state has just changed to: 0
Subject: Notifying observers...
ConcreteObserverB: Reacted to the event
参考:
参考