[云原生之旅] K8s-Portforward的另类用法, 立省两个端口

前言

此方法适用于Pod不需要大量连接的情况:

  • 有多个pod在执行任务, 偶尔需要连接其中一个pod查看进度/日志;
  • 对pod执行一个脚本/命令;

不适用于大量连接建立的情况:

  • pod启的数据库服务;
  • pod启的Api服务;
  • pod启的前端服务;
  • pod启的Oss服务;

Portforward简介

Portforward就是端口转发, 可以将本地机器的端口转发到 Kubernetes 集群中的Pod中, 主要是调试和临时访问场景,尤其是当你想要在不暴露服务的情况下访问 Pod 中的应用时; 比如:

  • 数据库服务本地连接
  • Api服务请求调试

主要命令格式:

kubectl port-forward <resource>/<pod-name> <local-port>:<remote-port>

支持PodService多端口转发, 比如:

kubectl port-forward pod/my-pod 9090:8080
kubectl port-forward pod/my-pod 9090:8080 7070:7777
kubectl port-forward svc/my-svc 9090:8080
kubectl port-forward svc/my-svc 9090:8080 7070:7777

需求背景

我们后台管理了多个集群, 每个集群都有海量的Pod任务, 需要提供SSH服务供用户连接到Pod;

有两种实现方式:

  • 使用Exec(不支持虚拟机)
  • Podforward

本篇主要讲Podforward;

源码解析

Podforward的实现方式主要是通过对HTTP请求进行连接升级, 支持多路流; 然后在本地打开监听端口, 接收TCP请求并创建新的流进行交互; 下面贴一下主要的流程代码:

ForwardPorts

Podforward的入口函数, 打开对Pod的流式连接, 准备进行端口转发;

func (pf *PortForwarder) ForwardPorts() error {
    defer pf.Close()

    var err error
    var protocol string
    pf.streamConn, protocol, err = pf.dialer.Dial(PortForwardProtocolV1Name)
    if err != nil {
       return fmt.Errorf("error upgrading connection: %s", err)
    }
    defer pf.streamConn.Close()
    if protocol != PortForwardProtocolV1Name {
       return fmt.Errorf("unable to negotiate protocol: client supports %q, server returned %q", PortForwardProtocolV1Name, protocol)
    }

    return pf.forward()
}

forward

forward获取端口映射参数, 开始监听指定的本地端口;

func (pf *PortForwarder) forward() error {
    var err error

    listenSuccess := false
    for i := range pf.ports {
       port := &pf.ports[i]
       err = pf.listenOnPort(port)
       switch {
       case err == nil:
          listenSuccess = true
       default:
          if pf.errOut != nil {
             fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err)
          }
       }
    }
    
    ...

    return nil
}


func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) {
	listener, err := net.Listen(protocol, net.JoinHostPort(hostname, strconv.Itoa(int(port.Local))))
	if err != nil {
		return nil, fmt.Errorf("unable to create listener: Error %s", err)
	}
  
  ...
  
	return listener, nil
}


handleConnection

waitForConnection通过监听端口获取Tcp连接, 对每个连接开个go程进行处理;

handleConnection对每个Tcp连接创建新的Stream流, 进行Tcp连接和Stream流的交互;

func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
	for {
		select {
		case <-pf.streamConn.CloseChan():
			return
		default:
			conn, err := listener.Accept()
			if err != nil {
				// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
				if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
					runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
				}
				return
			}
			go pf.handleConnection(conn, port)
		}
	}
}

func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
	...

	// create data stream
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := pf.streamConn.CreateStream(headers)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}
	defer pf.streamConn.RemoveStreams(dataStream)

	localError := make(chan struct{})
	remoteDone := make(chan struct{})

	go func() {
		// Copy from the remote side to the local port.
		if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
		}

		// inform the select below that the remote copy is done
		close(remoteDone)
	}()

	go func() {
		// inform server we're not sending any more data after copy unblocks
		defer dataStream.Close()

		// Copy from the local port to the remote side.
		if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
			// break out of the select below without waiting for the other copy to finish
			close(localError)
		}
	}()

	...
}

总结

看代码得知原理, 数据链路为 userClient -> serverListen -> pod;

知道链路了, 就自然能得知它最适合的场景, 就是大量的持续的新建Tcp请求, 比如Api/Oss等服务, 但是对于我的需求场景: 偶尔一次的连接就不太合适了;

所以我们能不能跳过ServerListen这层中转, 直接让userClientPod进行交互呢? 答案是可以的;

解决方案

回归我们的需求本身: 我们有大量用户和大量的pod, 每个pod也只会有少量用户会访问, 所以没必要用serverListen中转, 直接用户连pod就可以了, 这样就省了ServerListen的两个端口!

代码也很简单, 只需要把 handleConnection的代码沾出来, 将用户的连接跟pod 的连接做交互就好了;

实现代码

简单贴一下实现代码, 自己在handle func(dataStream httpstream.Stream)中与net.conn做交互就可以了;


func createSPDYConnection(namespace, podName string, podPort int, handle func(dataStream httpstream.Stream)) error {
	req := clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Namespace(namespace).
		Name(podName).
		SubResource("portforward").
		Param("ports", fmt.Sprintf("%d", podPort))

	// 创建 SPDY Transport 和 Dialer
	transport, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		return fmt.Errorf("failed to create round tripper: %v", err)
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

	// 建立连接到 Pod 的端口
	streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
	if err != nil {
		return fmt.Errorf("failed to dial port forward: %v", err)
	}
	defer streamConn.Close()

	handleStreamConnection(streamConn, portforward.ForwardedPort{
		Local:  0,
		Remote: uint16(podPort),
	}, handle)

	return nil
}

// handleStreamConnection copies data between the local connection and the stream to
// the remote server.
func handleStreamConnection(streamConn httpstream.Connection, port portforward.ForwardedPort, handle func(dataStream httpstream.Stream)) {
	requestID := time.Now().UnixNano()

	// create error stream
	headers := http.Header{}
	headers.Set(v1.StreamType, v1.StreamTypeError)
	headers.Set(v1.PortHeader, fmt.Sprintf("%d", port.Remote))
	headers.Set(v1.PortForwardRequestIDHeader, strconv.FormatInt(requestID, 10))
	errorStream, err := streamConn.CreateStream(headers)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}
	// we're not writing to this stream
	errorStream.Close()

	go func() {
		message, err := io.ReadAll(errorStream)
		switch {
		case err != nil:
			log.Printf("error reading error stream: %v\n", err)
		case len(message) > 0:
			log.Printf("error reading error stream: %v\n", string(message))
		}
	}()

	// create data stream
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := streamConn.CreateStream(headers)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}

	handle(dataStream)
	_ = dataStream.Close()
	_ = streamConn.Close()
}

Kubelet

并且在k8s源码中也有相同的使用, 虽然是个test;

kubernetes/pkg/kubelet/server/server_test.go at master · kubernetes/kubernetes


func TestServePortForward(t *testing.T) {
	tests := map[string]struct {
		port          string
		uid           bool
		clientData    string
		containerData string
		shouldError   bool
	}{
		"no port":                       {port: "", shouldError: true},
		"none number port":              {port: "abc", shouldError: true},
		"negative port":                 {port: "-1", shouldError: true},
		"too large port":                {port: "65536", shouldError: true},
		"0 port":                        {port: "0", shouldError: true},
		"min port":                      {port: "1", shouldError: false},
		"normal port":                   {port: "8000", shouldError: false},
		"normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
		"max port":                      {port: "65535", shouldError: false},
		"normal port with uid":          {port: "8000", uid: true, shouldError: false},
	}

	podNamespace := "other"
	podName := "foo"

	for desc := range tests {
		test := tests[desc]
		t.Run(desc, func(t *testing.T) {
			ss, err := newTestStreamingServer(0)
			require.NoError(t, err)
			defer ss.testHTTPServer.Close()
			fw := newServerTestWithDebug(true, ss)
			defer fw.testHTTPServer.Close()

			portForwardFuncDone := make(chan struct{})

			fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
				assert.Equal(t, podName, name, "pod name")
				assert.Equal(t, podNamespace, namespace, "pod namespace")
				if test.uid {
					assert.Equal(t, testUID, string(uid), "uid")
				}
			}

			ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
				defer close(portForwardFuncDone)
				assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
				// The port should be valid if it reaches here.
				testPort, err := strconv.ParseInt(test.port, 10, 32)
				require.NoError(t, err, "parse port")
				assert.Equal(t, int32(testPort), port, "port")

				if test.clientData != "" {
					fromClient := make([]byte, 32)
					n, err := stream.Read(fromClient)
					assert.NoError(t, err, "reading client data")
					assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
				}

				if test.containerData != "" {
					_, err := stream.Write([]byte(test.containerData))
					assert.NoError(t, err, "writing container data")
				}

				return nil
			}

			var url string
			if test.uid {
				url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
			} else {
				url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
			}

			var (
				upgradeRoundTripper httpstream.UpgradeRoundTripper
				c                   *http.Client
			)

			upgradeRoundTripper, err = spdy.NewRoundTripper(&tls.Config{})
			if err != nil {
				t.Fatalf("Error creating SpdyRoundTripper: %v", err)
			}
			c = &http.Client{Transport: upgradeRoundTripper}

			req := makeReq(t, "POST", url, "portforward.k8s.io")
			resp, err := c.Do(req)
			require.NoError(t, err, "POSTing")
			defer resp.Body.Close()

			assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")

			conn, err := upgradeRoundTripper.NewConnection(resp)
			require.NoError(t, err, "creating streaming connection")
			defer conn.Close()

			headers := http.Header{}
			headers.Set("streamType", "error")
			headers.Set("port", test.port)
			_, err = conn.CreateStream(headers)
			assert.Equal(t, test.shouldError, err != nil, "expect error")

			if test.shouldError {
				return
			}

			headers.Set("streamType", "data")
			headers.Set("port", test.port)
			dataStream, err := conn.CreateStream(headers)
			require.NoError(t, err, "create stream")

			if test.clientData != "" {
				_, err := dataStream.Write([]byte(test.clientData))
				assert.NoError(t, err, "writing client data")
			}

			if test.containerData != "" {
				fromContainer := make([]byte, 32)
				n, err := dataStream.Read(fromContainer)
				assert.NoError(t, err, "reading container data")
				assert.Equal(t, test.containerData, string(fromContainer[0:n]), "container data")
			}

			<-portForwardFuncDone
		})
	}
}

搞个demo

最后再放一个最近做的东西, 是一个连接k8s``pod的SSH服务, 用户通过连接SSH服务, 转而连接到pod, 中间可以在SSH握手后进行一些特殊处理, 比如身份校验, 日志记录等;

package main

import (
	"context"
	"fmt"
	"golang.org/x/crypto/ssh"
	gossh "golang.org/x/crypto/ssh"
	"golang.org/x/sync/errgroup"
	"io"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/httpstream"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/portforward"
	"k8s.io/client-go/transport/spdy"
	"log"
	"net"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"
)

var (
	podName          = ""
	podNamespace     = ""
	localSSHPort     = ":2225"
	kubeConfigPath   = "/home/fly/.kube/config"
	config           *rest.Config
	clientset        *kubernetes.Clientset
	authorizedKey, _ = os.ReadFile("/home/fly/.ssh/id_rsa")
	privateKey, _    = gossh.ParsePrivateKey(authorizedKey)
	err              error
)

func init() {
	config, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		log.Fatalf("k8s config err: %v \n", err)
	}

	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("k8s client err: %v \n", err)
	}
}

func main() {
	listener, err := net.Listen("tcp", localSSHPort)
	if err != nil {
		log.Fatalf("unable to listen on port %s: %v", localSSHPort, err)
	}
	defer listener.Close()
	log.Printf("the proxy service is listening on the port %s", localSSHPort)

	for {
		clientConn, err := listener.Accept()
		if err != nil {
			log.Printf("failed to accept connection: %v", err)
			continue
		}
		go handleConnection(clientConn)
	}
}

type NetHandle struct {
	ctx        context.Context
	sshConn    *ssh.ServerConn
	chans      <-chan ssh.NewChannel
	reqs       <-chan *ssh.Request
	dataStream httpstream.Stream
}

func handleConnection(conn net.Conn) {
	ctx, cancel := context.WithTimeout(context.Background(), 7*time.Hour)
	defer cancel()

	// 创建一个新的 SSH 服务
	serverConfig := &ssh.ServerConfig{
		NoClientAuth: true,
	}
	serverConfig.AddHostKey(privateKey)

	// 接收客户端连接的 SSH 握手
	sshConn, chans, reqs, err := ssh.NewServerConn(conn, serverConfig)
	if err != nil {
		log.Printf("failed to receive ssh connection: %v", err)
		conn.Close()
		return
	}
	defer sshConn.Close()

	username := sshConn.User()
	log.Printf("ssh connection to users: %s", username)

	h := &NetHandle{
		ctx:        ctx,
		sshConn:    sshConn,
		chans:      chans,
		reqs:       reqs,
		dataStream: nil,
	}

	handle := func(dataStream httpstream.Stream) {
		clientConf := &ssh.ClientConfig{
			User:            "ubuntu",
			Auth:            []ssh.AuthMethod{ssh.PublicKeys(privateKey)},
			Timeout:         5 * time.Second,
			HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		}
		streamConn := NewStreamConn(dataStream)
		log.Println("Encapsulate stream as net.conn, start forwarding")
		clientConn, clientChans, clientReqs, err := ssh.NewClientConn(streamConn, "vm:22", clientConf)
		if err != nil {
			log.Printf("new ssh client err: %v\n", err)
			return
		}
		defer clientConn.Close()
		go forwardConnReqs(h.sshConn, clientReqs)
		go forwardConnReqs(clientConn, h.reqs)
		go forwardChans(h.ctx, h.sshConn, clientChans)
		go forwardChans(h.ctx, clientConn, h.chans)

		waitCtx, waitCancel := context.WithCancel(h.ctx)
		go func() {
			_ = h.sshConn.Wait()
			waitCancel()
		}()
		go func() {
			_ = clientConn.Wait()
			waitCancel()
		}()
		<-waitCtx.Done()
	}

	createSPDYConnection(podNamespace, podName, 22, handle)

}

type ChannelOpener interface {
	OpenChannel(name string, data []byte) (ssh.Channel, <-chan *ssh.Request, error)
}

func forwardChans(ctx context.Context, dst ChannelOpener, chans <-chan ssh.NewChannel) {
	for newChan := range chans {
		go forwardChan(ctx, dst, newChan)
	}
}

func forwardChan(ctx context.Context, dst ChannelOpener, newChan ssh.NewChannel) {
	dstChan, dstReqs, err := dst.OpenChannel(newChan.ChannelType(), newChan.ExtraData())
	if err != nil {
		_ = newChan.Reject(ssh.Prohibited, err.Error())
		return
	}
	defer dstChan.Close()

	srcChan, srcReqs, err := newChan.Accept()
	if err != nil {
		return
	}
	defer srcChan.Close()

	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		return copyWithReqs(ctx, srcChan, dstChan, dstReqs, "out")
	})
	g.Go(func() error {
		return copyWithReqs(ctx, dstChan, srcChan, srcReqs, "in")
	})
	g.Wait()
}

func copyWithReqs(ctx context.Context, dst, src ssh.Channel, srcReqs <-chan *ssh.Request, _ string) error {
	// According to https://github.com/golang/go/issues/29733
	// Before we close the channel, We have to wait until exit- prefixed request forwarded.
	// forwardChannelReqs should notify when it after forward exit- prefixed request.
	// io.Copy may encounter error and exit early (do not consume the channel), so we have to leave a slot in it.
	exitRequestForwarded := make(chan struct{}, 1)
	g, ctx := errgroup.WithContext(ctx)
	go func() { <-ctx.Done(); dst.Close() }()
	g.Go(func() error { return forwardChannelReqs(ctx, dst, srcReqs, exitRequestForwarded) })
	g.Go(func() error {
		_, err := io.Copy(dst.Stderr(), src.Stderr())
		return err
	})
	g.Go(func() error {
		// TODO if need audit. we need copy bytes to audit writer
		_, err := io.Copy(dst, src)
		switch err {
		case nil:
			// When receiving EOF (which means io.Copy returns nil), wait exit- prefixed request forwarded before we close channel.
			// For more detail, see https://github.com/golang/go/issues/29733
			t := time.NewTimer(time.Second)
			defer t.Stop()
			select {
			case <-t.C:
				// We can't wait forever, exit anyway.
			case <-exitRequestForwarded:
				// Already forwarded
			}
		default:
			// Encounter error, Don't need to wait anything, Close immediately.
		}

		dst.CloseWrite()
		return err
	})
	return g.Wait()
}

func forwardConnReqs(dst ssh.Conn, src <-chan *ssh.Request) {
	for r := range src {
		ok, data, err := dst.SendRequest(r.Type, r.WantReply, r.Payload)
		if err != nil {
			return
		}
		if r.WantReply {
			if err := r.Reply(ok, data); err != nil {
				return
			}
		}
	}
	return
}

func forwardChannelReqs(_ context.Context, dst ssh.Channel, src <-chan *ssh.Request, exitRequestForwarded chan<- struct{}) error {
	var isExitReq bool
	defer func() {
		if isExitReq {
			// According to https://github.com/golang/go/issues/29733
			// Send a signal when exit- prefix request already forwarded.
			// Send signal in non-blocking manner to prevent unexpected blocking.
			select {
			case exitRequestForwarded <- struct{}{}:
			default:
			}
		}
	}()

	for r := range src {
		if strings.HasPrefix(r.Type, "exit-") {
			isExitReq = true
		}
		ok, err := dst.SendRequest(r.Type, r.WantReply, r.Payload)
		if err != nil {
			return err
		}
		if r.WantReply {
			err := r.Reply(ok, nil)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

func createSPDYConnection(namespace, podName string, podPort int, handle func(dataStream httpstream.Stream)) error {
	req := clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Namespace(namespace).
		Name(podName).
		SubResource("portforward").
		Param("ports", fmt.Sprintf("%d", podPort))

	// 创建 SPDY Transport 和 Dialer
	transport, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		return fmt.Errorf("failed to create round tripper: %v", err)
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

	// 建立连接到 Pod 的端口
	streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
	if err != nil {
		return fmt.Errorf("failed to dial port forward: %v", err)
	}
	defer streamConn.Close()

	handleStreamConnection(streamConn, portforward.ForwardedPort{
		Local:  0,
		Remote: uint16(podPort),
	}, handle)

	return nil
}

// handleStreamConnection copies data between the local connection and the stream to
// the remote server.
func handleStreamConnection(streamConn httpstream.Connection, port portforward.ForwardedPort, handle func(dataStream httpstream.Stream)) {
	requestID := time.Now().UnixNano()

	// create error stream
	headers := http.Header{}
	headers.Set(v1.StreamType, v1.StreamTypeError)
	headers.Set(v1.PortHeader, fmt.Sprintf("%d", port.Remote))
	headers.Set(v1.PortForwardRequestIDHeader, strconv.FormatInt(requestID, 10))
	errorStream, err := streamConn.CreateStream(headers)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}
	// we're not writing to this stream
	errorStream.Close()

	go func() {
		message, err := io.ReadAll(errorStream)
		switch {
		case err != nil:
			log.Printf("error reading error stream: %v\n", err)
		case len(message) > 0:
			log.Printf("error reading error stream: %v\n", string(message))
		}
	}()

	// create data stream
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := streamConn.CreateStream(headers)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}

	handle(dataStream)
	_ = dataStream.Close()
	_ = streamConn.Close()
}

// streamNetConn 是封装 httpstream.Stream 实现 net.Conn 接口
type streamNetConn struct {
	stream httpstream.Stream
}

// Read 实现 net.Conn 接口的 Read 方法
func (c *streamNetConn) Read(b []byte) (n int, err error) {
	// 从 httpstream.Stream 中读取数据
	return c.stream.Read(b)
}

// Write 实现 net.Conn 接口的 Write 方法
func (c *streamNetConn) Write(b []byte) (n int, err error) {
	// 将数据写入 httpstream.Stream
	return c.stream.Write(b)
}

// Close 实现 net.Conn 接口的 Close 方法
func (c *streamNetConn) Close() error {
	// 关闭 httpstream.Stream
	return c.stream.Close()
}

// LocalAddr 实现 net.Conn 接口的 LocalAddr 方法
func (c *streamNetConn) LocalAddr() net.Addr {
	// 可以返回一个 nil 或者实现一个自定义的 LocalAddr
	return nil
}

// RemoteAddr 实现 net.Conn 接口的 RemoteAddr 方法
func (c *streamNetConn) RemoteAddr() net.Addr {
	// 可以返回一个 nil 或者实现一个自定义的 RemoteAddr
	return nil
}

// SetDeadline 实现 net.Conn 接口的 SetDeadline 方法
func (c *streamNetConn) SetDeadline(t time.Time) error {
	// 如果需要设置超时,可以在这里实现
	return nil
}

// SetReadDeadline 实现 net.Conn 接口的 SetReadDeadline 方法
func (c *streamNetConn) SetReadDeadline(t time.Time) error {
	// 如果需要设置读取超时,可以在这里实现
	return nil
}

// SetWriteDeadline 实现 net.Conn 接口的 SetWriteDeadline 方法
func (c *streamNetConn) SetWriteDeadline(t time.Time) error {
	// 如果需要设置写入超时,可以在这里实现
	return nil
}

func NewStreamConn(stream httpstream.Stream) *streamNetConn {
	return &streamNetConn{
		stream: stream,
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951056.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Transformer 中缩放点积注意力机制探讨:除以根号 dk 理由及其影响

Transformer 中缩放点积注意力机制的探讨 1. 引言 自2017年Transformer模型被提出以来&#xff0c;它迅速成为自然语言处理&#xff08;NLP&#xff09;领域的主流架构&#xff0c;并在各种任务中取得了卓越的表现。其核心组件之一是注意力机制&#xff0c;尤其是缩放点积注意…

Qt监控系统远程网络登录/请求设备列表/服务器查看实时流/回放视频/验证码请求

一、前言说明 这几个功能是近期定制的功能&#xff0c;也非常具有代表性&#xff0c;核心就是之前登录和设备信息都是在本地&#xff0c;存放在数据库中&#xff0c;数据库可以是本地或者远程的&#xff0c;现在需要改成通过网络API请求的方式&#xff0c;现在很多的服务器很强…

IDEA配置maven和git并如何使用maven打包和git推送到gitlab

首先找到设置 在里面输入maven然后找到点击 然后点击右边两个选项 路径选择下载的maven目录下的settings文件和新建的repository文件夹 点击apply应用 然后在搜索框里搜git点击进去 此路径为git的exe执行文件所在目录&#xff0c;选好之后点击test测试下方出现git版本号表…

迎接2025Power BI日期表创建指南:模板与最佳实践

故事背景 最近&#xff0c;我们收到了一些关于时间表更新的询问。询问的朋友发现&#xff0c;随着2025年的到来&#xff0c;2024年的日期表已不再适用。这是一个在数据分析领域常见的问题&#xff0c;每年都需要对日期表进行更新。 解决方案 鉴于创建和更新日期表是一项年度…

案例研究:UML用例图中的结账系统

在软件工程和系统分析中&#xff0c;统一建模语言&#xff08;UML&#xff09;用例图是一种强有力的工具&#xff0c;用于描述系统与其用户之间的交互。本文将通过一个具体的案例研究&#xff0c;详细解释UML用例图的关键概念&#xff0c;并说明其在设计结账系统中的应用。 用…

国产3D CAD将逐步取代国外软件

在工业软件的关键领域&#xff0c;计算机辅助设计&#xff08;CAD&#xff09;软件对于制造业的重要性不言而喻。近年来&#xff0c;国产 CAD 的发展态势迅猛&#xff0c;展现出巨大的潜力与机遇&#xff0c;正逐步改变着 CAD 市场长期由国外软件主导的格局。 国产CAD发展现状 …

Oopsie【hack the box】

Oopsie 解题流程 文件上传 首先开启机器后&#xff0c;我们先使用 nmap -sC -SV来扫描一下IP地址&#xff1a; -sC&#xff1a;使用 Nmap 的默认脚本扫描&#xff08;通常是 NSE 脚本&#xff0c;Nmap Scripting Engine&#xff09;。这个选项会自动执行一系列常见的脚本&am…

金山WPS Android面试题及参考答案

说说你所知道的所有集合&#xff1f;并阐述其内部实现。 在 Android 开发&#xff08;Java 语言基础上&#xff09;中有多种集合。 首先是 List 集合&#xff0c;主要包括 ArrayList 和 LinkedList。 ArrayList 是基于数组实现的动态数组。它的内部有一个数组来存储元素&#x…

快速导入请求到postman

1.确定请求&#xff0c;右键复制为cURL(bash) 2.postman菜单栏Import-Raw text&#xff0c;粘贴复制的内容保存&#xff0c;请求添加成功

Mybatis原理简介

看到Mybatis的框架图&#xff0c;可以清晰的看到Mybatis的整体核心对象&#xff0c;我更喜欢用自己的图来表达Mybatis的整个的执行流程。如下图所示&#xff1a; 原理详解&#xff1a; MyBatis应用程序根据XML配置文件创建SqlSessionFactory&#xff0c;SqlSessionFactory在根…

【redis初阶】初识Redis

目录 一、初识Redis 二、盛赞 Redis 三、Redis 特性 3.1 速度快 ​编辑3.2 基于键值对的数据结构服务器 3.3 丰富的功能 3.4 简单稳定 &#x1f436; 3.6 持久化&#xff08;Persistence&#xff09; 3.7 主从复制&#xff08;Replication&#xff09; 3.8 高可用&#xff08;H…

【云商城】高性能门户网构建

第3章 高性能门户网构建 网站门户就是首页 1.OpenResty 百万并发站点架构 ​ 1).OpenResty 特性介绍 ​ 2).搭建OpenResty ​ 3).Web站点动静分离方案剖析 2.Lua语法学习 ​ 1).Lua基本语法 3.多级缓存架构实战 ​ 1).多级缓存架构分析 用户请求网站&#xff0c;最开始…

【GESP】C++三级考试大纲知识点梳理, (1)二进制数据编码

GESP C三级官方考试大纲中&#xff0c;共有8条考点&#xff0c;本文针对C&#xff08;1&#xff09;号知识点进行总结梳理。 &#xff08;1&#xff09;了解二进制数据编码:原码、反码、补码。 全文详见&#xff1a;https://www.coderli.com/gesp-3-exam-syllabus-data-encodin…

B+树的原理及实现

文章目录 B树的原理及实现一、引言二、B树的特性1、结构特点2、节点类型3、阶数 三、B树的Java实现1、节点实现2、B树操作2.1、搜索2.2、插入2.3、删除2.4、遍历 3、B树的Java实现示例 四、总结 B树的原理及实现 一、引言 B树是一种基于B树的树形数据结构&#xff0c;它在数据…

毕业项目推荐:基于yolov8/yolov5/yolo11的动物检测识别系统(python+卷积神经网络)

文章目录 概要一、整体资源介绍技术要点功能展示&#xff1a;功能1 支持单张图片识别功能2 支持遍历文件夹识别功能3 支持识别视频文件功能4 支持摄像头识别功能5 支持结果文件导出&#xff08;xls格式&#xff09;功能6 支持切换检测到的目标查看 二、数据集三、算法介绍1. YO…

用Kimi做研究:准实验设计的智能解决方案

目录 1.研究策略设计 2.过程框架设计 3.背景变量 4.细节设计 准实验设计是一种介于实验与观察研究之间的研究方法&#xff0c;准实验设计是在无法完全控制实验条件的情况下进行因果关系的探索。与传统实验设计相比&#xff0c;准实验设计不具备随机分配实验对象到各处理组的…

【前端】【HTML】入门基础知识

参考视频&#xff1a;【狂神说Java】HTML5完整教学通俗易懂_哔哩哔哩_bilibili 一、基本结构 二、基本标签 <h1>&#xff1a;一级标题&#xff0c;通常用于页面的主标题&#xff0c;字体较大且醒目。 <h2>&#xff1a;二级标题&#xff0c;用于副标题或主要章节标…

DVT:消除视觉变换器中的噪声伪影

人工智能咨询培训老师叶梓 转载标明出处 近年来&#xff0c;视觉变换器&#xff08;Vision Transformers&#xff0c;简称ViTs&#xff09;在多种视觉任务中取得了卓越的性能&#xff0c;成为现代视觉基础模型的主流架构之一。然而&#xff0c;这些模型在特征图中存在一种网格…

OpenCV的双边滤波函数

OpenCV的双边滤波函数cv2.bilateralFilter是一种用于图像处理的强大工具&#xff0c;它能够在去除噪声的同时保持边缘的清晰度。以下是对该函数的详细说明&#xff1a; 一、函数原型 python cv2.bilateralFilter(src, d, sigmaColor, sigmaSpace[, dst[, borderType]])二、参…

项目实战——使用python脚本完成指定OTA或者其他功能的自动化断电上电测试

前言 在嵌入式设备的OTA场景测试和其他断电上电测试过程中&#xff0c;有的场景发生在夜晚或者随时可能发生&#xff0c;这个时候不可能24h人工盯着&#xff0c;需要自动化抓取串口日志处罚断电上电操作。 下面的python脚本可以实现自动抓取串口指定关键词&#xff0c;然后触发…