client-go中watch机制的一些陷阱

Reference

  • https://stackoverflow.com/questions/51399407/watch-in-k8s-golang-api-watches-and-get-events-but-after-sometime-doesnt-get-an

问题描述

最近在使用 client-go 的 watch 机制监听 k8s 中的 deployment 资源时,发现一个奇怪的现象

先看下代码:

  • 服务启动时调用 watchDeployment 新建一个 watcher 监听对应的资源
  • for 循环,select 处理 watcher.ResultChan 返回的事件
func WatchDeployment(ctx context.Context, namespace string, options metav1.ListOptions, handler EventHandler) error {
	watcher, err := KubeCli.AppsV1().Deployments(namespace).Watch(ctx, options)
	if err != nil {
		log.Errorf("watching deployments err: %+v", err)
		return err
	}

	defer watcher.Stop()

	// 处理事件
	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				log.Errorf("Watcher channel closed")
				return nil
			}

			deployment, ok := event.Object.(*appsv1.Deployment)
			if !ok {
				log.Errorf("Error casting to Deployment")
				continue
			}

			switch event.Type {
			case watch.Added:
				if handler.OnAdd != nil {
					handler.OnAdd(ctx, deployment)
				}
			case watch.Modified:
				if handler.OnModify != nil {
					handler.OnModify(ctx, deployment)
				}
			case watch.Deleted:
				if handler.OnDelete != nil {
					handler.OnDelete(ctx, deployment)
				}
			}
		}
	}
}

在运行了一段时间后,watch 监听的通道会自动关闭,日志:“ERROR [trace-] Watcher channel closed”

检视完代码,唯一存在问题的就是 watcher.ResultCha( ) 如果出问题,则会直接 return 导致 for 循环退出了,所以我改了第二版的代码,将 return 替换为了 continue

if !ok {
    log.Errorf("Watcher channel closed")
    continue
}

再运行一段时间,日志疯狂报错 “ERROR [trace-] Watcher channel closed” ;

疑问:为什么错误已经continue了,为什么无法再继续监听了?

排查过程

具体debug看下 watch 机制的源码,只展示重要流程代码,细节忽略:

// watch 其实是可以设置 timeout 时间,具体用在哪,继续往下看看
func (c *deployments) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	opts.Watch = true
	return c.client.Get().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Watch(ctx)
}

// Timeout makes the request use the given duration as an overall timeout for the
// request. Additionally, if set passes the value as "timeout" parameter in URL.
// 这里就将timeout 设置为了 rquest 请求的超时时间
func (r *Request) Timeout(d time.Duration) *Request {
	if r.err != nil {
		return r
	}
	r.timeout = d
	return r
}

从 watch 可以看到,client-go 提供的 watch 方法,就是使用 net/http 发起一个 http 请求 https://10.96.0.1:443/apis/apps/v1/namespaces/xxx/deployments?fieldSelector=metadata.name%3Dabc&watch=true,并启用 watch 机制,成功后则返回一个 实现了 watch.Interface 这个接口的 StreamWatcher 的结构体

// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	// We specifically don't want to rate limit watches, so we
	// don't use r.rateLimiter here.
	if r.err != nil {
		return nil, r.err
	}

	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}

	isErrRetryableFunc := func(request *http.Request, err error) bool {
		// The watch stream mechanism handles many common partial data errors, so closed
		// connections can be retried in many cases.
		if net.IsProbableEOF(err) || net.IsTimeout(err) {
			return true
		}
		return false
	}
	retry := r.retryFn(r.maxRetries)
	url := r.URL().String()
	for {
		if err := retry.Before(ctx, r); err != nil {
			return nil, retry.WrapPreviousError(err)
		}

		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return nil, err
		}

		resp, err := client.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		retry.After(ctx, r, resp, err)
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}

		// 重试机制...
	}
}

StreamWatcher 就是启了一个协程接受 wacth 中的事件变化,进行处理

func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
	contentType := resp.Header.Get("Content-Type")
	mediaType, params, err := mime.ParseMediaType(contentType)
	if err != nil {
		klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
	}
	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
	if err != nil {
		return nil, err
	}

	handleWarnings(resp.Header, r.warningHandler)

	frameReader := framer.NewFrameReader(resp.Body)
	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
		// If the watcher is externally stopped there is no receiver anymore
		// and the send operations on the result channel, especially the
		// error reporting might block forever.
		// Therefore a dedicated stop channel is used to resolve this blocking.
		done: make(chan struct{}),
	}
	go sw.receive()
	return sw
}

// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
	sync.Mutex
	source   Decoder
	reporter Reporter
	result   chan Event
	done     chan struct{}
}

// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
	// Stop stops watching. Will close the channel returned by ResultChan(). Releases
	// any resources used by the watch.
	Stop()

	// ResultChan returns a chan which will receive all the events. If an error occurs
	// or Stop() is called, the implementation will close this channel and
	// release any resources used by the watch.
	ResultChan() <-chan Event
}

看下具体是怎么进行处理的

  1. 从 source 中解码得到k8s中监听到的事件变化的action(动作)
  2. 将结果写入 result 这个 channel 中
  3. result 这个channel 就是我们最开始 watch.ResultChan 函数的返回结果
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer utilruntime.HandleCrash()
	defer close(sw.result)
	defer sw.Stop()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				if net.IsProbableEOF(err) || net.IsTimeout(err) {
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
				} else {
					select {
					case <-sw.done:
					case sw.result <- Event{
						Type:   Error,
						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
					}:
					}
				}
			}
			return
		}
		select {
		case <-sw.done:
			return
		case sw.result <- Event{
			Type:   action,
			Object: obj,
		}:
		}
	}
}

// ResultChan implements Interface.
func (sw *StreamWatcher) ResultChan() <-chan Event {
	return sw.result
}

回顾一下我们接受channel的写法,我们拿到channel后,从里面读取数据,会根据bool值来判断channel是否已经关闭,关闭则不处理;

ch := watcher.ResultChan()
event, ok := <-ch:

那为什么channel会关闭呢,猜测一下?

  • 客户端超时断开了?但是我们没设置timeout,则默认为0,就是无限制时间,不会主动断开
  • 服务端主动断开了?有可能

在watcher建立后,我们通过 lsof -p 查看对应进程打开的连接,可以看到与 k8s 建立的 https 的连接,就是对应的 watcher 发起的http请求建立的 tcp 长链接;net/http 发起的 http 请求,是使用了 transport 连接池进行管理的,所以会默认维持长链接,感兴趣可以看下这篇文章 [[net-http-transport]]

xxx-75df5b458c-hj6qr:45006->kubernetes.default.svc.cluster.local:https (ESTABLISHED)

知道了对端域名和端口后,就能通过 tcpkill -i eth0 host kubernetes.default.svc.cluster.local and port 443 命令,来手动中断这个连接,看下 streamWatch 是怎么处理的;

在 streamWatcher 的 receive 函数 select 中打断点调试,最后发现是在 net.IsProbableEOF 函数中命中了 “connection reset by peer”

// IsProbableEOF returns true if the given error resembles a connection termination
// scenario that would justify assuming that the watch is empty.
// These errors are what the Go http stack returns back to us which are general
// connection closure errors (strongly correlated) and callers that need to
// differentiate probable errors in connection behavior between normal "this is
// disconnected" should use the method.
func IsProbableEOF(err error) bool {
	if err == nil {
		return false
	}
	var uerr *url.Error
	if errors.As(err, &uerr) {
		err = uerr.Err
	}
	msg := err.Error()
	switch {
	case err == io.EOF:
		return true
	case err == io.ErrUnexpectedEOF:
		return true
	case msg == "http: can't write HTTP request on broken connection":
		return true
	case strings.Contains(msg, "http2: server sent GOAWAY and closed the connection"):
		return true
	case strings.Contains(msg, "connection reset by peer"):
		return true
	case strings.Contains(strings.ToLower(msg), "use of closed network connection"):
		return true
	}
	return false
}

并且我们将日志级别设置为0,就能直接打印对应的infof日志:

I1226 09:38:19.316831 16623 streamwatcher.go:114] Unable to decode an event from the watch stream: read tcp 10.244.1.96:33006->10.96.0.1:443: read: connection reset by peer

ok,连接被对端关闭了,然后按照代码逻,就会直接return,在返回之前,会执行 defer 进行一些操作,receive 在方法开始就定义了 defer 资源回收

  • 明确声明了会关闭 sw.result 这个channel
  • stop 中则是将 source 这个 streamDecoder 关闭,最后调用到 http.transportResponseBody 进行关闭,这也是 net-http 源码 transport 的设计,不过k8s-apiserver 貌似用的http2的协议;
defer close(sw.result)
defer sw.Stop()
// Stop implements Interface.
func (sw *StreamWatcher) Stop() {
	// Call Close() exactly once by locking and setting a flag.
	sw.Lock()
	defer sw.Unlock()
	// closing a closed channel always panics, therefore check before closing
	select {
	case <-sw.done:
	default:
		close(sw.done)
		sw.source.Close()
	}
}

// 最终的close函数,会把未读的数据都flush出来再关闭
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}

再梳理一下整个流程:

  • 我们通过client-go提供的方法创建一个watcher,监听对应的资源
  • watcher 会先向 kube-apiserver 发起一个 http 请求,告知 apiserver 启用 watch 机制监听某类型的资源
  • 服务与apiserver建立了连接后,就通过FD进行读写传输
  • 最终变更的事件,是通过 channel 与我们的服务进行通信
  • 当apiserver关闭了连接,streamwatcher就会return并进行资源回收,从而关闭 channel

问题原因

  • apiserver 主动关闭了 TCP 连接,客户端 streamWatcher 将channel回收关闭了,所以,我们通过 watcher.ResultChan 获取到的 channel 永远都是关闭的
  • apiserver 主动关闭连接有几个可能原因
    • 监听的资源被删除了,尝试了手动删除,发现watcher还是存在不会关闭
    • 长时间没有事件变更,TCP连接会自动断开(大概在30min左右)(事实证明就是这个原因)
    • 其他xxx

解决办法

  • 如果发现 channel 被关闭了,则重新建立一个 watcher 进行监听即可

改进后的代码:

func WatchDeployment(ctx context.Context, namespace string, options metav1.ListOptions, handler EventHandler) {
	log.Infof("start watch deployment: %+v", options)
	for {
		func() {
			defer func() {
				if r := recover(); r != nil {
					log.Warnf("The Kubernetes deployment watcher is attempting to restart for recovery. err: %v", r)
				}
			}()
			if err := runLoop(ctx, namespace, options, handler); err != nil {
				log.Errorf("Kubernetes deployment watcher has exited in runLoop: %v", err)
			}
		}()
		time.Sleep(5 * time.Second) // 等待一段时间后重试
	}
}

func runLoop(ctx context.Context, namespace string, options metav1.ListOptions, handler EventHandler) error {
	watcher, err := KubeCli.AppsV1().Deployments(namespace).Watch(ctx, options)
	if err != nil {
		return err
	}
	ch := watcher.ResultChan()

	for {
		select {
		case event, ok := <-ch:
			if !ok {
				// channel 关闭,重启 watcher
				log.Infof("Kubernetes hung up on us, restarting deployment watcher")
				return nil
			}

			deployment, ok := event.Object.(*appsv1.Deployment)
			if !ok {
				log.Errorf("Error casting to Deployment")
				continue
			}

			// 处理事件
			switch event.Type {
			case watch.Added:
				if handler.OnAdd != nil {
					handler.OnAdd(ctx, deployment)
				}
			case watch.Modified:
				if handler.OnModify != nil {
					handler.OnModify(ctx, deployment)
				}
			case watch.Deleted:
				if handler.OnDelete != nil {
					handler.OnDelete(ctx, deployment)
				}
			}
		case <-time.After(30 * time.Minute):
			// 超时,重启 watcher
			log.Infof("Timeout, restarting deployment watcher")
			return nil
		case <-ctx.Done():
			log.Info("Context done, stopping watch")
			return nil
		}
	}
}

其他疑问

1、为什么发起一个 http 请求,apiserver 就能与这个请求建立连接,进行 watch 并增量通知,apiserver 是怎么实现的?

  • 推荐阅读:
    • https://cloud.tencent.com/developer/article/1991054
    • etcd教程(五)—watch机制原理分析

2、为什么 list-watch 机制不会每隔一段时间就关闭连接?(貌似有探活?)

3、StreamWatcher 中包装的 Decoder 是怎么与TCP连接的描述符关联上的,读写是怎么传输的?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/950596.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于 GEE 提取白莲种植范围

目录 1 方法原理 1.1 步骤一 1.2 步骤二 1.3 步骤三 1.4 步骤四 2 完整代码 3 运行结果 近年来&#xff0c;随着乡村振兴战略的提出&#xff0c;我国的农业种植模式呈现出多元化的趋势。白莲具有易种植、经济效益高的特点&#xff0c;由此被广泛种植&#xff0c;本文介绍…

el-table 自定义表头颜色

第一种方法&#xff1a;计算属性 <template><div><el-table:data"formData.detail"border stripehighlight-current-row:cell-style"{ text-align: center }":header-cell-style"headerCellStyle"><el-table-column fixed…

c++类和对象---上

文章目录 类的介绍 类的声明 1.1 类名 1.2 成员变量 1.3 成员函数 1.4 访问权限 类的定义 2.1 成员变量的初始化 2.2 成员函数的实现 对象的创建和销毁 3.1 默认构造函数 3.2 析构函数 3.3 拷贝构造函数 3.4 对象的实例化 3.5 对象的销毁 成员访问控制 4.1 公有成员 4.2 私有…

上汽乘用车研发流程

目的 最近刚入职主机厂&#xff0c;工作中所提到各个阶段名称与之前在供应商那边不一致&#xff0c;概念有点模糊&#xff0c;所以打算学习了解一番 概念 术语 EP: enginerring prototype car 工程样车 Mule Car: 骡子车 Simulator Car&#xff1a;模拟样车 PPV&#xff1a;…

阿里云发现后门webshell,怎么处理,怎么解决?

当收到如下阿里云通知邮件时&#xff0c;大部分管理员都会心里一惊吧&#xff01;出现Webshell&#xff0c;大概是网站被入侵了。 尊敬的 xxxaliyun.com&#xff1a; 云盾云安全中心检测到您的服务器&#xff1a;47.108.x.xx&#xff08;xx机&#xff09;出现了紧急安全事件…

vite5.x配置https

旧版的vite直接在config里面配置https&#xff1a;true即可&#xff0c;新版的麻烦一些。 1.准备工作 需要安装openssl 下载地址&#xff1a;Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions 找到合适的版本安装&#xff0c;配置好环境变量&#x…

深度学习与计算机视觉 (博士)

文章目录 零、计算机视觉概述一、深度学习相关概念1.学习率η2.batchsize和epoch3.端到端(End-to-End)、序列到序列(Seq-to-Seq)4.消融实验5.学习方式6.监督学习的方式(1)有监督学习(2)强监督学习(3)弱监督学习(4)半监督学习(5)自监督学习(6)无监督学习(7)总结&#xff1a;不同…

在AI浪潮中,RSS3为何会被低估其价值?有何潜力

​​RSS3 简介&#xff1a; RSS3 是一个去中心化网络索引和结构化开放信息&#xff0c;使其对于下一个 Twitter、Google 和 OpenAI 来说易于访问且有价值。凭借独特的数据子层价值子层设计&#xff0c; RSS3 网络推动了开放信息从索引到消费等的全生命周期&#xff0c;并建立了…

BMS应用软件开发 — 3 电池系统的组成

目录 1 电池的基本拓扑 2 已经被淘汰的CTM 3 早已经普及的CTP 4 集成度更高的CTC 5 刚性更好的CTB 1 电池的基本拓扑 相比于燃油车&#xff0c;虽然电动车在结构空间上灵活度更高&#xff0c;空间利用率也更好&#xff0c;但现有条件下无法像燃油车一样快速补能&#xff…

UE5 打包要点

------------------------- 1、需要环境 win sdk &#xff0c;大约3G VS&#xff0c;大约10G 不安装就无法打包&#xff0c;就是这么简单。 ----------------------- 2、打包设置 编译类型&#xff0c;开发、调试、发行 项目设置-地图和模式&#xff0c;默认地图 项目…

高等数学学习笔记 ☞ 一元函数微分的基础知识

1. 微分的定义 &#xff08;1&#xff09;定义&#xff1a;设函数在点的某领域内有定义&#xff0c;取附近的点&#xff0c;对应的函数值分别为和&#xff0c; 令&#xff0c;若可以表示成&#xff0c;则称函数在点是可微的。 【 若函数在点是可微的&#xff0c;则可以表达为】…

Redis查询缓存

什么是缓存&#xff1f; 缓存是一种提高数据访问效率的技术&#xff0c;通过在内存中存储数据的副本来减少对数据库或其他慢速存储设备的频繁访问。缓存通常用于存储热点数据或计算代价高的结果&#xff0c;以加快响应速度。 添加Redis缓存有什么好处&#xff1f; Redis 基…

3D立体无人机夜间表演技术详解

3D立体无人机夜间表演技术是一种结合了无人机技术、灯光艺术和计算机编程的创新表演形式。以下是该技术的详细解析&#xff1a; 一、技术基础 1. 无人机技术&#xff1a; 无人机通常采用四旋翼设计&#xff0c;具有强大的飞行控制能力&#xff0c;可以实现前飞、后飞、悬停、…

MATLAB深度学习实战文字识别

文章目录 前言视频演示效果1.DB文字定位环境配置安装教程与资源说明1.1 DB概述1.2 DB算法原理1.2.1 整体框架1.2.2 特征提取网络Resnet1.2.3 自适应阈值1.2.4 文字区域标注生成1.2.5 DB文字定位模型训练 2.CRNN文字识别2.1 CRNN概述2.2 CRNN原理2.2.1 CRNN网络架构实现2.2.2 CN…

H2数据库在单元测试中的应用

H2数据库特征 用比较简洁的话来介绍h2数据库&#xff0c;就是一款轻量级的内存数据库&#xff0c;支持标准的SQL语法和JDBC API&#xff0c;工业领域中&#xff0c;一般会使用h2来进行单元测试。 这里贴一下h2数据库的主要特征 Very fast database engineOpen sourceWritten…

Android 10.0 授权app获取cpu温度和电池温度功能实现

1.前言 在10.0的系统定制化开发中&#xff0c;在开发某些产品的老化应用的时候&#xff0c;需要app获取cpu温度和电池 温度等功能&#xff0c;有些产品带温度传感器&#xff0c;大部分的产品都不包含温度传感器&#xff0c;所以就需要读取 sys下的相关节点来获取相关温度值 2.…

IDEA 撤销 merge 操作(详解)

作为一个开发者&#xff0c;我们都知道Git是一个非常重要的版本控制工具&#xff0c;尤其是在协作开发的过程中。然而&#xff0c;在使用Git的过程中难免会踩一些坑&#xff0c;今天我来给大家分享一个我曾经遇到的问题&#xff1a;在使用IDEA中进行merge操作后如何撤销错误的合…

WD5105同步降压转换器:9.2V-95V宽电压输入,4.5A大电流输出,95%高效率,多重保护功能

概述 • WD5105同步降压转换器 • 封装形式&#xff1a;QFN-20封装 • 应用场景&#xff1a;适用于车载充电器、电动车仪表、电信基站电源、电源适配器等 性能特点 • 输入电压范围&#xff1a;9.2V至95V • 输出电流&#xff1a;可提供4.5A连续负载电流 • 效率&#xff1a;高…

【C++】B2108 图像模糊处理

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述题目内容输入格式输出格式示例输入&#xff1a;输出&#xff1a; &#x1f4af;题目分析问题拆解 &#x1f4af;我的做法代码实现代码分析 &#x1f4af;老师的做法…

怎么把word试题转成excel?

在教育行业、学校管理以及在线学习平台中&#xff0c;试题库的高效管理是一项核心任务。许多教育工作者和系统开发人员常常面临将 Word 中的试题批量导入 Excel 的需求。本文将详细介绍如何快速将试题从 Word 转换为 Excel&#xff0c;帮助您轻松解决繁琐的数据整理问题&#x…