goroutinue和channel
- 需求
- 传统方式实现
- goroutinue
- 进程和线程说明
- 并发和并行
- go协程和go主线程
- MPG
- 设置Go运行的cpu数
- channel(管道)-看个需求
- 使用互斥锁、写锁
- channel 实现
- 使用select可以解决从管道取数据的阻塞问题(无需手动关闭channel了)
- goroutinue中使用了recover,解决协程中出现panic,导致程序崩溃问题
- 素数问题
需求
要求:统计 1-20000的数字中,哪些是素数?
分析思路:
(1)传统的方法中,就是使用一个循环,循环的判断各个数是不是素数
(2)使用并发或并行的方式,将统计素数的任务分配给多个goroutine去完成,这时就会使用到goroutinue
传统方式实现
func main() {
now := time.Now()
printPrime(1000000)
seconds := time.Since(now).Seconds()
fmt.Println("time : ", seconds)
}
// 打印素数
// 素数定义:仅能被1和它本身整除的大于1的自然数
func printPrime(n int) {
for i := 2; i <= n; i++ {
isPrime(i)
}
}
// 判断是否是素数
func isPrime(n int) bool {
if n <= 1 {
return false
}
sqrt := int(math.Sqrt(float64(n))) // 更高效
for i := 2; i <= sqrt; i++ {
if n%i == 0 {
return false
}
}
fmt.Println(n)
return true
}
单核 大约需要0.9s左右
goroutinue
进程和线程说明
- 进程就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单元
- 线程是进城的一个执行实例,是程序执行的最小单元,他是比进程更小的能独立运行的基本单位
- 一个进程可以创建核销多个线程,同一个进程中的多个线程可以并发执行
- 一个程序至少有一个进程,一个进程至少有一个线程
并发和并行
- 多个线程在单核上执行,就是并发
- 多个线程在多核上运行,就是并行
go协程和go主线程
- Go主线程:一个狗线程上可以起多个协程,协程是轻量级的线程
- Go协程的特点
- 有独立的栈空间
- 共享程序堆空间
- 调度由用户控制
- 协程是轻量级的线程
MPG
- 主线程是一个物理线程,直接作用在cpu上。是重量级的,非常耗费cpu资源
- 协程是从主线程开启的,是轻量级别的线程,是逻辑太,对资源消耗相对小
- Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其他编程语言的并发机制一般基于线程的,开启过多的线程,资源耗费大,这里就凸显Golang在并发的优势了
MPG模式基本介绍
M: 操作系统主线程 是物理线程
P: 协程执行需要的上下文
G: 协程
- 当前程序由三个M,如果三个M都在一个cpu运行,就是并发,如果在不同的cpu运行,就是并行
- M1、M2、M3 正在执行一个G,M1协程队列有三个,M2协程队列有三个,M3线程队列有两个
- go协程是 轻量级的线程,是逻辑态的,Go可以容易的启动上万个协程
- 其他程序c/Java的多线程,往往是内核态的,比较重量级,几千个线程可能耗光cpu
1. 分成两个部分来看
2. 原来的情况是M0主线程正在执行Go协程,另外有三个协程在队列等待
3. 如果Go协程阻塞,比如读文件或数据库等
4. 这是就会创建M1主线程(也可能从已有线程中取出M1),并且将等待的3个协程挂到M1下开始执行,M0的主线程下的GO仍然执行文件IO的读写
5.这样MPG调度模拟,可以既让Go执行,同时也不会让队列的其他协程一直阻塞,仍然可以并发或并行执行
6. 等到Go不阻塞了,M0会被放到空闲的主线程继续执行(从已有线程中取),同时GO又会被唤醒
设置Go运行的cpu数
go1.8后,默认让程序运行在多个核上,可以不用设置了
fmt.Println("CPU", runtime.NumCPU())
runtime.GOMAXPROCS(19)
channel(管道)-看个需求
现在要计算1-200的各个数的阶乘,并且把各个数的阶乘写入到map中,最后显示出来。要求使用goroutinue实现
分析:
- map不是并发安全,应该会有安全问题
- 加锁或者使用channel通信实现
var wg sync.WaitGroup
func main() {
wg.Add(200)
calcJiCheng(200)
}
func calcJiCheng(n int) {
m := make(map[int]int64)
for i := 1; i <= n; i++ {
go jiCheng(i, m)
}
wg.Wait() // 使用同步等待组,阻塞主线程
fmt.Println(m)
}
func jiCheng(n int, m map[int]int64) {
var sum int64 = 1
for i := 1; i <= n; i++ {
sum *= int64(i)
}
m[n] = sum
wg.Done()
}
fatal error: concurrent map writes 对map存在并发写操作
java 解决方案,使用 ConcurrentHashMap 或者 手动加锁
使用互斥锁、写锁
var wg sync.WaitGroup
// var mutex sync.Mutex // 互斥锁 实现
var mutex sync.RWMutex // 写锁实现
func main() {
wg.Add(200)
calcJiCheng(200)
}
func calcJiCheng(n int) {
m := make(map[int]int64)
for i := 1; i <= n; i++ {
go jiCheng(i, m)
}
wg.Wait() // 使用同步等待组,阻塞主线程
fmt.Println(m)
}
func jiCheng(n int, m map[int]int64) {
defer mutex.Unlock()
mutex.Lock()
var sum int64 = 1
for i := 1; i <= n; i++ {
sum *= int64(i)
}
m[n] = sum
wg.Done()
}
这是低水平程序猿首选,哈哈,高级程序猿使用channel解决
channel 实现
- channel 本质上是一个数据结构 队列
- 数据是先进先出
- 线程安全,多goroutinue访问时,不需要加锁,就是说channel本身就是线程安全的
- channel时有类型的,一个string的channel只能存放string类型数据
无缓冲管道 var ch = make(chan [2]int64)
有写必有读,缺少一个必死锁,
使用range读必须在写入最后一个关闭通道
缓冲管道 var ch = make(chan [2]int64,3)
写入超过缓冲值,必死锁
没有值读取,必死锁
使用range读必须在写入最后一个关闭通道
管道的读写是一个阻塞操作,我更愿意把其当作消息队列来用
var wg sync.WaitGroup
var ch = make(chan [2]int64)
func main() {
wg.Add(200)
calcJiCheng(200)
}
func calcJiCheng(n int) {
m := make(map[int]int64)
for i := 1; i <= n; i++ {
go jiCheng(i, n)
}
for a := range ch {
var num int64 = a[0]
var value int64 = a[1]
m[int(num)] = value
}
fmt.Println(m)
}
func jiCheng(n int, end int) {
var sum int64 = 1
for i := 1; i <= n; i++ {
sum *= int64(i)
}
ch <- [2]int64{int64(n), sum}
wg.Done()
// 关闭channel
if n == end {
close(ch)
}
}
使用select可以解决从管道取数据的阻塞问题(无需手动关闭channel了)
func main() {
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
strChan := make(chan string, 5)
for i := 0; i < 5; i++ {
strChan <- strconv.Itoa(i)
}
// 传统的方法在遍历管道的时候,如果不关闭会阻塞而导致 deadlock
// 问题 在实际开发中,可能我们不好确定什么时候关闭该管道
// 可以使用select 方式 解决
for {
select {
case v := <-intChan:
fmt.Printf("从intChan读取的数据 %d\n", v)
case v := <-strChan:
fmt.Printf("从strChan读取的数据 %s\n", v)
default:
fmt.Printf("取不到了,不玩了 ")
return
}
}
}
goroutinue中使用了recover,解决协程中出现panic,导致程序崩溃问题
如果我们起了一个协程,但是这个协程出现了panic,如果我们没有捕获这个panic,就会造成程序崩溃,这是我们在goroutinue中使用recover来捕获panic,进行处理,这样即时这个协程发生问题,但主线程不受影响,继续执行
func main() {
go hi()
go say()
time.Sleep(time.Second * 2)
}
func say() {
println("say 3333333")
}
func hi() {
defer func() {
if err := recover(); err != nil {
fmt.Println("程序出戳了", err)
}
}()
fmt.Println("hi")
var a map[string]int
a["2"] = 3
}
素数问题
var wg sync.WaitGroup
var ch = make(chan int)
var done = make(chan struct{})
func main() {
now := time.Now()
wg.Add(1000)
go printPrime(1000)
go func() {
for i := range ch {
fmt.Println("主线程", i)
}
close(done)
}()
wg.Wait()
close(ch)
<-done
seconds := time.Since(now).Seconds()
fmt.Println("time : ", seconds)
}
// 打印素数
// 素数定义:仅能被1和它本身整除的大于1的自然数
func printPrime(n int) {
for i := 1; i <= n; i++ {
go isPrime(i, n)
}
}
// 判断是否是素数
func isPrime(n int, end int) bool {
defer func() {
wg.Done()
}()
if n <= 1 {
return false
}
sqrt := int(math.Sqrt(float64(n))) // 更高效
for i := 2; i <= sqrt; i++ {
if n%i == 0 {
return false
}
}
ch <- n
fmt.Println(n)
return true
}