【十三】Golang 通道

💢欢迎来到张胤尘的开源技术站
💥开源如江河,汇聚众志成。代码似星辰,照亮行征程。开源精神长,传承永不忘。携手共前行,未来更辉煌💥

文章目录

  • 通道
    • 通道声明
    • 初始化
    • 缓冲机制
      • 无缓冲通道
        • 代码示例
      • 带缓冲通道
        • 代码示例
    • 通道操作
      • 发送数据
      • 接收数据
      • 单向通道
        • 单向通道的用途
        • 代码示例
      • 多路复用
        • 代码示例
        • 通道复用器的实现
      • 超时机制
        • `time.After`
        • `context`
      • 关闭通道
      • 检查通道是否关闭
      • 获取通道长度
      • 获取通道容量
    • 源码解析
      • 通道结构体
      • 创建通道
        • 函数原型
        • 函数内容
      • 发送数据
        • 函数原型
        • 函数内容
      • 接收数据
        • 函数原型
        • 函数内容
      • 关闭通道
        • 函数原型
        • 函数内容

通道

在传统的并发编程中,多个线程或进程之间通常通过共享内存来通信。这种模型虽然高效,但容易引发 竞争条件死锁 等问题。为了避免这些问题,程序员在开发时需要使用复杂的同步机制(例如:锁、信号量等)来保护共享数据。但是 golang 采用了不同的思路:避免共享内存,通过通信来实现并发

示意图所下所示:

在这里插入图片描述

通道就是 golang 中实现 Goroutine 之间通信的机制。它是一种 类型化的通道,允许多个 Goroutine 之间安全地传递数据。通道是 Golang 并发模型的核心,它解决了传统并发编程中共享内存带来的复杂性和风险。

本篇文章主要介绍的是通道,有关于协程、并发编程等相关知识点请关注后续文章《Golang 协程》、《Golang 并发编程》。

通道声明

使用 var 关键字声明通道变量。如下所示:

var ch chan int

声明后,ch 是一个未初始化的通道,其默认值为 nil

初始化

使用 make 函数初始化通道,如下所示:

ch = make(chan int)

也可以使用 make 函数创建一个带缓冲区的通道,如下所示:

ch = make(chan int, 10)

缓冲机制

通道分为 无缓冲通道带缓冲通道,它们在行为和使用场景上有一些关键区别。

无缓冲通道

在初始化的小结中,使用 make 函数进行初始化通道,当没有指定通道的容量,或者说通道的容量大小为 0 时,称为无缓冲通道。

无缓冲通道在发送数据和接收数据时存在如下的特点:

  • 发送操作:发送数据时,发送方会阻塞,直到有接收方准备接收数据。
  • 接收操作:接收方会阻塞,直到有发送方发送数据。
  • 无缓冲通道的发送和接收操作是同步的,必须有发送方和接收方同时准备好,才能完成通信。
代码示例
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	unbufferedChan := make(chan int) // 创建无缓冲通道
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("receiver is waiting...") // receiver is waiting...
		time.Sleep(time.Second * 3)	// 模拟3秒的准备时间
		data := <-unbufferedChan // 在这行函数执行之前,发送方处于阻塞状态
		fmt.Println("received:", data)
	}()

	unbufferedChan <- 42 	// 发送方阻塞,直到接收方准备好
    
    close(unbufferedChan)	// 关闭通道
	
    wg.Wait()
}

带缓冲通道

当指定了通道的容量,例如 make(chan int, 10),则称为带缓冲通道。

带缓冲通道在发送数据和接收数据时存在如下的特点:

  • 发送操作:发送数据时,如果缓冲区未满,数据会被放入缓冲区,发送方不会阻塞;如果缓冲区已满,发送方会阻塞,直到缓冲区有空间。
  • 接收操作:接收方从缓冲区中取出数据,如果缓冲区为空,接收方会阻塞,直到有数据可用。
  • 带缓冲通道的发送和接收操作是异步的,发送方和接收方不需要同时准备好。缓冲区的存在允许数据在发送方和接收方之间暂时存储。
代码示例
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	bufferedChan := make(chan int, 2) // 创建带缓冲通道,容量大小为2
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("receiver is waiting...")
		// for {
		// 	select {
		// 	case data, ok := <-bufferedChan:
		// 		if ok {
		// 			fmt.Println("received:", data)
		// 		} else {
		// 			fmt.Println("bufferedChan closed")
		// 			return
		// 		}
		// 	}
		// }
		for {
			time.Sleep(time.Second * 5) // 模拟接收方不从缓冲通道中接收数据
		}
	}()

	bufferedChan <- 42
	bufferedChan <- 43
	bufferedChan <- 44 // 如果接收方一直未接收数据,则在此处会发送阻塞
	fmt.Println("send data over ...")

	close(bufferedChan) // 关闭通道

	wg.Wait()
}

通道操作

发送数据

使用 <- 操作符将数据发送到通道,如下所示:

package main

func main() {
	ch := make(chan int)
	ch <- 10 // 向通道中发送一个 10
    
    close(ch) // 关闭通道
}

接收数据

使用 <- 操作符从通道中读取数据,如下所示:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 10 // 向通道中发送一个 10

	data := <-ch      // 通道中读取一个数据
	fmt.Println(data) // 10

	close(ch) // 关闭通道
}

单向通道

golang 中提供了单向通道这么一种特殊的通道类型,它只能用于发送或接收数据,而不能同时进行发送和接收操作。单向通道在类型声明上与普通通道(双向通道)有所不同,主要用于限制通道的使用方式,从而提高代码的可读性和安全性。

单向通道有如下两种类型:

  • 只发送通道:用于发送数据,但不能接收数据。
chan<- Type
  • 只接收通道:用于接收数据,但不能发送数据。
<-chan Type
单向通道的用途

单向通道的主要用途是限制通道的使用范围,避免在函数或方法中滥用通道的发送和接收功能。例如:

  • 当一个函数只需要从通道中读取数据时,使用只接收通道可以明确表示该函数的意图。
  • 当一个函数只需要向通道中写入数据时,使用只发送通道可以避免意外读取通道中的数据。
代码示例
package main

import (
	"fmt"
	"sync"
)

func producer(ch chan<- int, wg *sync.WaitGroup) { // ch 参数:一个只发送通道
	defer wg.Done()
	for i := 0; i < 5; i++ {
		ch <- i // 向通道发送数据
		fmt.Printf("sent: %d\n", i)
	}

	close(ch) // 关闭通道
}

func consumer(ch <-chan int, wg *sync.WaitGroup) { // ch 参数:一个只接收通道
	defer wg.Done()
	for {
		select {
		case data, ok := <-ch:	// 读取通道数据
			if ok {
				fmt.Printf("received: %d\n", data)
			} else {
				fmt.Println("chan closed ...")
				return
			}
		}
	}
}

func main() {
	var wg sync.WaitGroup
	ch := make(chan int) // 创建一个双向通道
	wg.Add(2)

	// received: 0
	// sent: 0
	// sent: 1
	// received: 1
	// received: 2
	// sent: 2
	// sent: 3
	// received: 3
	// received: 4
	// sent: 4
	// chan closed ...

	go consumer(ch, &wg) // 接收者
	go producer(ch, &wg) // 发送者

	wg.Wait()
}

多路复用

通道的多路复用机制是一种用于处理多个通道操作的技术,它允许程序同时等待多个通道的读写操作。这种机制的核心是 select...case 语句,它提供了对多个通道操作的并发处理能力。

select {
case <-ch1:
    // 处理 ch1 的读操作
case data := <-ch2:
    // 处理 ch2 的读操作,并将读取到的数据赋值给 data
case ch3 <- value:
    // 向 ch3 中发送值
default:
    // 如果没有通道准备好,则执行默认逻辑
}

另外需要强调的是,如果多个通道同时准备好,select...case 会随机选择一个通道执行操作;如果没有任何一个通道准备好,则 select 会陷入阻塞,除非有 default 分支执行。

需要注意的是,上面提到的 通道准备好 是个口语,对于接收操作来说需要满足以下两个条件中的任意一个:

  • 通道中有数据可读:如果通道的缓冲区中有数据,或者有发送操作正在等待发送数据到通道中,那么接收操作就准备好。
  • 通道已关闭:即使通道中没有数据,如果通道已经被关闭,接收操作也会立即返回零值,并且 okfalse(如果使用了 data, ok := <-ch 的形式)。

对发送操作来说也需要满足以下两个条件中的任意一个:

  • 通道中有空间可写:如果通道是无缓冲的,并且有等待接收的协程,或者通道是缓冲的且缓冲区中有空闲位置,那么发送操作就准备好。
  • 通道已关闭:如果通道已经被关闭,发送操作会触发 panic("send on closed channel")
代码示例

通过 select 同时处理多个通道,如下所示:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string) // 创建通道 ch1
	ch2 := make(chan string) // 创建通道 ch2

	go func() { // 每间隔 1 秒,向通道 ch1 中发送数据,一共发送 10 条数据
		defer close(ch1)
		for i := 1; i <= 10; i++ {
			ch1 <- "message from ch1"
			time.Sleep(1 * time.Second)
		}
	}()

	go func() { // 每间隔 2 秒,向通道 ch2 中发送数据,一共发送 10 条数据
		defer close(ch2)
		for i := 1; i <= 10; i++ {
			ch2 <- "message from ch2"
			time.Sleep(2 * time.Second)
		}
	}()

	// no data reception ...
	// recived:  message from ch2
	// recived:  message from ch1
	// no data reception ...
	// no data reception ...
	// recived:  message from ch1
	// no data reception ...
	// recived:  message from ch2
	// recived:  message from ch1
	// no data reception ...
	// recived:  message from ch1
	// no data reception ...
	// no data reception ...
	// recived:  message from ch1
	// recived:  message from ch2
	// no data reception ...
	// recived:  message from ch1
	// no data reception ...
	// recived:  message from ch1
	// recived:  message from ch2
	// no data reception ...
	// recived:  message from ch1
	// no data reception ...
	// no data reception ...
	// recived:  message from ch1
	// recived:  message from ch2
	// no data reception ...
	// recived:  message from ch1
	// no data reception ...
	// recived:  message from ch2
	// ch1 chan closed ...
	// recived:  message from ch2
	// recived:  message from ch2
	// recived:  message from ch2
	// recived:  message from ch2
	// ch2 chan closed ...
	// both channels closed. exiting...

	ch1_s, ch2_s := true, true

	for ch1_s || ch2_s {
		select {
		case data, ok := <-ch1:
			if ok {
				fmt.Println("recived: ", data)
			} else if ch1_s {
				fmt.Println("ch1 chan closed ...")
				ch1_s = false
			}
		case data, ok := <-ch2:
			if ok {
				fmt.Println("recived: ", data)
			} else if ch2_s {
				fmt.Println("ch2 chan closed ...")
				ch2_s = false
			}
		default:
			fmt.Println("no data reception ...")
			time.Sleep(time.Second)
		}
	}

	fmt.Println("both channels closed. exiting...")
}

通道复用器的实现

在某些复杂场景中,可能需要手动实现通道复用器。例如,可以将多个通道的输出合并到一个通道中,从而简化后续的处理逻辑。

下面是一个简单的通道复用器实现思路:

package main

import (
	"fmt"
	"sync"
	"time"
)

func send(ch chan<- int) { // 模拟数据发送,每秒发送一个数字
	defer close(ch)
	for i := 1; i <= 10; i++ {
		ch <- i
		time.Sleep(time.Second)
	}
}

func multiplexer(channels ...<-chan int) <-chan int { // 合并多个通道数据,函数返回最终的一个只读通道
	out := make(chan int)
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		for _, ch := range channels {
			wg.Add(1)
			go func(ch <-chan int) {
				defer wg.Done()
				for {
					select {
					case data, ok := <-ch:
						if ok {
							out <- data // 数据发送到最终的只读通道
						} else {
							return
						}
					}
				}
			}(ch)
		}
		wg.Wait()
	}()

	return out
}

func main() {

	ch1 := make(chan int) // 创建通道 ch1
	ch2 := make(chan int) // 创建通道 ch2
	ch3 := make(chan int) // 创建通道 ch3

	go send(ch1)
	go send(ch2)
	go send(ch3)

	ret_ch := multiplexer(ch1, ch2, ch3) // 接收返回的通道

	for data := range ret_ch { // 从只读通道中获取合并后的数据
		fmt.Printf("ret received: %d\n", data)
	}

	fmt.Println("ret chan closed ...")
}

超时机制

golang 中,通道的超时机制可以通过 time.Aftercontext 两种方式来实现。

time.After

time.After 是一个返回通道的函数,会在指定的超时时间后向通道发送一个时间值。结合 select...case 语句,可以实现超时逻辑。如下所示:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string) // 创建通道

	go func() {
		time.Sleep(3 * time.Second) // 模拟耗时操作
		ch <- "任务完成"
	}()

	select {
	case res := <-ch:
		fmt.Println(res)
	case <-time.After(2 * time.Second): // 设置超时为2秒
		fmt.Println("超时退出") // 最终打印 超时退出
	}
}

time.After 是一种非常简洁且易于理解的超时机制,特别适合简单的超时场景。但是需要注意的是,time.After 会在后台启动一个定时器,即使 select 提前退出,定时器也不会立刻回收,可能导致轻微的资源泄漏

context

contextgolang 中用于传递可取消信号、超时时间等的工具。通过 context.WithTimeout 创建一个带有超时功能的上下文,其 Done() 方法返回一个通道,用于超时控制,如下所示:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ch := make(chan string) // 创建通道
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel() // cancel() 会释放与上下文相关的资源,避免内存泄漏

	go func() {
		time.Sleep(3 * time.Second) // 模拟耗时操作
		ch <- "任务完成"
	}()

	select {
	case res := <-ch:
		fmt.Println(res)
	case <-ctx.Done(): // 监听超时信号
		fmt.Println("超时退出", ctx.Err())
	}
}

在上面的这个代码中:

  • context.WithTimeout 创建了一个带有超时的上下文,并设置超时时间为 2 秒。
  • defer cancel() 确保在函数返回时调用 cancel(),释放资源。
  • 如果任务在超时前完成,cancel() 会被调用,终止所有监听 ctx.Done() 的协程。

总的来说,context 更适用于复杂的并发场景,例如多个任务的超时控制、任务取消等;time.After 更适用于简单的超时控制,例如单个任务的超时。

关闭通道

使用 close 函数关闭通道,如下所示:

package main

func main() {
	ch := make(chan int, 5)
	close(ch) // 关闭通道
}

需要注意的是,当关闭通道后,不能再向通道发送数据,但可以继续从通道中接收数据。

检查通道是否关闭

接收数据时会有两个返回值,一个是数据另一个是布尔值,用于判断通道是否关闭,如下所示:

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int, 5)
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		for {
			select {
			case data, ok := <-ch: // data 表示数据,ok 表示通道是否关闭
				if ok {
					// received: 11
					// received: 10
					// received: 15
					fmt.Println("received:", data)
				} else {
					fmt.Println("ch closed") // ch closed
					return
				}
			}
		}
	}()

	ch <- 11
	ch <- 10
	ch <- 15

	close(ch) // 关闭通道
	wg.Wait()
}

获取通道长度

使用 len 函数获取通道的当前长度,如下所示:

package main

import "fmt"

func main() {
	ch := make(chan int, 5)
	fmt.Println(len(ch)) // 0

	ch <- 10
	fmt.Println(len(ch)) // 1

	close(ch) // 关闭通道
}

获取通道容量

使用 cap 函数获取通道的当前容量,如下所示:

package main

import "fmt"

func main() {
	ch := make(chan int, 5)
	fmt.Println(cap(ch)) // 5

	ch1 := make(chan int)
	fmt.Println(cap(ch1)) // 0

	close(ch)  // 关闭通道 ch
	close(ch1) // 关闭通道 ch1
}

源码解析

针对通道的源代码进行解析,从以下几个方面:

  • 创建通道
  • 发送数据
  • 接收数据
  • 关闭通道

通道结构体

源码位置:src/runtime/chan.go

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
  • qcount:当前通道中存储的数据数量,对于无缓冲通道,qcount 的值通常为 0 或 1。
  • dataqsiz:对于缓冲通道,dataqsiz 表示缓冲区可以存储的最大元素数量。对于无缓冲通道,dataqsiz 为 0。
  • buf:指向缓冲区的指针。缓冲区是一个数组,用于存储通道中的数据。仅对缓冲通道有效。无缓冲通道的 bufnil
  • elemsize:通道中每个元素的大小(以字节为单位)。
  • closed:标记通道是否已关闭。关闭的通道不能再次发送数据,但可以继续接收数据直到缓冲区为空。
  • timer:指向一个定时器,该定时器与通道相关联(例如,用于超时操作)。
  • elemtype:指向通道中元素的类型信息,用于在运行时检查通道中存储的数据类型是否正确。
  • sendx:用于管理缓冲区的环形队列,记录下一次发送操作在缓冲区中的索引。
  • recvx:用于管理缓冲区的环形队列,记录下一次接收操作在缓冲区中的索引。
  • recvq:存储等待接收的协程队列,在发送操作中,如果缓冲区已满且没有等待接收的协程,则发送协程会被加入到 sendq
  • sendq:存储等待发送的协程队列,在接收操作中,如果缓冲区为空且没有等待发送的协程,则接收协程会被加入到 recvq
  • lock:保护 hchan 结构体中所有字段的互斥锁,确保通道操作的线程安全性。在发送、接收和关闭操作中,lock 用于防止多个协程同时修改通道的状态。

创建通道

golang 的运行时中,创建通道的代码会被编译为对 makechan 的调用。如下所示:

package main

func main() {
	ch := make(chan int, 1)
}

编译成汇编代码如下所示:

0x001a 00026        CALL    runtime.makechan(SB)

以上汇编代码只是部分截取,请注意甄别。

makechan 函数是运行时中用于创建通道的核心函数。它初始化了一个 hchan 结构体,并根据指定的类型和缓冲区大小分配内存。

源码位置:src/runtime/chan.go

函数原型
func makechan(t *chantype, size int) *hchan {
    // ...
}
  • t *chantype:通道的类型信息,包含通道中元素的类型。
  • size int:通道的缓冲区大小。如果为 0,则创建无缓冲通道;如果大于 0,则创建有缓冲通道。
  • 返回一个初始化后的 hchan 结构体指针。
函数内容
func makechan(t *chantype, size int) *hchan {
	elem := t.Elem	// 从通道类型 t 中获取通道中元素的类型信息

	// compiler checks this but be safe.
    // 检查通道中元素的大小是否超过 64KB,如果超过,抛出异常
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
    
    // 确保 hchan 的大小是最大对齐单位的倍数,并且元素的对齐要求不超过最大对齐单位
    // maxAlign = 8
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

    // 计算缓冲区的总大小:elem.Size_ * size
	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
    // 检查是否发生溢出,或者缓冲区大小超过最大分配限制,或者缓冲区大小小于0,抛出异常
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    // 内存分配
	var c *hchan
	switch {
	case mem == 0:
        // 如果缓冲区大小为 0(无缓冲通道)或元素大小为 0,仅分配 hchan 的内存
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
        // 初始化通道的缓冲区的指针
        // buf 指向 hchan 结构体本身,表示没有独立的缓冲区
		c.buf = c.raceaddr()
	case !elem.Pointers():
        // 如果元素类型不包含指针(如基本类型或数组),将 hchan 和缓冲区分配在同一块内存中
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // 初始化通道的缓冲区的指针
        // 将 c.buf 指向 hchan 结构体之后的内存区域,该区域用于存储缓冲区数据
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
        // 如果元素类型包含指针(如结构体或切片),分别分配 hchan 和缓冲区的内存
        // 因为 gc 需要跟踪指针类型的内存分配
		// Elements contain pointers.
		c = new(hchan)
        // 初始化通道的缓冲区的指针
		c.buf = mallocgc(mem, elem, true)
	}
	
    // 设置通道的元素大小
	c.elemsize = uint16(elem.Size_)
    // 设置通道的元素类型
	c.elemtype = elem
    // 设置通道的缓冲区大小
	c.dataqsiz = uint(size)
    // 初始化通道的互斥锁
	lockInit(&c.lock, lockRankHchan)
	
    // 如果启用了通道调试模式,打印调试信息
	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
    
    // 返回通道指针
	return c
}

具体分配内存 mallocgc 函数原型如下所示:

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
    // ...
}
  • size:分配的内存大小。
  • typ:分配内存的类型信息。
  • needzero:是否需要将分配的内存清零。

更多关于 mallocgc 函数的内容本文章不再赘述,感兴趣的同学请关注后续文章《Golang 内存模型》。

发送数据

golang 的运行时中,发送操作的代码会被编译为对 chansend1 的调用。如下所示:

package main

func main() {
	ch := make(chan int, 1)
	ch <- 1
}

编译成汇编代码如下所示:

0x001a 00026        CALL    runtime.makechan(SB)
# ...
0x0026 00038        CALL    runtime.chansend1(SB)

以上汇编代码只是部分截取,请注意甄别。

而在 chansend1 的函数内部,又是对 chansend 的调用,如下所示:

源码位置:src/runtime/chan.go

// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
函数原型
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
}
  • c *hchan:指向通道的指针。
  • ep unsafe.Pointer:指向数据的指针。
  • block bool:是否阻塞发送。
    • 如果 block == true,发送操作会在通道缓冲区满或没有接收方时阻塞,直到可以发送数据。
    • 如果 block == false,发送操作是非阻塞的。如果当前无法发送数据(缓冲区满或没有接收方),函数会立即返回 false
  • callerpc uintptr:程序计数器(PC)的值,主要用于调试和竞态检测。
  • 发送操作是否成功。
函数内容
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 检查通道指针 c 是否为 nil
	if c == nil {
		if !block {
            // 如果通道为 nil 且是非阻塞发送,直接返回 false
			return false
		}
        // 如果是阻塞发送,则协程会阻塞并抛出异常
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

    // 如果启用了通道调试模式,打印调试信息
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	
    // 如果启用了竞态检测器,记录对通道的读操作
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
    
    // 如果是非阻塞发送并且通道未关闭但缓冲区已满,直接返回 false
    // 前置判断,避免进入锁的开销
	if !block && c.closed == 0 && full(c) {
		return false
	}
	
    // 如果启用了阻塞剖析,记录当前时间戳
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	
    // 锁定通道的互斥锁,确保操作的原子性
	lock(&c.lock)
	
    // 如果通道已关闭,解锁并抛出异常
    // 阅读到此处时,考虑是否改为使用原子操作,代替 lock/unlock?
    // 可?不可?
    // 记录吧!!!
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	
    // 如果有等待接收的协程,直接将数据发送给接收协程,跳过缓冲区
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
        // 调用 send 函数完成数据传递,并解锁
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

    // 如果没有等待接收的协程,则判断缓冲区是否有空间
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
        // 计算缓冲区中下一个写入位置的指针
		qp := chanbuf(c, c.sendx)
		if raceenabled {
            // 通知竞态检测器当前操作的上下文信息,对缓冲区的写入操作被正确跟踪
			racenotify(c, c.sendx, nil)
		}
        
        // 将要发送的数据从 ep 复制到缓冲区的 qp 位置
		typedmemmove(c.elemtype, qp, ep)
        // 更新发送索引为下一个写入位置
		c.sendx++
        // 如果 c.sendx 达到缓冲区大小,则将其重置为 0,实现环形缓冲区的效果
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
        // 每次成功写入数据后,c.qcount 增加 1
		c.qcount++
        // 解锁通道的互斥锁
		unlock(&c.lock)
        // 写入成功,返回 true
		return true
	}
	
    // 如果缓冲区已满且是非阻塞发送,解锁并返回 false
	if !block {
		unlock(&c.lock)
		return false
	}
    
    // 下面是阻塞发送的代码逻辑

	// Block on the channel. Some receiver will complete our operation for us.
    // 获取当前正在运行的协程的指针
	gp := getg()
    // 分配一个 sudog 对象,sudog 是 golang 运行时中用于表示协程在通道操作中等待的结构体
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
    // 下面这些是初始化 sudog 对象属性
	mysg.elem = ep	// 指向要发送的数据指针
	mysg.waitlink = nil
	mysg.g = gp	// 指向当前协程
	mysg.isSelect = false	// 标记是否是 select 操作
	mysg.c = c	// 指向当前通道
	gp.waiting = mysg	// 将当前协程的 sudog 对象设置为等待状态
	gp.param = nil
    
    // 将 sudog 对象加入通道的发送等待队列
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
    // 标记当前协程即将阻塞在通道操作
	gp.parkingOnChan.Store(true)
    // 阻塞当前协程,直到被接收操作唤醒
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
    // 确保发送的数据在接收协程拷贝之前不会被回收,因为 ep 指向的数据有可能是栈上的数据
    // 而栈上的数据可能在协程阻塞后被回收
	KeepAlive(ep)
    
    // 下面是协程被唤醒后的处理代码

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    // 清空协程的等待状态
	gp.waiting = nil	
    // 标记协程不再阻塞在通道操作上
	gp.activeStackChans = false
    // 发送操作是否成功,mysg.success 表示 true 成功,false 失败
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
        // 记录协程阻塞的时间
		blockevent(mysg.releasetime-t0, 2)
	}
    // 清空 sudog 对象的通道指针
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
        // 如果通道未关闭但协程被唤醒,抛出异常
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
        // 通道在发送操作完成前被关闭,报错 !!!
        // 这也就是为什么说向一个已经关闭通道写数据会报错,原因就在这里
		panic(plainError("send on closed channel"))
	}
    // 发送操作成功完成
	return true
}

send 函数的作用是将数据直接发送给等待接收的协程,并唤醒等待接收的协程。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 竞态检测
	if raceenabled {
        // 无缓冲通道
		if c.dataqsiz == 0 {
            // 通知竞态检测器当前操作的上下文
			racesync(c, sg)
		} else {
            // 有缓冲通道
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
            // 即使数据是直接发送的,竞态检测器也会模拟数据通过缓冲区的流程
			// 调用 racenotify,记录接收索引(c.recvx)的变化。
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
            // 更新接收索引 c.recvx 和发送索引 c.sendx,模拟环形缓冲区的行为
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
    
    // 如果 sg.elem 不为 nil,调用 sendDirect 将数据从发送方 ep 直接发送到接收方 sg.elem
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
        // 清空 sg.elem,表示数据已发送
		sg.elem = nil
	}
    // 获取等待接收的协程
	gp := sg.g
    // 调用解锁函数,释放通道的锁
	unlockf()
    // 将 sudog 对象传递给接收协程,用于后续处理
	gp.param = unsafe.Pointer(sg)
    // 标记发送操作成功
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
    // 将 gp 协程标记为可运行状态,并将其加入调度队列中等待执行
	goready(gp, skip+1)
}

sendDirect 函数的作用是将数据直接从发送方的内存位置发送到等待接收的协程 sudog 对象 。

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
    // 获取接收方的内存位置,以便将数据直接发送到该位置
	dst := sg.elem
    // 触发写屏障
    // 确保垃圾回收器能够正确追踪目标内存区域中的指针
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
    // 这是一个底层函数,用于在内存中安全地移动数据
    // dst 目标地址,src 源地址
	memmove(dst, src, t.Size_)
}

关于更多内存管理的知识点,请关注后续文章《Golang 内存模型》。

接收数据

golang 的运行时中,接收数据操作的代码会被编译为对 chanrecv1 的调用。如下所示:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 1

	data := <-ch
	fmt.Println(data)
}

编译成汇编代码如下所示:

0x001a 00026        CALL    runtime.makechan(SB)
# ...
0x0043 00067        CALL    runtime.chanrecv1(SB)

以上汇编代码只是部分截取,请注意甄别。

而在 chanrecv1 的函数内部,又是对 chanrecv 的调用,如下所示:

源码位置:src/runtime/chan.go

// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
函数原型
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // ...
}
  • c *hchan:指向目标通道的指针。
  • ep unsafe.Pointer:指向接收数据的内存位置。如果为 nil,表示仅检查通道状态而不接收数据。
  • block bool:是否允许阻塞接收。如果为 false,则在无法立即接收数据时返回。
  • selected bool:表示是否成功接收数据。在 select 语句中,用于标记是否选择了当前通道。
  • received bool:表示是否实际接收到数据。如果通道关闭且缓冲区为空,receivedfalse
函数内容
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.
	// 如果启用了通道调试模式,打印调试信息
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// 如果通道为 nil
	if c == nil {
		if !block {
            // 非阻塞接收,直接返回
			return
		}
        // 如果是阻塞接收,协程会阻塞并抛出异常
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}
	
    // 如果通道关联了一个定时器,则调用 maybeRunChan 来处理定时器事件
	if c.timer != nil {
		c.timer.maybeRunChan()
	}
	
    // 如果是非阻塞接收且通道为空
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
        // 检查通道是否关闭
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
            // 如果通道未关闭,直接返回
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
        // 如果通道已关闭并且为非阻塞并且缓冲区为空
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
            // ep 不是空,则清空目标内存 ep
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
            
            // 返回成功接收数据(true),未实际接收到数据(false)
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	
    // 锁定通道的互斥锁,确保操作的原子性
	lock(&c.lock)

    // 如果通道已关闭
	if c.closed != 0 {
        // 缓冲区为空
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
            // 这里是通道已经关闭了,而且缓冲区已经为空
            // 考虑是否可以采用原子操作来代替这里的 lock/unlock 操作的操作?
            // ...
            // 和通道关闭时,发送者的检测同理
           	// 虽然理论上可以通过原子操作来避免加锁,但在实际实现中,锁的使用是为了确保线程安全和一致性,另外,虽然原子操作避免了锁的开销,但它们仍然有一定的性能开销
            // 即使使用原子操作,也需要确保在检查 c.closed 和 c.qcount 时不会出现竞态条件。例如,如果一个协程在检查 c.closed 后修改了 c.qcount,可能会导致不一致的行为
            // 记录吧!!!
         	// 解锁
			unlock(&c.lock)
            // 清空目标内存 ep
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
            // 返回成功接收数据(true),未实际接收到数据(false)
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {
        // 通道未关闭,并且有等待发送的协程
		// Just found waiting sender with not closed.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
            // 调用 recv 函数直接从发送协程接收数据,跳过缓冲区
            // 解锁通道
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            // 返回成功接收数据(true),实际接收到数据(true)
			return true, true
		}
	}
	
    // 如果缓冲区中有数据
	if c.qcount > 0 {
		// Receive directly from queue
        // 得到缓冲区地址
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
        // 从缓冲区中读取数据到目标内存
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
        // 清空缓冲区中的数据
		typedmemclr(c.elemtype, qp)
        // 更新接收索引 c.recvx
		c.recvx++
        // 如果接收索引 == 缓冲区大小,则从 0 重新开始
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
        // 更新缓冲区计数 c.qcount
		c.qcount--
        // 解锁
		unlock(&c.lock)
        // 返回成功接收数据(true),实际接收到数据(true)
		return true, true
	}

    // 如果是非阻塞接收且缓冲区为空,解锁通道并返回未成功接收数据(false),未实际接收到数据(false)
	if !block {
		unlock(&c.lock)
		return false, false
	}
    
    // 下面是进行阻塞接收数据代码逻辑

	// no sender available: block on this channel.
    // 获取当前协程
	gp := getg()
    // 分配一个 sudog 对象
	mysg := acquireSudog()
    // 下面是针对 mysg 对象属性的初始化
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
    // 将协程加入接收等待队列
	c.recvq.enqueue(mysg)
    // 如果通道关联了定时器,调用 blockTimerChan
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
    // 标记协程即将阻塞在通道操作上
	gp.parkingOnChan.Store(true)
    // 调用 gopark 阻塞当前协程,直到被发送操作唤醒
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

    // 下面是协程被唤醒之后的操作流程
    
	// someone woke us up
    // 检查协程是否被正确唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    // 如果通道关联了定时器,调用 unblockTimerChan
	if c.timer != nil {
		unblockTimerChan(c)
	}
    // 清理协程状态
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
    // 获取接收操作的结果
	success := mysg.success
	gp.param = nil
    // 释放 sudog 对象
	mysg.c = nil
	releaseSudog(mysg)
    // 返回接收操作结果
	return true, success
}

recv 函数用于从通道中读取数据,并将其传递给等待接收的协程。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果通道无缓冲区
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
        // 并且目标内存位置不是 nil, 则直接拷贝到目标位置
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
        // 通道有缓冲区
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
        // 计算缓冲区中接收位置的指针
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
            // 如果目标内存位置不是 nil, 将数据从缓冲区复制到接收方的内存位置
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
        // 将发送方的数据复制到缓冲区的尾部
		typedmemmove(c.elemtype, qp, sg.elem)
        // 增加接收索引并处理环形缓冲区的边界条件
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
        // 更新发送索引,使其指向下一个可用位置
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
    
    // 清空 sudog 对象中的数据指针
	sg.elem = nil
    // 获取发送协程的指针
	gp := sg.g
    // 调用解锁函数,释放通道的锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
    // 标记接收操作成功
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
    // 将 gp 协程标记为可运行状态,并将其加入调度队列中等待执行
	goready(gp, skip+1)
}

recvDirect 函数的作用是从发送方直接接收数据,并将其复制到接收方的目标内存位置。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
    // 指向发送方的内存位置
	src := sg.elem
    // 写屏障
    // 确保目标内存位置中的指针被垃圾回收器正确追踪
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
    // 将数据从发送方的内存位置复制到接收方的内存位置
	memmove(dst, src, t.Size_)
}

关于更多内存管理的知识点,请关注后续文章《Golang 内存模型》。

关闭通道

golang 的运行时中,关闭通道操作的代码会被编译为对 closechan 的调用。如下所示:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch)
}

编译成汇编代码如下所示:

0x001a 00026        CALL    runtime.makechan(SB)
# ...
0x0020 00032        CALL    runtime.closechan(SB)

以上汇编代码只是部分截取,请注意甄别。

函数原型

源码位置:src/runtime/chan.go

func closechan(c *hchan) {
    // ...
}
  • c *hchan:指向要关闭的通道的指针。
函数内容
func closechan(c *hchan) {
    // 如果通道为 nil,直接抛出异常
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	
    // 锁定通道的互斥锁,确保关闭操作的原子性
	lock(&c.lock)
	if c.closed != 0 {
        // 如果通道已经关闭,解锁,抛出异常
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

    // 将通道的 closed 标志设置为 1,表示通道已关闭
	c.closed = 1
	
    // 需要被唤醒的协程集合
	var glist gList

	// release all readers
	for {
        // 遍历通道的接收等待队列,释放所有等待接收的协程
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
        // 如果 sg.elem 不为 nil
		if sg.elem != nil {
            // 清空接收方的内存位置
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
        
        // 获取到接收者协程
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
        // 标记获取操作失败
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
        // 将协程加入到 glist 中,稍后唤醒
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
        // 遍历通道的发送等待队列,释放所有等待发送的协程
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
        // 清空 sg.elem,避免内存泄漏
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
        
        // 获取到发送者协程
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
        // 标记发送操作失败
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
        // 将协程加入到 glist 中,稍后唤醒
		glist.push(gp)
	}
    
    // 解锁通道的互斥锁
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
    // 遍历 glist,唤醒所有等待的协程
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
        // 调用 goready 将协程标记为可运行状态
		goready(gp, 3)
	}
}

🌺🌺🌺撒花!

如果本文对你有帮助,就点关注或者留个👍
如果您有任何技术问题或者需要更多其他的内容,请随时向我提问。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983424.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

聊天服务器分布式改造

目前的聊天室是单节点的&#xff0c;无论是http接口还是socket接口都在同一个进程&#xff0c;无法承受太多人同时在线&#xff0c;容灾性也非常差。因此&#xff0c;一个成熟的IM产品一定是做成分布式的&#xff0c;根据功能分模块&#xff0c;每个模块也使用多个节点并行部署…

DeepSeek 医疗大模型微调实战讨论版(第一部分)

DeepSeek医疗大模型微调实战指南第一部分 DeepSeek 作为一款具有独特优势的大模型,在医疗领域展现出了巨大的应用潜力。它采用了先进的混合专家架构(MoE),能够根据输入数据的特性选择性激活部分专家,避免了不必要的计算,极大地提高了计算效率和模型精度 。这种架构使得 …

深入解析 BitBake 日志机制:任务调度、日志记录与调试方法

1. 引言&#xff1a;为什么 BitBake 的日志机制至关重要&#xff1f; BitBake 是 Yocto 项目的核心构建工具&#xff0c;用于解析配方、管理任务依赖&#xff0c;并执行编译和打包任务。在 BitBake 构建过程中&#xff0c;日志记录机制不仅用于跟踪任务执行情况&#xff0c;还…

OpenCV计算摄影学(16)调整图像光照效果函数illuminationChange()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 对选定区域内的梯度场应用适当的非线性变换&#xff0c;然后通过泊松求解器重新积分&#xff0c;可以局部修改图像的表观照明。 cv::illuminati…

【DuodooTEKr 】多度科技 以开源之力,驱动企业数字化转型

多度科技 背景 / Background 在全球产业链重构与国内经济双循环的浪潮下&#xff0c;中国制造业与贸易企业正面临数字化升级的迫切需求。开源技术作为数字化转型的基石&#xff0c;不仅能打破技术壁垒、降低企业成本&#xff0c;更能通过协作创新加速产业智能化进程。 多度科技…

VBA经典应用69例应用7:从字符串中删除数字

《VBA经典应用69例》&#xff08;版权10178981&#xff09;&#xff0c;是我推出的第九套教程&#xff0c;教程是专门针对初级、中级学员在学习VBA过程中可能遇到的案例展开&#xff0c;这套教程案例众多&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以便…

Vue 系列之:插槽

前言 插槽是定义在子组件中的&#xff0c;相当于一个占位符&#xff0c;父组件可以在这个占位符中填充HTML代码、组件等内容。 插槽显不显示、怎样显示是由父组件来控制的&#xff0c;而插槽在哪里显示就由子组件来进行控制。 基本使用 子组件&#xff1a; <template&g…

一周热点-OpenAI 推出了 GPT-4.5,这可能是其最后一个非推理模型

在人工智能领域,大型语言模型一直是研究的热点。OpenAI 的 GPT 系列模型在自然语言处理方面取得了显著成就。GPT-4.5 是 OpenAI 在这一领域的又一力作,它在多个方面进行了升级和优化。 1 新模型的出现 GPT-4.5 目前作为研究预览版发布。与 OpenAI 最近的 o1 和 o3 模型不同,…

IP,MAC,ARP 笔记

1.什么是IP地址 IP 地址是一串由句点分隔的数字。IP 地址表示为一组四个数字&#xff0c;比如 192.158.1.38 就是一个例子。该组合中的每个数字都可以在 0 到 255 的范围内。因此&#xff0c;完整的 IP 寻址范围从 0.0.0.0 到 255.255.255.255。 IP 地址不是随机的。它们由互…

【A2DP】MPEG - 2/4 AAC 编解码器互操作性要求详解

目录 一、概述 二、编解码器特定信息元素(Codec Specific Information Elements ) 2.1 信息元素结构 2.2 对象类型(Object Type) 2.3 MPEG - D DRC 2.4 采样频率(Sampling Frequency) 2.5 通道(Channels) 2.6 比特率(Bit rate) 2.7 可变比特率(VBR) 三、…

网络安全规划重安全性需求

1.网络安全基本内容 安全包括哪些方面 操作系统内部的安全包括&#xff1a;数据存储安全、应用程序安全、操作系统安全。 此外还有网络安全、物理安全、用户安全教育。 网络安全&#xff1a; 网络安全是指网络系统的硬件、软件及其系统中的数 据受到保护&#xff0c;不因偶然…

发展史 | 深度学习 / 云计算

注&#xff1a;本文为来自 csdn 不错的“深度学习 / 云计算发展史 ” 相关文章合辑。 对原文&#xff0c;略作重排。 深度学习发展史&#xff08;1943-2024 编年体&#xff09;&#xff08;The History of Deep Learning&#xff09; Hefin_H 已于 2024-05-23 15:54:45 修改 …

Qt开发:nativeEvent事件的使用

文章目录 一、概述二、nativeEvent 的定义三、Windows 平台示例三、使用nativeEvent监测设备变化 一、概述 Qt 的 nativeEvent 是一个特殊的事件处理机制&#xff0c;允许开发者处理操作系统级别的原生事件。通常&#xff0c;Qt 通过 QEvent 机制来管理事件&#xff0c;但有时…

宠物医院台账怎么做,兽医电子处方单模板打印样式,佳易王兽医兽药开方宠物病历填写打印操作教程

一、概述 本实例以佳易王兽医宠物电子处方开单系统版本为例说明&#xff0c;其他版本可参考本实例。试用版软件资源可到文章最后了解&#xff0c;下载的文件为压缩包文件&#xff0c;请使用免费版的解压工具解压即可试用。 软件特点&#xff1a; 多场景处方兼容性针对宠物医…

RuleOS:区块链开发的“新引擎”,点燃Web3创新之火

RuleOS&#xff1a;区块链开发的“新引擎”&#xff0c;点燃Web3创新之火 在区块链技术的浪潮中&#xff0c;RuleOS宛如一台强劲的“新引擎”&#xff0c;为个人和企业开发去中心化应用&#xff08;DApp&#xff09;注入了前所未有的动力。它以独特的设计理念和强大的功能特性&…

Leetcode 刷题记录 04 —— 子串

本系列为笔者的 Leetcode 刷题记录&#xff0c;顺序为 Hot 100 题官方顺序&#xff0c;根据标签命名&#xff0c;记录笔者总结的做题思路&#xff0c;附部分代码解释和疑问解答。 目录 01 和为 K 的子数组 方法一&#xff1a;枚举 方法二&#xff1a;前缀和 哈希表优化 0…

3D数字化:家居行业转型升级的关键驱动力

在科技日新月异的今天&#xff0c;家居行业正经历着一场前所未有的变革。从传统的线下实体店铺到线上电商平台的兴起&#xff0c;再到如今3D数字化营销的广泛应用&#xff0c;消费者的购物体验正在发生翻天覆地的变化。3D数字化营销不仅让购物变得更加智能和便捷&#xff0c;还…

开启Ollama的GPU加速

Ollama 开启 GPU 加速可显著提升大语言模型运行效率与性能&#xff0c;通过利用 NVIDIA CUDA 等 GPU 并行计算能力优化矩阵运算&#xff0c;推理速度可实现数倍至数十倍的提升&#xff0c;有效降低用户交互延迟。 1.验证WSL内是否已正确启用CUDA支持&#xff0c;需在两个不同环…

Linux 指定命令行前后添加echo打印内容

目录 一. 前提条件二. 通过sh脚本进行批量修改三. 通过Excel和文本编辑器进行批量转换四. 实际执行效果 一. 前提条件 ⏹项目中有批量检索文件的需求&#xff0c;如下所示需要同时执行500多个find命令 find ./work -type f -name *.java find ./work -type f -name *.html fi…

力扣HOT100之哈希:128. 最长连续序列

这道题我想的比较简单&#xff0c;先遍历一遍输入的数组&#xff0c;然后将读取到的数字存入一个map容器中&#xff08;注意&#xff0c;不是unordered_map&#xff09;&#xff0c;数字作为键&#xff0c;布尔变量为值&#xff0c;然后再遍历一遍map&#xff0c;用一个变量tem…