event 事件记录初始化
- 一般在控制器都会有如下的初始化函数,初始化 event 记录器等参数
1. 创建 EventBroadcaster
record.NewBroadcaster()
: 创建事件广播器,用于记录和分发事件。StartLogging(klog.Infof)
: 将事件以日志的形式输出。StartRecordingToSink
: 将事件发送到 Kubernetes API Server,存储为Event
资源。
2. 创建 EventRecorder
NewRecorder(scheme, source)
从广播器中创建事件记录器。scheme
: 用于验证和序列化资源。source
: 指定事件的来源(如example-controller
)。
import "k8s.io/client-go/tools/record"
func (c *controller) Initialize(opt *framework.ControllerOption) error {
// ...
// 1. 创建事件广播器 eventBroadcaster
eventBroadcaster := record.NewBroadcaster()
// 将 event 记录到 log
eventBroadcaster.StartLogging(klog.Infof)
// 将 event 记录到 apiserver
// c.kubeClient.CoreV1().Events("") 这个是创建一个可以操作任意 ns 下 event 的 client
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
// 2. 基于事件广播器 创建事件记录器 Recorder
c.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "example-controller"})
}
// 事件的记录
const Create-Reason = "PodCreate"
func (c *controller)Controller_Do_Something(pod *corev1.Pod){
newPod:= pod.DeepCopy()
// 生成个 event,并记录
// 内容为 newPod 创建成功,event等级为 Normal,Reason 是 PodCreate,Message 是 Create Pod succeed
// 之后 Recorder 内的 eventBroadcaster 会将此 event 广播出去,然后 eventBroadcaster 之前注册的日志记录和event存储逻辑会执行
// 日志记录逻辑,会通过 klog.Infof 将此 event 打印出来
// event存储逻辑,会将此 event 存储到 apiserver
c.recorder.Event(newPod, v1.EventTypeNormal, Create-Reason, "Create Pod succeed")
}
源码解析
0- 总体逻辑设计
-
控制中心 Broadcaster
-
eventBroadcaster := record.NewBroadcaster()
创建一个公共数据源(或理解为总控中心,也可以称之为控制器,但不是k8s 控制器)- 返回的是
eventBroadcasterImpl
结构体,其封装了Broadcaster
结构体,因此eventBroadcasterImpl
结构体的字段很丰富
- 返回的是
-
Broadcaster
中的字段主要记录处理 event 的监听者watchers
,以及分发的控制等eventBroadcaster.StartLogging(klog.Infof)
就是一个watcher
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
也是个watcher
- 这些
watcher
都会被记录到Broadcaster
结构体的watchers map[int64]*broadcasterWatcher
的map 中
-
eventBroadcasterImpl
在Broadcaster
基础上增加少量配置参数和控制函数
-
-
Event 分发和 watcher 处理逻辑
eventBroadcaster := record.NewBroadcaster()
执行过程中会调用watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
函数,其会开启 event 分发逻辑go m.loop()
go m.loop()
用于处理 event 的分发,读取eventBroadcasterImpl
中incoming chan Event
通道传来的 event,分发给各个 watcher 的result
channelincoming
中的 event 是由Recorder
写入的,Recorder.Event
会生成个 event ,并发送到incomimg
channel 中go m.loop()
函数会读取incoming
通道中的 Event,发送个各个watcher
, 然后各个watcher
执行自己的逻辑(如记录为 info级别日志、或写入apiserver等)
- 同时为了避免主进程的结束导致
go m.loop()
进程结束,NewLongQueueBroadcaster
还利用distributing sync.WaitGroup
变量,进行m.distributing.Add(1)
,让主进程等待(避免主进程快速结束,导致 loop 进程结束)
StartLogging
或StartRecordingToSink
函数会调用StartEventWatcher
函数,StartEventWatcher
函数将传入的参数变为一个 event处理函数eventHandler
,StartEventWatcher
函数同时会开启一个 go 协程,读取各自 watcherresult
channel 中的 event,之后用eventHandler
进行处理(如记录为 info级别日志、或写入apiserver等)
-
Event 产生逻辑
-
Recorder
是由eventBroadcaster.NewRecorder
创建出来的,相当于对eventBroadcasterImpl
中Broadcaster
的封装 -
Recorder.Event
会生成个 event ,并发送到incomimg
channel 中-
Recorder
会利用Broadcaster
的incoming
channel 写入 event -
Recorder
会利用Broadcaster
的incomingBlock
,控制写入时的并发,避免同一时间写入 event 过多导致错乱(这部分逻辑在blockQueue
函数中)
-
-
1- 控制中心的创建 —— NewBroadcaster 函数
- 创建的
eventBroadcaster
,实际上就是创建一个eventBroadcasterImpl
结构体,并传入一些配置参数进行初始化 - 注意
eventBroadcasterImpl
封装了Broadcaster
结构体- 注意
Broadcaster
中有很多channel
、watchers
和分发相关控制、并发控制字段等eventBroadcaster.StartLogging(klog.Infof)
就是一个watcher
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
也是个watcher
- 这些
watcher
都会被记录到watchers map[int64]*broadcasterWatcher
的map 中
- 基于
eventBroadcaster
创建的Recorder
,实际上级就是对eventBroadcasterImpl
结构体的封装 - 之后
Recorder
创建 event 时,会传入到eventBroadcasterImpl
内Broadcaster
- 注意
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
const maxQueuedEvents = 1000
type FullChannelBehavior int
const (
WaitIfChannelFull FullChannelBehavior = iota
DropIfChannelFull
)
// Creates a new event broadcaster.
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
c := config{
sleepDuration: defaultSleepDuration,
}
for _, opt := range opts {
opt(&c)
}
// 重点关注
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: c.sleepDuration,
options: c.CorrelatorOptions,
}
ctx := c.Context
if ctx == nil {
ctx = context.Background()
} else {
// Calling Shutdown is not required when a context was provided:
// when the context is canceled, this goroutine will shut down
// the broadcaster.
go func() {
<-ctx.Done()
eventBroadcaster.Broadcaster.Shutdown()
}()
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
// 重点关注
return eventBroadcaster
}
// 路径 mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength),
stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1) // distributing sync.WaitGroup, 1 个进程
go m.loop() // loop 进程,很关键! 处理 event 的分发,分发给 watcher 处理
return m
}
1.1- eventBroadcasterImpl 结构体
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
type eventBroadcasterImpl struct {
*watch.Broadcaster // 此处引用下面的结构体
sleepDuration time.Duration
options CorrelatorOptions
cancelationCtx context.Context
cancel func()
}
// 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
watchers map[int64]*broadcasterWatcher // map 结构 id 和 watcher 的映射
nextWatcher int64 // 下一个 watcher 该分配的 id
distributing sync.WaitGroup // 用于保证分发函数 loop 正常运行,避免主函数停止,导致 loop 函数停止
// incomingBlock allows us to ensure we don't race and end up sending events
// to a closed channel following a broadcaster shutdown.
incomingBlock sync.Mutex // 避免接收 event 时,event 过多导致的并发,因此需要锁进行控制
incoming chan Event // 承接生成的 event,其他 watcher 会从此 channel 中读取 event 进行记录到 apiserver 或日志打印等
stopped chan struct{} // 承接关闭广播器 Broadcaster 的停止信号
// How large to make watcher's channel.
watchQueueLength int
// If one of the watch channels is full, don't wait for it to become empty.
// Instead just deliver it to the watchers that do have space in their
// channels and move on to the next event.
// It's more fair to do this on a per-watcher basis than to do it on the
// "incoming" channel, which would allow one slow watcher to prevent all
// other watchers from getting new events.
fullChannelBehavior FullChannelBehavior
}
1.2- EventBroadcaster 接口
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired.
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
// sink. The return value can be ignored or used to stop recording, if desired.
StartRecordingToSink(sink EventSink) watch.Interface
// StartLogging starts sending events received from this EventBroadcaster to the given logging
// function. The return value can be ignored or used to stop recording, if desired.
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired.
StartStructuredLogging(verbosity klog.Level) watch.Interface
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger
// Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
// giving up on it with an error message.
Shutdown()
}
1.3- NewRecorder 接口的实现
- Recorder 封装了 Broadcaster
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}
type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}
type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.PassiveClock
}
1.3- loop(event的分发)
// // 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event) // 将 event 分发给 watcher
}
m.closeAll()
m.distributing.Done()
}
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
case w.result <- event: // 将 event 发送到 watcher 的 result channel,等待 watcher 进行处理
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
1.4 event 的产生
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, eventtype, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}
func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
return
}
if !util.ValidateEventType(eventtype) {
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
event.ReportingInstance = recorder.source.Host
event.ReportingController = recorder.source.Component
// NOTE: events should be a non-blocking operation, but we also need to not
// put this in a goroutine, otherwise we'll race to write to a closed channel
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
// and log an error if that happens (we've configured the broadcaster to drop
// outgoing events anyway).
sent, err := recorder.ActionOrDrop(watch.Added, event)
if err != nil {
logger.Error(err, "Unable to record event (will not retry!)")
return
}
if !sent {
logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
}
}
// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up. Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
// Ensure that if the broadcaster is stopped we do not send events to it.
select {
case <-m.stopped:
return false, fmt.Errorf("broadcaster already stopped")
default:
}
select {
case m.incoming <- Event{action, obj}:
return true, nil
default:
return false, nil
}
}
1.5 watcher 的处理
eventBroadcaster.StartLogging(klog.Infof)
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return e.StartEventWatcher(
func(e *v1.Event) { // 对应 下面 eventHandler
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
})
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher, err := e.Watch()
if err != nil {
klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
}
go func() { // 直接运行了
defer utilruntime.HandleCrash()
for {
select {
case <-e.cancelationCtx.Done():
watcher.Stop()
return
case watchEvent := <-watcher.ResultChan(): // 从 watcher result channel 中取出 event
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event) // 对 event 进行处理
}
}
}()
return watcher
}
附录1 | 示例详解
以下是一个完整的 EventRecorder
和 EventBroadcaster
实例化的代码示例,展示如何在 Kubernetes 控制器中记录事件。代码包含详细注释,适合用于实际开发或学习:
代码示例
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// 主要逻辑的入口
func main() {
// 1. 创建 Kubernetes 客户端
// 这里我们使用 fake 客户端进行演示,生产环境应替换为真实的 `kubernetes.Clientset`
clientset := fake.NewSimpleClientset()
// 2. 创建事件广播器(EventBroadcaster)
// 事件广播器是事件处理的核心,负责分发事件到日志和 API Server
eventBroadcaster := record.NewBroadcaster()
// 3. 启动日志记录功能
// 通过 `klog.Infof` 输出事件到控制台或日志文件
eventBroadcaster.StartLogging(klog.Infof)
// 4. 启动事件的 API Server 记录功能
// 配置事件接收器,将事件发送到 API Server,通过 `kubeClient.CoreV1().Events` 接口记录事件
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
Interface: clientset.CoreV1().Events(""),
})
// 5. 创建事件记录器(EventRecorder)
// EventRecorder 用于开发者实际记录事件
recorder := eventBroadcaster.NewRecorder(
scheme(), // 提供资源的模式信息,常用的是 `runtime.NewScheme()` 或自定义的 scheme
corev1.EventSource{Component: "example-controller"},
)
// 6. 模拟一个 Kubernetes 对象(如 Pod)的引用
// 事件通常需要与具体的 Kubernetes 资源关联
objRef := &corev1.ObjectReference{
Kind: "Pod", // 资源类型
Namespace: "default", // 命名空间
Name: "example-pod", // 资源名称
UID: "12345-abcde-67890", // 唯一标识符
APIVersion: "v1", // API 版本
}
// 7. 使用 EventRecorder 记录事件
// 记录一个正常类型的事件(EventTypeNormal)
recorder.Eventf(objRef, corev1.EventTypeNormal, "PodCreated", "Successfully created Pod %s", objRef.Name)
// 模拟一个警告事件(EventTypeWarning)
recorder.Eventf(objRef, corev1.EventTypeWarning, "PodFailed", "Failed to create Pod %s due to insufficient resources", objRef.Name)
// 模拟一个控制器逻辑的操作
processQueue(recorder, objRef)
// 等待事件记录完成
time.Sleep(2 * time.Second)
}
// 模拟处理队列的函数
func processQueue(recorder record.EventRecorder, objRef *corev1.ObjectReference) {
// 创建一个工作队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 将任务添加到队列
queue.Add("task1")
// 模拟处理队列中的任务
for queue.Len() > 0 {
item, _ := queue.Get()
defer queue.Done(item)
// 记录一个事件,表明任务已处理
recorder.Eventf(objRef, corev1.EventTypeNormal, "TaskProcessed", "Successfully processed task: %v", item)
fmt.Printf("Processed task: %v\n", item)
}
}
// 创建 Scheme 用于事件记录器
// Scheme 是 Kubernetes 中资源的模式定义,用于确定资源类型和序列化方式
func scheme() *runtime.Scheme {
s := runtime.NewScheme()
// 添加 CoreV1 资源到 Scheme 中
corev1.AddToScheme(s)
metav1.AddToGroupVersion(s, schema.GroupVersion{Version: "v1"})
return s
}
代码详解
1. 创建 EventBroadcaster
record.NewBroadcaster()
: 创建事件广播器,用于记录和分发事件。StartLogging(klog.Infof)
: 将事件以日志的形式输出。StartRecordingToSink
: 将事件发送到 Kubernetes API Server,存储为Event
资源。
2. 创建 EventRecorder
NewRecorder(scheme, source)
从广播器中创建事件记录器。scheme
: 用于验证和序列化资源。source
: 指定事件的来源(如example-controller
)。
3. 创建对象引用
ObjectReference
: 用于标识事件关联的 Kubernetes 资源。包括类型、名称、命名空间、UID 等信息。
4. 记录事件
-
- 用于记录事件,包括:
Eventf
- 类型:
corev1.EventTypeNormal
(正常)或corev1.EventTypeWarning
(警告)。 - 原因:事件发生的原因(如
PodCreated
)。 - 消息:事件的详细描述。
5. 模拟队列任务
- 使用
workqueue
模拟处理任务,记录任务完成时的事件。
运行结果
日志输出
事件将输出到日志(通过 klog
):
I1119 12:34:56.123456 12345 example.go:52] Event(v1.ObjectReference{...}): type: 'Normal' reason: 'PodCreated' message: 'Successfully created Pod example-pod'
I1119 12:34:56.123457 12345 example.go:53] Event(v1.ObjectReference{...}): type: 'Warning' reason: 'PodFailed' message: 'Failed to create Pod example-pod due to insufficient resources'
Processed task: task1
事件存储
如果使用真实客户端,事件会存储在集群中,可通过 kubectl
查看:
kubectl get events
事件输出
LAST SEEN TYPE REASON OBJECT MESSAGE
5s Normal PodCreated Pod/example-pod Successfully created Pod example-pod
5s Warning PodFailed Pod/example-pod Failed to create Pod example-pod due to insufficient resources
5s Normal TaskProcessed Pod/example-pod Successfully processed task: task1
代码用途
- 日志记录和事件管理: 帮助开发者跟踪控制器的运行状态和资源变更。
- 任务队列处理: 将业务逻辑与事件机制结合,记录每个关键操作的状态。
以上代码展示了如何使用 EventRecorder
和 EventBroadcaster
实现 Kubernetes 控制器中的事件管理,适合用于开发自定义控制器或调试集群事件处理逻辑。
附录2 | stoped 变量的作用
在 NewBroadcaster
函数中的 stopped
通道用于实现对 Broadcaster
对象的停止和关闭控制。具体来说,它的作用是在广播器的生命周期中进行信号传递,用于通知 Broadcaster
是否已经停止运行。
详细分析:
1. 通道的定义:
stopped: make(chan struct{}),
stopped
是一个无缓冲的通道,类型为 struct{}
。无缓冲的通道用于信号传递,表示某个事件的发生,而不需要传递具体数据。这里的 struct{}
是一个空结构体,占用零内存,因此不会传递任何实际数据。
2. 停止广播器的作用:
stopped
通道用于在广播器停止时传递一个信号。通常这种信号用于通知相关的 goroutine 或者处理流程,广播器已经停止工作,可以做一些清理操作或者退出。
3. 与 go m.loop()
的配合:
go m.loop()
在 NewBroadcaster
中,启动了一个新的 goroutine 来执行 m.loop()
。这个 loop
方法通常是处理传入事件并进行分发的核心逻辑。loop
方法可能会定期检查 stopped
通道,判断是否已经收到停止信号。
4. 典型的停止逻辑(假设)
func (m *Broadcaster) loop() {
for {
select {
case event := <-m.incoming:
// 处理事件逻辑
case <-m.stopped:
// 收到停止信号后,退出 goroutine
return
}
}
}
在这个假设的 loop
实现中,select
语句等待从 m.incoming
通道接收事件,或者等待 stopped
通道的信号。当收到 stopped
通道的信号时,loop
方法退出,从而停止事件的分发。
5. 停止广播器的触发:
在实际代码中,某个外部操作可能会通过 close(m.stopped)
或者发送一个信号到 m.stopped
通道,来通知 Broadcaster
停止工作。比如在处理完所有事件或者发生错误时,可能会调用停止操作:
close(m.stopped)
或者
m.stopped <- struct{}{}
这样 loop
就会检测到停止信号并退出。
总结
stopped
通道在 Broadcaster
中的作用是提供一个停止信号,通知正在运行的 goroutine(如 loop
方法)停止执行。这种设计使得 Broadcaster
能够优雅地处理停止操作,确保所有 goroutine 都能够适时退出并清理资源。