作者:李小翠
前言
package main
import "fmt"
/*
批量更改人员信息,过程如下:
1、一个goroutine读取人员信息
2、一个goroutine从1中读取的信息进行更改
3、一个goroutine保存更改过后的信息
*/
func main(){
handler := getPersonHandler()
origs := make(chan Person, 100)
fetchPerson2(origs)
dests := handler.Batch(origs)
sign := SavePerson(dests)
<-sign
}
//Person:结构体
type Person struct {
name string
age int
addr string
}
var persons []Person = make([]Person, 200)
//用于初始化persons
func init() {
for i := 0; i < 200; i++ {
name := fmt.Sprintf("%s%d", "P", i)
p := Person{name, 32, "杭州滨江"}
persons[i] = p
//fmt.Printf("[%s %d %s]", p.name, p.age, p.addr)
}
}
//PersonHandler 接口
type PersonHandler interface {
Batch(origs <-chan Person) <-chan Person
Handle(orig *Person)
}
//PersonHandlerImpl 对接口的实现
type PersonHandlerImpl struct{}
//Batch 实现人员的批量处理
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
dests := make(chan Person, 100)
go func() { //以异步的方式实现对若干人员的批量处理
for p := range origs {
handler.Handle(&p)
dests <- p
}
fmt.Printf("All the information has handled")
close(dests)
}()
return dests
}
//Handle add "_modify" to adrr
func (handler PersonHandlerImpl) Handle(origs *Person) {
origs.addr += "_modify"
}
func getPersonHandler() PersonHandler {
return PersonHandlerImpl{}
}
func fetchPerson(origs chan<- Person) {
go func() {
for _, p := range persons {
origs <- p
}
close(origs) //如果这里不关闭会造成永久阻塞
}()
}
/*
改进
原因:从函数的参数看来,并不知道它是缓冲通道还是非缓冲的,因此最开始需要进行类型检查
如果为缓冲:那么可以并发的往里面写数据
如果为非缓冲:只要一条一条的写就可以了
*/
func fetchPerson2(origs chan<- Person) {
bufcap := cap(origs) //查看通道容量
bufferd := bufcap > 0 //标志位
gopoolnum := bufcap / 2 //goroutine的数量是bufcap的一半
goticketpool := getGoroutineTicket(gopoolnum)
go func() {
for {
p, ok := fetchPerson0()
if !ok { //如果已经读取完毕
for { //这个是要保证所有的给缓冲通道塞值的goroutine都结束
if !bufferd || len(goticketpool) == gopoolnum {
break
}
}
fmt.Println("all information has been fetched.")
close(origs)
break
}
if bufferd {
<-goticketpool //goroutine票池的处理
go func() {
origs <- p
goticketpool <- 1
}()
} else {
origs <- p
}
}
}()
}
var pcount int = 0
func fetchPerson0() (Person, bool) {
if pcount >= 200 {
return Person{}, false
}
p := persons[pcount]
pcount++
return p, true
}
func getGoroutineTicket(gonum int) chan byte {
var goticket chan byte
if gonum == 0 { //如果是非缓冲类型的,返回一个nil
return goticket
}
goticket = make(chan byte, gonum)
for i := 0; i < gonum; i++ {
goticket <- 1
}
return goticket //如果是缓冲类型的,返回一个Goroutine票池
}
//SavePerson 执行一个单独的线程来保存修改的结果
func SavePerson(dest <-chan Person) <-chan byte {
sign := make(chan byte, 1)
go func() {
for {
v, ok := <-dest
if !ok {
fmt.Println("All Person are saved")
sign <- 0
break
}
savePerson(v)
}
}()
return sign
}
func savePerson(p Person) bool {
return true
}
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者李小翠授权发布。