猪小花1号

个人签名

282篇博客

通过一个例子体会Go中的channel

猪小花1号2018-09-03 09:31

作者:李小翠


前言

  • channel用于Goroutine间的值传递
  • channel典型体现了GO提倡的“尽量不用共享内存的方式来通信,而以通信作为手段来共享内存”
    如何理解这句话?
  1. 把数据放在共享内存区,供多个线程的程序访问,这种思想是很简单的,但是,需要各种控制和约束机制来控制并发。
    比如:对该内存区域加锁等
  2. go中的channel是一个线程安全的通道,它本身的机制可以同步两个被并发的函数(非缓冲channel),又可以让这两个函数实现异步通信(缓冲channel)
  3. 不过go还是提供了一些传统的方法:互斥锁、读写锁、条件变量等来实现goroutine间的通信

Channel
双向通道                                                          
  • 缓冲channel的初始化:make(chan int,10)
    最最有吸引力的特点是:如果在发送端close通道,不会对接收端接受已有的元素值产生任何影响
  • 非缓冲channel的初始化:make(chan int)
    最最大的特点是:必需收发双方同时做好准备后才开始通信
单向通道
  • chan<- 只接受数据的通道
  • <-chan 只发送数据的通道
说明:这两个通道通常被用作函数的参数。
原因:仅仅定义单向通道是毫无意义的,它无法用于goroutine间的通信!但是呢,我们可以用这个类型作为函数的参数, 约束该函数使用这个通道的方式
Example
该程序用来批量处理人员信息
流程:
  • func fetchPerson2(origs chan<- Person)    //启用多个goroutine并发读取人员信息 
    tips:
    (1)函数的参数为只接受数据的通道,实际调用时传入的参数是chan Person类型的
    (2)注意接收完数据之后,必须要关闭通道,否则会引起deadlock
    (3)程序中运用了goroutine票池的方式,管理读取人员信息的goroutine的数量
    (4)channel里面存放的是原始数据的副本,因此当存放大数据时候,可以使用指针。
              如果channel里面传递的是数据的指针,需要遵循这样的约定:各goroutine从channel中收到指针可以任意使用,但把它送到别的channel之后就不能再改变它了。
              可以参考:
    如何理解 Golang 中“不要通过共享内存来通信,而应该通过通信来共享内存”?参考作者:布丁@kyhpudding的回答
  • Batch(origs <-chan Person) <-chan Person   //启用一个goroutine读取通道里的数据,处理完之后输出到一个新的通道
    tips:
    (1)函数参数为只读数据同通道,实际调用时传入的参数是chan Person类型的
    (2)函数的返回值同样是一个只读数据通道,但在函数实际内创建的是chan Person类型
    (3)这个函数内生产数据,因此,结束之前需要关闭通道
  • func SavePerson(dest <-chan Person) <-chan byte  //启用一个 goroutine保存处理后的数据
    tips:
    这里的传入参数类似前两个。函数的返回值类型chan byte 1,在调用函数中接收数据,这使得调用者被阻塞到数据处理结束,否则goroutine还没有跑完,主线程就结束了
package main
import "fmt"

/*
批量更改人员信息,过程如下:
1、一个goroutine读取人员信息
2、一个goroutine从1中读取的信息进行更改
3、一个goroutine保存更改过后的信息
*/
func main(){
handler := getPersonHandler()
origs := make(chan Person, 100)
fetchPerson2(origs)
dests := handler.Batch(origs)
sign := SavePerson(dests)
<-sign
}
//Person:结构体
type Person struct {
name string
age int
addr string
}
var persons []Person = make([]Person, 200)
//用于初始化persons
func init() {
for i := 0; i < 200; i++ {
name := fmt.Sprintf("%s%d", "P", i)
p := Person{name, 32, "杭州滨江"}
persons[i] = p
//fmt.Printf("[%s %d %s]", p.name, p.age, p.addr)
}
}

//PersonHandler 接口
type PersonHandler interface {
Batch(origs <-chan Person) <-chan Person
Handle(orig *Person)
}

//PersonHandlerImpl 对接口的实现
type PersonHandlerImpl struct{}

//Batch 实现人员的批量处理
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
dests := make(chan Person, 100)
go func() { //以异步的方式实现对若干人员的批量处理
for p := range origs {
handler.Handle(&p)
dests <- p
}
fmt.Printf("All the information has handled")
close(dests)
}()
return dests

}
//Handle add "_modify" to adrr
func (handler PersonHandlerImpl) Handle(origs *Person) {
origs.addr += "_modify"
}

func getPersonHandler() PersonHandler {
return PersonHandlerImpl{}
}

func fetchPerson(origs chan<- Person) {
go func() {
for _, p := range persons {
origs <- p
}
close(origs) //如果这里不关闭会造成永久阻塞
}()
}

/*
改进
原因:从函数的参数看来,并不知道它是缓冲通道还是非缓冲的,因此最开始需要进行类型检查
如果为缓冲:那么可以并发的往里面写数据
如果为非缓冲:只要一条一条的写就可以了
*/
func fetchPerson2(origs chan<- Person) {
bufcap := cap(origs) //查看通道容量
bufferd := bufcap > 0 //标志位
gopoolnum := bufcap / 2 //goroutine的数量是bufcap的一半
goticketpool := getGoroutineTicket(gopoolnum)

go func() {
for {
p, ok := fetchPerson0()
if !ok { //如果已经读取完毕
for { //这个是要保证所有的给缓冲通道塞值的goroutine都结束
if !bufferd || len(goticketpool) == gopoolnum {
break
}
}
fmt.Println("all information has been fetched.")
close(origs)
break
}
if bufferd {
<-goticketpool //goroutine票池的处理
go func() {
origs <- p
goticketpool <- 1
}()

} else {
origs <- p
}
}
}()
}

var pcount int = 0
func fetchPerson0() (Person, bool) {
if pcount >= 200 {
return Person{}, false
}
p := persons[pcount]
pcount++
return p, true
}

func getGoroutineTicket(gonum int) chan byte {
var goticket chan byte
if gonum == 0 { //如果是非缓冲类型的,返回一个nil
return goticket
}

goticket = make(chan byte, gonum)
for i := 0; i < gonum; i++ {
goticket <- 1
}
return goticket //如果是缓冲类型的,返回一个Goroutine票池
}

//SavePerson 执行一个单独的线程来保存修改的结果
func SavePerson(dest <-chan Person) <-chan byte {
sign := make(chan byte, 1)
go func() {
for {
v, ok := <-dest
if !ok {
fmt.Println("All Person are saved")
sign <- 0
break
}
savePerson(v)
}
}()
return sign
}

func savePerson(p Person) bool {
return true
}




网易云大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者李小翠授权发布。