【k8s深入学习之 event 记录】初步了解 k8s event 记录机制

event 事件记录初始化

  • 一般在控制器都会有如下的初始化函数,初始化 event 记录器等参数
1. 创建 EventBroadcaster
  • record.NewBroadcaster(): 创建事件广播器,用于记录和分发事件。
  • StartLogging(klog.Infof): 将事件以日志的形式输出。
  • StartRecordingToSink: 将事件发送到 Kubernetes API Server,存储为 Event 资源。
2. 创建 EventRecorder
  • NewRecorder(scheme, source)从广播器中创建事件记录器。
    • scheme: 用于验证和序列化资源。
    • source: 指定事件的来源(如 example-controller)。
import "k8s.io/client-go/tools/record"

func (c *controller) Initialize(opt *framework.ControllerOption) error {
	// ...
  // 1. 创建事件广播器 eventBroadcaster
  eventBroadcaster := record.NewBroadcaster()
  // 将 event 记录到 log
	eventBroadcaster.StartLogging(klog.Infof)
  // 将 event 记录到 apiserver
  // c.kubeClient.CoreV1().Events("") 这个是创建一个可以操作任意 ns 下 event 的 client
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
  // 2. 基于事件广播器 创建事件记录器 Recorder
  c.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "example-controller"})
}

// 事件的记录
const Create-Reason = "PodCreate"
func (c *controller)Controller_Do_Something(pod *corev1.Pod){
  newPod:= pod.DeepCopy()
  // 生成个 event,并记录
  // 内容为 newPod 创建成功,event等级为 Normal,Reason 是 PodCreate,Message 是 Create Pod succeed
  // 之后 Recorder 内的 eventBroadcaster 会将此 event 广播出去,然后 eventBroadcaster 之前注册的日志记录和event存储逻辑会执行
  // 日志记录逻辑,会通过 klog.Infof 将此 event 打印出来
  // event存储逻辑,会将此 event 存储到 apiserver
  c.recorder.Event(newPod, v1.EventTypeNormal, Create-Reason, "Create Pod succeed")
}

源码解析

在这里插入图片描述

0- 总体逻辑设计

  1. 控制中心 Broadcaster

    • eventBroadcaster := record.NewBroadcaster() 创建一个公共数据源(或理解为总控中心,也可以称之为控制器,但不是k8s 控制器)

      • 返回的是eventBroadcasterImpl 结构体,其封装了Broadcaster结构体,因此 eventBroadcasterImpl 结构体的字段很丰富
    • Broadcaster 中的字段主要记录处理 event 的监听者watchers,以及分发的控制等

      • eventBroadcaster.StartLogging(klog.Infof) 就是一个 watcher
      • eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) 也是个 watcher
      • 这些 watcher 都会被记录到Broadcaster结构体的watchers map[int64]*broadcasterWatcher 的map 中
    • eventBroadcasterImplBroadcaster 基础上增加少量配置参数和控制函数

  2. Event 分发和 watcher 处理逻辑

    • eventBroadcaster := record.NewBroadcaster() 执行过程中会调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) 函数,其会开启 event 分发逻辑go m.loop()
      • go m.loop() 用于处理 event 的分发,读取eventBroadcasterImplincoming chan Event 通道传来的 event,分发给各个 watcher 的 result channel
        • incoming 中的 event 是由 Recorder 写入的,Recorder.Event 会生成个 event ,并发送到incomimg channel 中
        • go m.loop()函数会读取incoming通道中的 Event,发送个各个watcher, 然后各个watcher执行自己的逻辑(如记录为 info级别日志、或写入apiserver等)
      • 同时为了避免主进程的结束导致go m.loop()进程结束,NewLongQueueBroadcaster 还利用distributing sync.WaitGroup变量,进行 m.distributing.Add(1),让主进程等待(避免主进程快速结束,导致 loop 进程结束)
    • StartLoggingStartRecordingToSink 函数会调用 StartEventWatcher 函数, StartEventWatcher 函数将传入的参数变为一个 event处理函数 eventHandler, StartEventWatcher 函数同时会开启一个 go 协程,读取各自 watcher result channel 中的 event,之后用eventHandler进行处理(如记录为 info级别日志、或写入apiserver等)
  3. Event 产生逻辑

    • Recorder 是由 eventBroadcaster.NewRecorder 创建出来的,相当于对eventBroadcasterImplBroadcaster 的封装

    • Recorder.Event 会生成个 event ,并发送到incomimg channel 中

      • Recorder 会利用Broadcasterincoming channel 写入 event

      • Recorder 会利用BroadcasterincomingBlock,控制写入时的并发,避免同一时间写入 event 过多导致错乱(这部分逻辑在blockQueue 函数中)

1- 控制中心的创建 —— NewBroadcaster 函数

  • 创建的eventBroadcaster ,实际上就是创建一个 eventBroadcasterImpl 结构体,并传入一些配置参数进行初始化
  • 注意 eventBroadcasterImpl封装了Broadcaster结构体
    • 注意Broadcaster中有很多channelwatchers和分发相关控制、并发控制字段等
      • eventBroadcaster.StartLogging(klog.Infof) 就是一个 watcher
      • eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) 也是个 watcher
      • 这些 watcher 都会被记录到watchers map[int64]*broadcasterWatcher 的map 中
    • 基于eventBroadcaster 创建的Recorder,实际上级就是对eventBroadcasterImpl结构体的封装
    • 之后Recorder创建 event 时,会传入到eventBroadcasterImplBroadcaster
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
const maxQueuedEvents = 1000
type FullChannelBehavior int
const (
	WaitIfChannelFull FullChannelBehavior = iota
	DropIfChannelFull
)

// Creates a new event broadcaster.
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
	c := config{
		sleepDuration: defaultSleepDuration,
	}
	for _, opt := range opts {
		opt(&c)
	}
  // 重点关注
	eventBroadcaster := &eventBroadcasterImpl{
		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: c.sleepDuration,
		options:       c.CorrelatorOptions,
	}
	ctx := c.Context
	if ctx == nil {
		ctx = context.Background()
	} else {
		// Calling Shutdown is not required when a context was provided:
		// when the context is canceled, this goroutine will shut down
		// the broadcaster.
		go func() {
			<-ctx.Done()
			eventBroadcaster.Broadcaster.Shutdown()
		}()
	}
	eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
  // 重点关注
	return eventBroadcaster
}

// 路径 mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, queueLength),
		stopped:             make(chan struct{}),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)	// distributing sync.WaitGroup, 1 个进程
	go m.loop()  					// loop 进程,很关键! 处理 event 的分发,分发给 watcher 处理		
	return m
}
1.1- eventBroadcasterImpl 结构体
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
type eventBroadcasterImpl struct {
	*watch.Broadcaster  // 此处引用下面的结构体
	sleepDuration  time.Duration
	options        CorrelatorOptions
	cancelationCtx context.Context
	cancel         func()
}

// 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
	watchers     map[int64]*broadcasterWatcher  // map 结构  id 和 watcher 的映射
	nextWatcher  int64													// 下一个 watcher 该分配的 id
	distributing sync.WaitGroup									// 用于保证分发函数 loop 正常运行,避免主函数停止,导致 loop 函数停止

	// incomingBlock allows us to ensure we don't race and end up sending events
	// to a closed channel following a broadcaster shutdown.
	incomingBlock sync.Mutex										// 避免接收 event 时,event 过多导致的并发,因此需要锁进行控制
	incoming      chan Event										// 承接生成的 event,其他 watcher 会从此 channel 中读取 event 进行记录到 apiserver 或日志打印等
	stopped       chan struct{}									// 承接关闭广播器 Broadcaster 的停止信号

	// How large to make watcher's channel.
	watchQueueLength int
	// If one of the watch channels is full, don't wait for it to become empty.
	// Instead just deliver it to the watchers that do have space in their
	// channels and move on to the next event.
	// It's more fair to do this on a per-watcher basis than to do it on the
	// "incoming" channel, which would allow one slow watcher to prevent all
	// other watchers from getting new events.
	fullChannelBehavior FullChannelBehavior
}
1.2- EventBroadcaster 接口
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
	// StartEventWatcher starts sending events received from this EventBroadcaster to the given
	// event handler function. The return value can be ignored or used to stop recording, if
	// desired.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
	// sink. The return value can be ignored or used to stop recording, if desired.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging starts sending events received from this EventBroadcaster to the given logging
	// function. The return value can be ignored or used to stop recording, if desired.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
	// logging function. The return value can be ignored or used to stop recording, if desired.
	StartStructuredLogging(verbosity klog.Level) watch.Interface

	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
	// with the event source set to the given event source.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger

	// Shutdown shuts down the broadcaster. Once the broadcaster is shut
	// down, it will only try to record an event in a sink once before
	// giving up on it with an error message.
	Shutdown()
}
1.3- NewRecorder 接口的实现
  • Recorder 封装了 Broadcaster
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
	return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}

type recorderImplLogger struct {
	*recorderImpl
	logger klog.Logger
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.PassiveClock
}
1.3- loop(event的分发)
// // 路径 /mod/k8s.io/apimachinery@v0.29.0/pkg/watch/mux.go
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)  // 将 event 分发给 watcher
	}
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event: // 将 event 发送到 watcher 的 result channel,等待 watcher 进行处理
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
1.4 event 的产生
// 路径 mod/k8s.io/client-go@v0.29.0/tools/record/event.go

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}

func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		logger.Error(nil, "Unsupported event type", "eventType", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	event.ReportingInstance = recorder.source.Host
	event.ReportingController = recorder.source.Component

	// NOTE: events should be a non-blocking operation, but we also need to not
	// put this in a goroutine, otherwise we'll race to write to a closed channel
	// when we go to shut down this broadcaster.  Just drop events if we get overloaded,
	// and log an error if that happens (we've configured the broadcaster to drop
	// outgoing events anyway).
	sent, err := recorder.ActionOrDrop(watch.Added, event)
	if err != nil {
		logger.Error(err, "Unable to record event (will not retry!)")
		return
	}
	if !sent {
		logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
	}
}

// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up.  Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
	m.incomingBlock.Lock()
	defer m.incomingBlock.Unlock()

	// Ensure that if the broadcaster is stopped we do not send events to it.
	select {
	case <-m.stopped:
		return false, fmt.Errorf("broadcaster already stopped")
	default:
	}

	select {
	case m.incoming <- Event{action, obj}:
		return true, nil
	default:
		return false, nil
	}
}
1.5 watcher 的处理
eventBroadcaster.StartLogging(klog.Infof)


// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	return e.StartEventWatcher(
		func(e *v1.Event) {  // 对应 下面 eventHandler
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher, err := e.Watch()
	if err != nil {
		klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
	}
	go func() {  // 直接运行了
		defer utilruntime.HandleCrash()
		for {
			select {
			case <-e.cancelationCtx.Done():
				watcher.Stop()
				return
			case watchEvent := <-watcher.ResultChan():  // 从 watcher result channel 中取出 event 
				event, ok := watchEvent.Object.(*v1.Event)
				if !ok {
					// This is all local, so there's no reason this should
					// ever happen.
					continue
				}
				eventHandler(event) // 对 event 进行处理 
			}
		}
	}()
	return watcher
}



附录1 | 示例详解

以下是一个完整的 EventRecorderEventBroadcaster 实例化的代码示例,展示如何在 Kubernetes 控制器中记录事件。代码包含详细注释,适合用于实际开发或学习:


代码示例

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	metav1 "k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// 主要逻辑的入口
func main() {
	// 1. 创建 Kubernetes 客户端
	// 这里我们使用 fake 客户端进行演示,生产环境应替换为真实的 `kubernetes.Clientset`
	clientset := fake.NewSimpleClientset()

	// 2. 创建事件广播器(EventBroadcaster)
	// 事件广播器是事件处理的核心,负责分发事件到日志和 API Server
	eventBroadcaster := record.NewBroadcaster()

	// 3. 启动日志记录功能
	// 通过 `klog.Infof` 输出事件到控制台或日志文件
	eventBroadcaster.StartLogging(klog.Infof)

	// 4. 启动事件的 API Server 记录功能
	// 配置事件接收器,将事件发送到 API Server,通过 `kubeClient.CoreV1().Events` 接口记录事件
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""),
	})

	// 5. 创建事件记录器(EventRecorder)
	// EventRecorder 用于开发者实际记录事件
	recorder := eventBroadcaster.NewRecorder(
		scheme(), // 提供资源的模式信息,常用的是 `runtime.NewScheme()` 或自定义的 scheme
		corev1.EventSource{Component: "example-controller"},
	)

	// 6. 模拟一个 Kubernetes 对象(如 Pod)的引用
	// 事件通常需要与具体的 Kubernetes 资源关联
	objRef := &corev1.ObjectReference{
		Kind:       "Pod",                  // 资源类型
		Namespace:  "default",              // 命名空间
		Name:       "example-pod",          // 资源名称
		UID:        "12345-abcde-67890",    // 唯一标识符
		APIVersion: "v1",                   // API 版本
	}

	// 7. 使用 EventRecorder 记录事件
	// 记录一个正常类型的事件(EventTypeNormal)
	recorder.Eventf(objRef, corev1.EventTypeNormal, "PodCreated", "Successfully created Pod %s", objRef.Name)

	// 模拟一个警告事件(EventTypeWarning)
	recorder.Eventf(objRef, corev1.EventTypeWarning, "PodFailed", "Failed to create Pod %s due to insufficient resources", objRef.Name)

	// 模拟一个控制器逻辑的操作
	processQueue(recorder, objRef)

	// 等待事件记录完成
	time.Sleep(2 * time.Second)
}

// 模拟处理队列的函数
func processQueue(recorder record.EventRecorder, objRef *corev1.ObjectReference) {
	// 创建一个工作队列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 将任务添加到队列
	queue.Add("task1")

	// 模拟处理队列中的任务
	for queue.Len() > 0 {
		item, _ := queue.Get()
		defer queue.Done(item)

		// 记录一个事件,表明任务已处理
		recorder.Eventf(objRef, corev1.EventTypeNormal, "TaskProcessed", "Successfully processed task: %v", item)
		fmt.Printf("Processed task: %v\n", item)
	}
}

// 创建 Scheme 用于事件记录器
// Scheme 是 Kubernetes 中资源的模式定义,用于确定资源类型和序列化方式
func scheme() *runtime.Scheme {
	s := runtime.NewScheme()

	// 添加 CoreV1 资源到 Scheme 中
	corev1.AddToScheme(s)
	metav1.AddToGroupVersion(s, schema.GroupVersion{Version: "v1"})
	return s
}

代码详解

1. 创建 EventBroadcaster
  • record.NewBroadcaster(): 创建事件广播器,用于记录和分发事件。
  • StartLogging(klog.Infof): 将事件以日志的形式输出。
  • StartRecordingToSink: 将事件发送到 Kubernetes API Server,存储为 Event 资源。
2. 创建 EventRecorder
  • NewRecorder(scheme, source)从广播器中创建事件记录器。
    • scheme: 用于验证和序列化资源。
    • source: 指定事件的来源(如 example-controller)。
3. 创建对象引用
  • ObjectReference: 用于标识事件关联的 Kubernetes 资源。包括类型、名称、命名空间、UID 等信息。
4. 记录事件
  • Eventf
    用于记录事件,包括:
    • 类型corev1.EventTypeNormal(正常)或 corev1.EventTypeWarning(警告)。
    • 原因:事件发生的原因(如 PodCreated)。
    • 消息:事件的详细描述。
5. 模拟队列任务
  • 使用 workqueue 模拟处理任务,记录任务完成时的事件。

运行结果

日志输出

事件将输出到日志(通过 klog):

I1119 12:34:56.123456   12345 example.go:52] Event(v1.ObjectReference{...}): type: 'Normal' reason: 'PodCreated' message: 'Successfully created Pod example-pod'
I1119 12:34:56.123457   12345 example.go:53] Event(v1.ObjectReference{...}): type: 'Warning' reason: 'PodFailed' message: 'Failed to create Pod example-pod due to insufficient resources'
Processed task: task1
事件存储

如果使用真实客户端,事件会存储在集群中,可通过 kubectl 查看:

kubectl get events
事件输出
LAST SEEN   TYPE      REASON         OBJECT            MESSAGE
5s          Normal    PodCreated     Pod/example-pod   Successfully created Pod example-pod
5s          Warning   PodFailed      Pod/example-pod   Failed to create Pod example-pod due to insufficient resources
5s          Normal    TaskProcessed  Pod/example-pod   Successfully processed task: task1

代码用途

  • 日志记录和事件管理: 帮助开发者跟踪控制器的运行状态和资源变更。
  • 任务队列处理: 将业务逻辑与事件机制结合,记录每个关键操作的状态。

以上代码展示了如何使用 EventRecorderEventBroadcaster 实现 Kubernetes 控制器中的事件管理,适合用于开发自定义控制器或调试集群事件处理逻辑。

附录2 | stoped 变量的作用

NewBroadcaster 函数中的 stopped 通道用于实现对 Broadcaster 对象的停止和关闭控制。具体来说,它的作用是在广播器的生命周期中进行信号传递,用于通知 Broadcaster 是否已经停止运行。

详细分析:

1. 通道的定义:
stopped: make(chan struct{}),

stopped 是一个无缓冲的通道,类型为 struct{}。无缓冲的通道用于信号传递,表示某个事件的发生,而不需要传递具体数据。这里的 struct{} 是一个空结构体,占用零内存,因此不会传递任何实际数据。

2. 停止广播器的作用:

stopped 通道用于在广播器停止时传递一个信号。通常这种信号用于通知相关的 goroutine 或者处理流程,广播器已经停止工作,可以做一些清理操作或者退出。

3. 与 go m.loop() 的配合:
go m.loop()

NewBroadcaster 中,启动了一个新的 goroutine 来执行 m.loop()。这个 loop 方法通常是处理传入事件并进行分发的核心逻辑。loop 方法可能会定期检查 stopped 通道,判断是否已经收到停止信号。

4. 典型的停止逻辑(假设)
func (m *Broadcaster) loop() {
    for {
        select {
        case event := <-m.incoming:
            // 处理事件逻辑
        case <-m.stopped:
            // 收到停止信号后,退出 goroutine
            return
        }
    }
}

在这个假设的 loop 实现中,select 语句等待从 m.incoming 通道接收事件,或者等待 stopped 通道的信号。当收到 stopped 通道的信号时,loop 方法退出,从而停止事件的分发。

5. 停止广播器的触发:

在实际代码中,某个外部操作可能会通过 close(m.stopped) 或者发送一个信号到 m.stopped 通道,来通知 Broadcaster 停止工作。比如在处理完所有事件或者发生错误时,可能会调用停止操作:

close(m.stopped)

或者

m.stopped <- struct{}{}

这样 loop 就会检测到停止信号并退出。


总结

stopped 通道在 Broadcaster 中的作用是提供一个停止信号,通知正在运行的 goroutine(如 loop 方法)停止执行。这种设计使得 Broadcaster 能够优雅地处理停止操作,确保所有 goroutine 都能够适时退出并清理资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/927603.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32 ADC --- 知识点总结

STM32 ADC — 知识点总结 文章目录 STM32 ADC --- 知识点总结cubeMX中配置注解单次转换模式、连续转换模式、扫描模式单通道采样的情况单次转换模式&#xff1a;连续转换模式&#xff1a; 多通道采样的情况禁止扫描模式&#xff08;单次转换模式或连续转换模式&#xff09;单次…

如何打开链接中的网址

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了包管理相关的内容,本章回中将介绍如何使用url_launcher包.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在这里介绍url_launcher包主要用来打开Url中的内容,Url可以是电话号码,网址,邮箱等内容。如…

cpp的set

一、关联式容器和键值对 1.关联式容器 关联式容器也是用来存储数据的&#xff0c;与序列式容器不同的是&#xff0c;其里面存储的是<key, value>结构的键值对&#xff0c;在数据检索时比序列式容器效率更高 2.键值对 用来表示具有一一对应关系的一种结构&#xff0c;…

Vue3学习宝典

1.ref函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 <script setup> // reactive接收一个对象类型的数据 import { reactive } from vue;// ref用函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 import { ref } from vue // 简…

数据结构——排序第三幕(深究快排(非递归实现)、快排的优化、内省排序,排序总结)超详细!!!!

文章目录 前言一、非递归实现快排二、快排的优化版本三、内省排序四、排序算法复杂度以及稳定性的分析总结 前言 继上一篇博客基于递归的方式学习了快速排序和归并排序 今天我们来深究快速排序&#xff0c;使用栈的数据结构非递归实现快排&#xff0c;优化快排&#xff08;三路…

数字经济发展的新视角:数字产业化、数据资产化、产业数字化与数据产业

在当今数字化、网络化和智能化的时代&#xff0c;数字经济的快速发展催生了一系列新兴概念&#xff0c;其中“数字产业化、数据资产化、产业数字化与数据产业”尤为引人注目。这四个概念不仅代表了数字经济发展的不同阶段和方向&#xff0c;也深刻影响着传统产业的转型升级和经…

springboot370高校宣讲会管理系统(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 高校宣讲会管理系统设计与实现 摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c…

ansible自动化运维(一)配置主机清单

目录 一、介绍 1.1了解自动化运维 1.2 ansible简介 1.3 ansible自动化运维的优势 1.4 ansible架构图 二、部署ansible 2.1 基本参数 2.2 Ansible帮助命令 2.3 配置主机清单 2.3.1 查看ansible的所有配置文件 2.3.2 /etc/ansible/ansible.cfg常用配置选项 2.3.3 ssh密…

计算机网络 —— HTTP 协议(详解)

前一篇文章&#xff1a;网页版五子棋—— WebSocket 协议_网页可以实现websocket吗-CSDN博客 目录 前言 一、HTTP 协议简介 二、HTTP 协议格式 1.抓包工具的使用 2.抓包工具的原理 3.抓包结果 4.HTTP协议格式总结 三、HTTP 请求 1. URL &#xff08;1&#xff09;UR…

GaussDB的BTree索引和UBTree索引

目录 一、简介 二、BTree索引和UBTree索引结构 三、BTree索引和UBTree索引优势 四、总结与展望 一、简介 数据库通常使用索引来提高业务查询的速度。本文将深入介绍GaussDB中最常用的两种索引&#xff1a;BTree索引和UBTree索引。我们将重点解读BTree索引和UBTree索引的存储…

通义灵码走进北京大学创新课堂丨阿里云云原生 10 月产品月报

云原生月度动态 云原生是企业数字创新的最短路径。 《阿里云云原生每月动态》&#xff0c;从趋势热点、产品新功能、服务客户、开源与开发者动态等方面&#xff0c;为企业提供数字化的路径与指南。 趋势热点 &#x1f947; 通义灵码走进北京大学创新课堂&#xff0c;与 400…

python 练习题

目录 1&#xff0c;输入三个整数&#xff0c;按升序输出 2&#xff0c;输入年份及1-12月份&#xff0c;判断月份属于大月&#xff0c;小月&#xff0c;闰月&#xff0c;平月&#xff0c;并输出本月天数 3&#xff0c;输入一个整数&#xff0c;显示其所有是素数因子 4&#…

IDEA 2024 配置Maven

Step 1:确定下载Apache Maven版本 在IDEA 2024中&#xff0c;随便新建一个Maven项目&#xff1b; 在File下拉菜单栏中&#xff0c;找到Setings&#xff1b; 在Build&#xff0c;Execution&#xff0c;Deployment中找到Maven 确定下载的Apache Maven版本应略低于或等于IDEA绑…

ubuntu20.04更换安装高版本CUDA以及多个CUDA版本管理

Ubuntu 20.04下多版本CUDA的安装与切换 CUDA安装配置环境变量软连接附上参考博客CUDA安装 cuda官方下载地址 因为我需要安装的是11.1版本的,所以这里按着11.1举例安装 安装命令如下: wget https://developer.download.nvidia.com/compute/cuda/11.1.0/local_installers/cu…

Web基础

实践目标 &#xff08;1&#xff09;Web前端HTML&#xff08;2&#xff09;Web前端javascipt&#xff08;3&#xff09;Web后端&#xff1a;MySQL基础&#xff1a;正常安装、启动MySQL&#xff0c;建库、创建用户、修改密码、建表&#xff08;4&#xff09;Web后端&#xff1a…

C++:unordered_map与unordered_set详解

文章目录 前言一、KeyOfT1. 为什么需要仿函数&#xff1f;2. MapKeyOfT与SetKeyOfT代码实现 二、迭代器1. 设计背景2. 为什么需要存储哈希表指针3. operator 的逻辑4. begin() 和 end() 的实现5. 友元和前置声明的作用6. 完整代码 三、迭代器map与set的复用1. map的复用&#x…

redis下载、基础数据类型、操作讲解说明,持久化、springboot整合等

1 Redis是什么 官网&#xff1a;https://redis.io 开发者&#xff1a;Antirez Redis诞生于2009年全称是Remote Dictionary Server 远程词典服务器&#xff0c;是一个基于内存的键值型NoSQL数据库。 Redis是一个开源的、高性能的键值对存储系统&#xff0c;它支持多种数据结构&…

《 C++ 修炼全景指南:二十五 》缓存系统的技术奥秘:LRU 原理、代码实现与未来趋势

摘要 本篇博客深入解析了 LRU&#xff08;Least Recently Used&#xff09;缓存机制&#xff0c;包括其核心原理、代码实现、优化策略和实际应用等方面。通过结合双向链表与哈希表&#xff0c;LRU 缓存实现了高效的数据插入、查找与删除操作。文章还对 LRU 的优化方案进行了详…

【k8s】给ServiceAccount 创建关联的 Secrets

说明 k8s v1.24.0 更新之后进行创建 ServiceAccount 不会自动生成 Secret 需要对其手动创建. 创建步骤 创建SA apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata:namespace: jtkjdevname: gitcicd-role rules: - apiGroups: ["apps"]resources: [&…

C++(4个类型转换)

1. C语言中的类型转换 1. 隐式 类型转换&#xff1a; 具有相近的类型才能进行互相转换&#xff0c;如&#xff1a;int,char,double都表示数值。 2. 强制类型转换&#xff1a;能隐式类型转换就能强制类型转换&#xff0c;隐式类型之间的转换类型强相关&#xff0c;强制类型转换…