本文来自网易云社区
作者:盛国存
前面的例子中我们等待任务结束是通过sleep来处理,因为打印的数据较少,1 毫秒足够;但是这种方式等待任务结束显然不是很优雅。 对于任务结束首先我们需要确定的通知外面我们打印结束了,那我们又如何通知呢?在Go语言中我们不要通过共享内存来通信,而是要通过通信来共享内存。直接用Channel就可以,下面我们来改造上面的例子
package main
import (
"fmt"
)
type worker struct {
in chan int
done chan bool
}
func work(in chan int, done chan bool, num int) {
for ch := range in {
fmt.Println("Work ID :", num)
fmt.Println(ch)
done<- true
}
}
func createWork(num int) worker {
ch := worker{
in: make(chan int),
done: make(chan bool),
}
go work(ch.in, ch.done, num)
return ch
}
func main() {
var workers [10]worker
for i := 0; i < 10; i ++ {
workers[i] = createWork(i)
}
for i := 0; i < 10; i ++ {
workers[i].in <- 'M' + i
<-workers[i].done
}
}
打印输出结果
Work ID : 0
77
Work ID : 1
78
Work ID : 2
79
Work ID : 3
80
Work ID : 4
81
Work ID : 5
82
Work ID : 6
83
Work ID : 7
84
Work ID : 8
85
Work ID : 9
86
虽然sleep部分的代码已经删除了,但是发现是顺序打印的,这显然不是我想要的结果。Go语言对等待多任务完成提供了一个库 WaitGroup,下面我们就用它继续重构上述的代码
package main
import (
"fmt"
"sync"
)
type worker struct {
in chan int
done func()
}
func work(worker worker, num int) {
for ch := range worker.in {
fmt.Println("Work ID :", num)
fmt.Println(ch)
worker.done()
}
}
func createWork(num int, wg *sync.WaitGroup) worker {
worker := worker{
in: make(chan int),
done: func() {
wg.Done() // 每个任务做完了就调用Done
},
}
go work(worker, num)
return worker
}
func main() {
var wg sync.WaitGroup
var workers [10]worker
for i := 0; i < 10; i ++ {
workers[i] = createWork(i, &wg)
}
wg.Add(10) // Add 总共有多少个任务
for i := 0; i < 10; i ++ {
workers[i].in <- 'M' + i
}
wg.Wait() // 等待所有的任务做完
}
结果输出
Work ID : 4
81
Work ID : 5
82
Work ID : 1
78
Work ID : 2
79
Work ID : 6
Work ID : 3
80
Work ID : 0
Work ID : 9
86
83
77
Work ID : 7
84
Work ID : 8
85
Process finished with exit code 0
这样相应的结果才是我们想要的。
协程交替执行,使其能顺序输出1-20的自然数
这个问题就不做演示了,留给读者自行发挥。
首先我们先来介绍一下select常规的应用场景,比如
var ch1, ch2 chan int
我们有两个channel,我们想从 ch1、ch2 里面收数据,
var ch1, ch2 chan int
data1 := <- ch1
data2 := <- ch2
谁快我就要谁,这就是我们的select
package main
import (
"fmt"
)
func main() {
var ch1, ch2 chan int
select {
case data := <- ch1:
fmt.Println("CH1 的数据:", data)
case data := <-ch2:
fmt.Println("CH2 的数据:", data)
default:
fmt.Println("没收到 CH1、CH2 的数据")
}
}
这就相当于做了一个非阻塞式的获取。下面我们就结合一个channel生成器来做一个例子演示
package main
import (
"fmt"
"time"
"math/rand"
)
func genChan() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
out <- i
i ++
}
}()
return out
}
func main() {
var ch1, ch2 = genChan(), genChan()
for {
select {
case data := <- ch1:
fmt.Println("CH1 的数据:", data)
case data := <-ch2:
fmt.Println("CH2 的数据:", data)
}
}
}
输出结果(部分)
CH1 的数据: 0
CH2 的数据: 0
CH1 的数据: 1
CH2 的数据: 1
CH1 的数据: 2
CH2 的数据: 2
CH1 的数据: 3
CH2 的数据: 3
CH1 的数据: 4
CH2 的数据: 4
CH1 的数据: 5
CH2 的数据: 5
CH2 的数据: 6
CH1 的数据: 6
CH1 的数据: 7
CH1 的数据: 8
CH2 的数据: 7
CH1 的数据: 9
CH2 的数据: 8
CH1 的数据: 10
CH2 的数据: 9
CH1 的数据: 11
CH1 的数据: 12
CH1 的数据: 13
CH2 的数据: 10
CH2 的数据: 11
CH1 的数据: 14
CH2 的数据: 12
CH2 的数据: 13
CH1 的数据: 15
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
这就是select的一个应用场景,从输出结果可以看到,CH1、CH2的输出结果不一样,谁先出数据就先选择谁;两个同时出就随机的选择一个。
比如上面的这段代码我想要在10秒之后程序就终止,我该如何处理呢?我们这里需要介绍一下Go语言的 time.After
// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
从源码来看,他的返回值类型是一个 <-chan Time
,那就方便很多了
package main
import (
"fmt"
"time"
)
func genChan() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Second)
out <- i
i ++
}
}()
return out
}
func main() {
var ch1, ch2 = genChan(), genChan()
tm := time.After(10 * time.Second) // 加上10秒的定时
for {
select {
case data := <- ch1:
fmt.Println("CH1 的数据:", data)
case data := <-ch2:
fmt.Println("CH2 的数据:", data)
case <-tm:
return // 收到指令程序直接return
}
}
}
运行到10秒,代码自动退出。
Go语言除了CSP模型外,还是有传统同步机制的,比如互斥量 Mutex
,现在我们就用它举个例子: 用互斥量实现 atomic
package main
import (
"sync"
"time"
"fmt"
)
type atomicInt struct {
value int
lock sync.Mutex
}
func increment(a *atomicInt) {
a.lock.Lock()
defer a.lock.Unlock()
a.value ++
}
func get(a *atomicInt) int {
a.lock.Lock()
defer a.lock.Unlock()
return a.value
}
func main() {
var a atomicInt
increment(&a)
go func() {
increment(&a)
}()
time.Sleep(time.Second)
fmt.Println(get(&a))
}
结果输出
2
Process finished with exit code 0
代码写完,可以用上面介绍的race来检查一下,是否有冲突,是否安全;当然这里还是不建议自己来造这些轮子的,直接使用系统的就可以了。系统提供了 atomic.AddInt32()
等等这些原子操作。
相关阅读:A Bite of GoLang (1)
网易云免费体验馆,0成本体验20+款云产品!
更多网易研发、产品、运营经验分享请访问网易云社区。