[toc]

文章介绍

本文将以实战的模式来介绍go核心channel,首先我们将介绍go的协程,如何开启go协程,然后通过实例来介绍go并发核心goroutine和channel。

Go的协程

协程(Coroutine)

**百科:当有多个线程在操作时,如果系统只有一个 CPU, 则它根本不可能真正同时进行一个以上的线程,它只能把 CPU 运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状。. 这种方式我们称之为并发 **。

首先我们先了解什么是协程(Coroutine),我们可以称为协作式线程,他的粒度比线程更轻,作用于用户态,操作系统内核态感受不到协程的存在的,协程可以帮助我们高并发的去完成一些指定任务,例如网络IO,文件IO等。

Go协程(goroutine)

其实协程的核心都一样,我们知道go原生的支持并发,而这一核心就是goroutine,在go中,main 和创建的 goroutine 是相互作用的,相互给予控制权,就像两个人,各做各的事,并且他们也相互通信,我们来看一下如何创建协程。

package main

import (
	"fmt"
	"time"
)

func main() {
  //使用go关键字创建协程
  //子协程1
	go func() {
		for {
			fmt.Println("我是goroutine1")
		}
	}()

  //子协程2
	go func() {
		for {
			fmt.Println("我是goroutine2")
		}
	}()

	fmt.Println("我是主goroutine")

	time.Sleep(4* time.Millisecond)  //go协程运行4毫秒
}

这样我们就启动两两个协程,他们不断的打印消息,直到4ms后主协程退出,将子协程kill掉

继续看实例:

package main

import (
	"fmt"
	"sync"
	"time"
)

var m sync.Map

func RwGlobalMap(){
	if value,exists := m.Load("name"); exists {
		fmt.Println("value:", value)
	}else{
		m.Store("name", "iceymoss")
	}
}
func main() {
	go RwGlobalMap()
	go RwGlobalMap()
	go RwGlobalMap()
	go RwGlobalMap()

	time.Sleep(time.Second)
}

我们可以想一想程序输出是什么?

其实输出不定:

➜  share_memonry go run main.go
value: iceymoss
➜  share_memonry go run main.go
➜  share_memonry go run main.go
value: iceymoss
value: iceymoss
➜  share_memonry go run main.go
➜  share_memonry go run main.go
value: iceymoss
value: iceymoss
value: iceymoss
value: iceymoss

CSP并发模型

CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。 CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。

Golang 就是借用CSP模型的一些概念为之实现并发进行理论支持,其实从实际上出发,go语言并没有完全实现了CSP模型的所有理论,仅仅是借用了 process和channel这两个概念。process是在go语言上的表现就是 goroutine 是实际并发执行的实体,每个实体之间是通过channel通讯来实现数据共享。

Go并发核心

channel管道的意思,在go中指用来通讯的管道,用来传输数据,通过协程之间的通讯,下面来看看如何使用channel。

var ch chan int  //声明一个整型的管道

也可以使用make函数进行内存分配和初始化

ch := make(chan int)

下面使用channel来实现协程之间的通信,直接来看实例:

package main

import (
	"fmt"
	"time"
)

//生产数据
func send(ch chan int){
	i := 0
	for i < 100 {
		ch <- i
		i++
	}
}

//接收并处理数据
func receive(ch chan int, rch chan string){
	for {
		num := <-ch
		fmt.Println("接收到值:", num)
		rch <- fmt.Sprintf("处理后的编号:%d", num+1)
	}
}

func main() {
	ch := make(chan int)
	rch := make(chan string)
	go receive(ch, rch)  //此时ch没有数据,子协程rec阻塞,直到其他协程向管道放入数据,阻塞解除
	go send(ch)         
	go func(chan string) {
		for {
			fmt.Println("收到结果:", <-rch)
		}
	}(rch)
  
	time.Sleep(time.Second)  //主协程等待1s
}

可以想想输出是什么?

接收到值: 0
接收到值: 1
收到结果: 处理后的编号:1
收到结果: 处理后的编号:2
接收到值: 2
收到结果: 处理后的编号:3
接收到值: 3
接收到值: 4
收到结果: 处理后的编号:4
收到结果: 处理后的编号:5
接收到值: 5
收到结果: 处理后的编号:6
接收到值: 6
接收到值: 7
收到结果: 处理后的编号:7
收到结果: 处理后的编号:8
接收到值: 8
收到结果: 处理后的编号:9
接收到值: 9
收到结果: 处理后的编号:10

接着使用channel实现文件读写协程的交互,看实例:将三个文件中的内容并发的读取,并发写入一个文件中

age main

import (
	"bufio"
	"io"
	"log"
	"os"
)

var (
	content = make(chan string, 1000) //用于传输内容
	readOk = make(chan struct{}, 3)   //用于通知消费者
	writeOk = make(chan struct{})     //用户通知主协程
)

//ReadToSend 生产者
func ReadToSend(filename string){
	//打开文件
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal("打开文件失败:", err)
	}
	defer file.Close()

	//读文件
	r := bufio.NewReader(file)
	for {
		line, err := r.ReadString('\n')
		if err == nil {
			content <-line
		}else {
			if err == io.EOF {
				if len(line) != 0 {
					content <- line +"\n"
				}
				break
			} else {
				log.Fatal("其他错误:", err)
			}
		}
	}
	//当生产者协程结束后需要通知消费者协程
	//当readTag管道为空时,则关闭content,从而通知消费者协程停止
	<-readOk
	if len(readOk) == 0 {
		close(content)
	}
}

func WriteFile(filename string){
	//打开文件
	fout, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 666)
	if err != nil {
		log.Fatal("打开文件失败:", err)
	}
	defer fout.Close()

	//写文件
	w := bufio.NewWriter(fout)

	for line := range content{
		w.WriteString(line)
	}
	w.Flush()

	//通知主协程write结束
	writeOk <-struct{}{}
}

func main() {
	for i := 0; i < 3; i++ {
		readOk <-struct{}{}
	}

	go ReadToSend("/data/data1.txt")
	go ReadToSend("/data/data2.txt")
	go ReadToSend("/data/data3.txt")

	go WriteFile("/data/data5.txt")

	//当执行writeOk没有数据时,会将主协程阻塞,直到writeOk有数据过来,然后程序运行结束
	<-writeOk
}

这个程序显示更加合理,我们没有像之前那样使用time.sleep方法来预估程序执行时间了(预估往往是不靠谱的),当读协程完成后,就通知写协程:文件读完了并关闭管道;当写协程被收到读协程的结束消息后,要退出循环,然后通知主协程已经写完了,当主协程收到读完消息后,就退出主协程,程序结束。

接着再看实例:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{}, 1)  //初始化一个管道
	ch <-struct{}{}     //向管道添加数据

	go func(){
		//子协程1 5秒管道数据拿走,解除主协程的阻塞
		time.Sleep(5*time.Second)
		<- ch
		fmt.Println("goroutine1 over")
	}()

	ch <- struct{}{}  //管道容量满了,此时主协程将在此行阻塞,等待子协程1将数据拿走,然后放入数据
	fmt.Println("main goroutine run")

	go func() {
		ch <- struct{}{} //此时子协程2,将阻塞在此行
		fmt.Println("goroutine2 over")
	}()

	//3秒后主协程退出,子协程2一直阻塞,直到主协程退出
	time.Sleep(3*time.Second)
	fmt.Println("main exit")
}

思考一下程序输出什么:

goroutine1 over
main goroutine run
main exit

接着看实例:

package main

import "fmt"

func traveseChannal(){
	ch := make(chan int, 3)
	go func() {
		ch <- 1
		ch <- 2
		ch <- 3
		close(ch)   //如果不关闭ch,在主协程中ch数据将空读,造成主协程永远阻塞,即死锁
	}()
  
  //遍历ch并拿走ch中的数据
	for item := range ch {
		fmt.Println(item)
	}
	fmt.Println("bye")
}

func main() {
	traveseChannal()
}

再思考一下程序输出什么?

1
2
3
bye

当程序执行到for item := range ch 主协程会读阻塞,然后等待子协程向管道中放入数据

总结

  • channel满了,就会阻塞写,channel空了就会阻塞读。
  • 阻塞之后会交出CPU,去执行其他协程,希望其他协程能够帮助自己解除阻塞状态。
  • 如果阻塞发送在main协程里面,并且没有其他子协程可以执行,那就可以确定,“希望永远等不来”,然后main协程就会自己把自己杀掉,然后报:fatal error: deadlock, 即:死锁。
  • 如果阻塞发生在子协程里面,就不会发生死锁,因为至少main协程是一个值得等待的“希望”,会一直等(阻塞)下去。