作者:李小翠
前言
package main
import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
)
type DataFile interface {
//读取一个数据块
Read() (rsn int64, d Data, err error)
//写入一个数据块
Write(d Data) (wsn int64, err error)
//获取最后读取的数据快的序列号
Rsn() int64
//获取最后写入的数据快的序列号
Wsn() int64
//获取数据块的长度
DataLen() uint32
}
type Data []byte
type myDataFile struct {
f *os.File //文件
fmutex sync.RWMutex //被用于文件的读写锁
woffset int64 //写操作需要用到的偏移量
roffset int64 //读操作需要用到的偏移量
//wmutex sync.Mutex //写操作用到的互斥锁
//rmutex sync.Mutex //读操作用到的互斥锁
rcond *sync.Cond //条件变量
datalen uint32 //数据快长度
}
func NewDataFile(path string, datalen uint32) (DataFile, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
if datalen == 0 {
return nil, errors.New("Invalid data legth!")
}
df := &myDataFile{f: f, datalen: datalen}
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df, nil
}
func (df *myDataFile) Read() (rsn int64, d Data, err error) {
//读取并更新读偏移量
var offset int64
//通过原子操作保证
atomic.AddInt64(&df.roffset, int64(df.datalen))
//读取一个数据块
rsn = offset / int64(df.datalen) //第几块数据块
df.fmutex.RLock()
defer df.fmutex.RUnlock()
bytes := make([]byte, df.datalen)
for {
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.rcond.Wait()
continue
}
return
}
}
d = bytes
return
}
func (df *myDataFile) Write(d Data) (wsn int64, err error) {
//获取并更新写偏移量
var offset int64
atomic.AddInt64(&df.woffset, int64(df.datalen))
//写入一个数据块
wsn = offset / int64(df.datalen)
var bytes []byte
if len(d) > int(df.datalen) {
bytes = d[0:df.datalen]
} else {
bytes = d
}
df.fmutex.Lock()
df.fmutex.Unlock()
_, err = df.f.Write(bytes)
df.rcond.Signal()
return
}
func (df *myDataFile) Rsn() int64 {
//通过原子操作的方式实现
offset := atomic.LoadInt64(&df.roffset)
return offset / int64(df.datalen)
}
func (df *myDataFile) Wsn() int64 {
offset := atomic.LoadInt64(&df.woffset)
return offset / int64(df.datalen)
}
func (df *myDataFile) DataLen() uint32 {
return df.datalen
}
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者李小翠授权发布。