前言
村里的老人常说:真男人就该懂得遵守“三不原则”——不主动、不拒绝、不负责。
一个复杂的软件系统,其中必然会存在各种各样的“对象”,如果在设计之初没有注意控制好耦合度,导致各个对象甚至是函数之间高度耦合,那对于后期开发和维护将是一个灾难!
在日常开发中,大家不难发现,“面向事件编程”是解耦合的利器,其对应的设计模式便是大家常常会听到的“观察者模式”,而核心思想,就是尽可能令大部分对象都遵守“三不原则”:
- 合理设计事件处理器,等待事件的发生,而不要主动轮询某个临界资源;
- 设置一个安全高效的事件分发中心,无论多大并发都能保证不拒绝服务;
- 事件生产者不必关心事件将由谁来处理、如何处理,亦无需对结果负责。
接下来我将为大家展示如何设计一个优雅的本地消息分发处理系统。
接口设计
首先我们定义一个通用的 ```Notification``` (也可以叫“事件”,或者“消息”),它包含3个基本信息:
- 事件名;
- 发送者;
- 当前信息(上下文)
/// Notification object with name, sender and extra info
class Notification {
Notification(this.name, this.sender, this.userInfo);
final String name;
final dynamic sender;
final Map? userInfo;
}
然后我们定义“观察者”接口,以便让任务中心正确分发消息:
/// Notification observer
abstract class Observer {
Future<void> onReceiveNotification(Notification notification);
}
由于消费者处理该事件时有可能会需要花费较长时间,所以这里我们设计为异步接口。
最后我们再实现一个消息分发中心:
/// Singleton
class NotificationCenter {
factory NotificationCenter() => _instance;
static final NotificationCenter _instance = NotificationCenter._internal();
NotificationCenter._internal();
BaseCenter center = BaseCenter();
/// Add observer with notification name
///
/// @param observer - who will receive notification
/// @param name - notification name
void addObserver(Observer observer, String name) {
center.addObserver(observer, name);
}
/// Remove observer for notification name
///
/// @param observer - who will receive notification
/// @param name - notification name
void removeObserver(Observer observer, [String? name]) {
center.removeObserver(observer, name);
}
/// Post a notification with extra info
///
/// @param name - notification name
/// @param sender - who post this notification
/// @param info - extra info
Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
await center.postNotification(name, sender, info);
}
/// Post a notification
///
/// @param notification - notification object
Future<void> post(Notification notification) async {
await center.post(notification);
}
}
这个事件分发中心主要实现3个功能:
- 将一个观察者以及其关心的事件名称添加到内部等候队列;
- 将一个观察者移出等候队列;
- 提交一个事件(中心内部异步执行分发)。
并且,因为一个应用系统中通常应该只有一个事件分发中心,所以这里的 NotificationCenter 被设计成为单例模式。
这样一个通用的本地消息分发系统就设计完成了。
应用示例
接下来将为你展示这个系统如何使用。
首先我们先定义一个观察者,并且将其添加到事件分发中心:
第一步,实现 Observer 接口:
import 'package:lnc/notification.dart' as lnc;
class _ContactListState extends State<ContactListPage> implements lnc.Observer {
// ...
@override
Future<void> onReceiveNotification(lnc.Notification notification) async {
// 获取事件名称与相关信息
String name = notification.name;
Map? userInfo = notification.userInfo;
// 根据事件名称处理信息
if (name == 'ContactsUpdated') {
ID? contact = userInfo?['contact'];
Log.info('contact updated: $contact');
// ...
} else if (name == 'DocumentUpdated') {
ID? did = userInfo?['ID'];
Log.info('document updated: $did');
// ...
}
}
}
第二步,在适当的时机将该观察者添加进事件分发中心 or 从中心删除:
class _ContactListState extends State<ContactListPage> implements lnc.Observer {
_ContactListState() {
// ...
var nc = lnc.NotificationCenter();
nc.addObserver(this, 'ContactsUpdated');
nc.addObserver(this, 'DocumentUpdated');
}
@override
void dispose() {
var nc = lnc.NotificationCenter();
nc.removeObserver(this, 'DocumentUpdated');
nc.removeObserver(this, 'ContactsUpdated');
super.dispose();
}
// ...
}
第三步,在事件发生点提交事件给分发中心:
// post notification
var nc = NotificationCenter();
nc.postNotification('DocumentUpdated', this, {
'ID': identifier,
'document': doc,
});
至此,一个观察者模式的本地事件系统的应用就介绍完了。
下面我们再来深入一下内部,看看这个事件分发中心是如何实现的?
进阶
注意前面提到的事件分发中心(单例) NotificationCenter,里面有一个代理对象 center:
/// Singleton
class NotificationCenter {
factory NotificationCenter() => _instance;
static final NotificationCenter _instance = NotificationCenter._internal();
NotificationCenter._internal();
BaseCenter center = BaseCenter(); // 代理对象,内部实现(可替换)
// ...
}
这里采用代理模式,是为了方便用户根据项目的特殊需要,定义具体的分发逻辑实现以替换之。
下面介绍一下这个默认的分发中心 BaseCenter:
class BaseCenter {
// name => WeakSet<Observer>
final Map<String, Set<Observer>> _observers = {};
/// Add observer with notification name
///
/// @param observer - listener
/// @param name - notification name
void addObserver(Observer observer, String name) {
Set<Observer>? listeners = _observers[name];
if (listeners == null) {
listeners = WeakSet(); // 弱引用集合
listeners.add(observer);
_observers[name] = listeners;
} else {
listeners.add(observer);
}
}
/// Remove observer from notification center
///
/// @param observer - listener
/// @param name - notification name
void removeObserver(Observer observer, [String? name]) {
if (name == null) {
// 1. remove from all observer set
_observers.forEach((key, listeners) {
listeners.remove(observer);
});
// 2. remove empty set
_observers.removeWhere((key, listeners) => listeners.isEmpty);
} else {
// 3. get listeners by name
Set<Observer>? listeners = _observers[name];
if (listeners != null && listeners.remove(observer)) {
// observer removed
if (listeners.isEmpty) {
_observers.remove(name);
}
}
}
}
/// Post notification with name
///
/// @param name - notification name
/// @param sender - notification sender
/// @param info - extra info
Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
return await post(Notification(name, sender, info));
}
Future<void> post(Notification notification) async {
Set<Observer>? listeners = _observers[notification.name]?.toSet();
if (listeners == null) {
return;
}
List<Future> tasks = [];
for (Observer item in listeners) {
tasks.add(item.onReceiveNotification(notification));
}
// wait all tasks finished
await Future.wait(tasks);
}
}
- 首先,它有3个接口函数和 NotificationCenter 一一对应:
- 其次,它的内部有一个 key 为字符串的映射对象 _observers,其中每一个事件名称(字符串)映射向一个弱引用的集合 WeakSet,集合中的元素则是关注该事件名称的所有观察者;
- 当生产者提交事件时,该中心会根据该事件名称从 _observers 中获取对应的观察者集合,并调用其事件接口函数。
这里有两点值得注意:
- 由于低耦合的设计,各个观察者(事件消费者)分别独立处理事件结果,相互之间并无关联,并且也没有前后时序关系的要求,所以这里的 post 函数会采用异步并发的方式来同时调用这些观察者接口;
- 一般而言,观察者的添加与移除是一一对应的,但为了防止异常情况发生,这里的观察者集合仍然采用弱引用的集合,以便某些观察者非正常退出时,即使没有显式调用 removeObserver() 函数,也不会造成泄漏。
(关于弱引用的实现我们留到以后再来讲解)
代码引用
由于我已提交了一个完整的模块代码到 pub.dev,所以在实际应用中,你只需要在项目工程文件 ```pubspec.yaml``` 中添加
dependencies: lnc: ^0.1.2
然后在需要使用的 dart 文件头引入即可:
import 'package:lnc/notification.dart' as lnc;
全部源码
import 'package:object_key/object_key.dart';
import 'package:lnc/log.dart';
/// Notification observer
abstract class Observer {
Future<void> onReceiveNotification(Notification notification);
}
/// Notification object with name, sender and extra info
class Notification {
Notification(this.name, this.sender, this.userInfo);
final String name;
final dynamic sender;
final Map? userInfo;
@override
String toString() {
Type clazz = runtimeType;
return '<$clazz name="$name">\n\t<sender>$sender</sender>\n'
'\t<info>$userInfo</info>\n</$clazz>';
}
}
/// Notification center
class NotificationCenter {
factory NotificationCenter() => _instance;
static final NotificationCenter _instance = NotificationCenter._internal();
NotificationCenter._internal();
BaseCenter center = BaseCenter();
/// Add observer with notification name
///
/// @param observer - who will receive notification
/// @param name - notification name
void addObserver(Observer observer, String name) {
center.addObserver(observer, name);
}
/// Remove observer for notification name
///
/// @param observer - who will receive notification
/// @param name - notification name
void removeObserver(Observer observer, [String? name]) {
center.removeObserver(observer, name);
}
/// Post a notification with extra info
///
/// @param name - notification name
/// @param sender - who post this notification
/// @param info - extra info
Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
await center.postNotification(name, sender, info);
}
/// Post a notification
///
/// @param notification - notification object
Future<void> post(Notification notification) async {
await center.post(notification);
}
}
class BaseCenter with Logging {
// name => WeakSet<Observer>
final Map<String, Set<Observer>> _observers = {};
/// Add observer with notification name
///
/// @param observer - listener
/// @param name - notification name
void addObserver(Observer observer, String name) {
Set<Observer>? listeners = _observers[name];
if (listeners == null) {
listeners = WeakSet();
listeners.add(observer);
_observers[name] = listeners;
} else {
listeners.add(observer);
}
}
/// Remove observer from notification center
///
/// @param observer - listener
/// @param name - notification name
void removeObserver(Observer observer, [String? name]) {
if (name == null) {
// 1. remove from all observer set
_observers.forEach((key, listeners) {
listeners.remove(observer);
});
// 2. remove empty set
_observers.removeWhere((key, listeners) => listeners.isEmpty);
} else {
// 3. get listeners by name
Set<Observer>? listeners = _observers[name];
if (listeners != null && listeners.remove(observer)) {
// observer removed
if (listeners.isEmpty) {
_observers.remove(name);
}
}
}
}
/// Post notification with name
///
/// @param name - notification name
/// @param sender - notification sender
/// @param info - extra info
Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
return await post(Notification(name, sender, info));
}
Future<void> post(Notification notification) async {
Set<Observer>? listeners = _observers[notification.name]?.toSet();
if (listeners == null) {
logDebug('no listeners for notification: ${notification.name}');
return;
}
List<Future> tasks = [];
for (Observer item in listeners) {
try {
tasks.add(item.onReceiveNotification(notification).onError((error, st) =>
Log.error('observer error: $error, $st, $notification')
));
} catch (ex, stackTrace) {
logError('observer error: $ex, $stackTrace, $notification');
}
}
// wait all tasks finished
await Future.wait(tasks);
}
}
GitHub 地址:
https://github.com/dimchat/sdk-dart/blob/main/lnc/lib/src/notification.dart
结语
这里展示了一个基由观察者模式设计的本地事件通知分发系统,其中包含了“观察者模式”、“单例模式”、“代理模式”等设计思想,希望对你有帮助。
如有其他问题,可以下载登录 Tarsier 与我交流(默认通讯录里找 Albert Moky)