通过一个例子体会go中互斥锁、读写锁、条件变量、原子操作的使用

猪小花1号2018-09-03 09:33

作者:李小翠


前言

go除了提供自己特有的并发编程模型和工具(channel)之外,还提供了传统的同步工具:互斥锁、读写锁、条件变量、原子操作等
  • 互斥锁:传统并发程序对共享资源进行访问控制的主要手段
    (1)go中,由标准库代码包sync中的Mutex结构体代表,该结构题有Lock和UnLock两种方法
    (2)一般在锁定互斥锁以后紧接着使用defer语句保证互斥锁及时解锁
  • 读写锁:针对读写的互斥锁,有特有的访问控制规则
    (1)go中,标准库代码包sync中的RWMutex结构体代表
    (2)允许任意读操作同时进行;只允许一个写操作进行;写操作的过程中,读操作不被允许
  • 条件变量:与互斥锁配合使用
    (1)go中,标准库代码包sync中Cond结构体代表,有wait、Signal、Brodcast三种方法
    (2)调用wait:自动与该条件标量关联的锁进行解锁,并使调用方所在的Goroutine被阻塞
    (3)Signal、Brodcast唤醒因等待特定条件而被阻塞的Goroutine
  • 原子操作:go语言提供的是非入侵式的
    (1)go中,标准库代码包sync/atomic
    (2)支持的类型值:
    int32\int64\uint32\uint64\uintptr\unsafe.Pointer
    (3)支持的操作:增或减、比较并交换、载入、存储、交换=
Example
程序功能:读写文件(定长写)、获取数据块长度、获取最后写入的数据块号
  • 写数据时,需要读取当前文件中写数据的偏移量,可以通过加写锁的方式保护这个变量,如下:

     这个操作涉及一个数据的读取和加法,因此还有一种方案是对其进行原子操作,如下:
  • 读数据时,对文件中读数据的偏移量的操作和上述类似。需要注意的是,如果数据的实际写入比写数据偏移量更新慢,那么到了文件结尾还读不到实际数据,此时需要等待写结束,这里就有两种处理方式:
    一是不断的UnLock()和Lock()读锁,直到可以读到数据:

    二是通过锁与条件变量配合使用:
    在读的部分:

    写的部分:需要在写完之后进行通知

完整的代码如下:
package main
import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
)
type DataFile interface {
//读取一个数据块
Read() (rsn int64, d Data, err error)
//写入一个数据块
Write(d Data) (wsn int64, err error)
//获取最后读取的数据快的序列号
Rsn() int64
//获取最后写入的数据快的序列号
Wsn() int64
//获取数据块的长度
DataLen() uint32
}
type Data []byte
type myDataFile struct {
f *os.File //文件
fmutex sync.RWMutex //被用于文件的读写锁
woffset int64 //写操作需要用到的偏移量
roffset int64 //读操作需要用到的偏移量
//wmutex sync.Mutex //写操作用到的互斥锁
//rmutex sync.Mutex //读操作用到的互斥锁
rcond *sync.Cond //条件变量
datalen uint32 //数据快长度
}
func NewDataFile(path string, datalen uint32) (DataFile, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
if datalen == 0 {
return nil, errors.New("Invalid data legth!")
}
df := &myDataFile{f: f, datalen: datalen}
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df, nil
}
func (df *myDataFile) Read() (rsn int64, d Data, err error) {
//读取并更新读偏移量
var offset int64
//通过原子操作保证
atomic.AddInt64(&df.roffset, int64(df.datalen))
//读取一个数据块
rsn = offset / int64(df.datalen) //第几块数据块
df.fmutex.RLock()
defer df.fmutex.RUnlock()
bytes := make([]byte, df.datalen)
for {
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.rcond.Wait()
continue
}
return
}
}
d = bytes
return
}
func (df *myDataFile) Write(d Data) (wsn int64, err error) {
//获取并更新写偏移量
var offset int64
atomic.AddInt64(&df.woffset, int64(df.datalen))

//写入一个数据块
wsn = offset / int64(df.datalen)
var bytes []byte
if len(d) > int(df.datalen) {
bytes = d[0:df.datalen]
} else {
bytes = d
}
df.fmutex.Lock()
df.fmutex.Unlock()
_, err = df.f.Write(bytes)
df.rcond.Signal()
return
}
func (df *myDataFile) Rsn() int64 {
//通过原子操作的方式实现
offset := atomic.LoadInt64(&df.roffset)
return offset / int64(df.datalen)
}
func (df *myDataFile) Wsn() int64 {
offset := atomic.LoadInt64(&df.woffset)
return offset / int64(df.datalen)
}
func (df *myDataFile) DataLen() uint32 {
return df.datalen
}


网易云大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者李小翠授权发布。