創(chuàng)新互聯(lián)公司是一家專注于網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站設(shè)計(jì)與策劃設(shè)計(jì),大悟網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:大悟等地區(qū)。大悟做網(wǎng)站價(jià)格咨詢:18982081108
版本一:
datafile1.go:
package v1
import (
"errors"
"io"
"os"
"sync"
)
// Data 代表數(shù)據(jù)的類型。
type Data []byte
// DataFile 代表數(shù)據(jù)文件的接口類型。
type DataFile interface {
// Read 會(huì)讀取一個(gè)數(shù)據(jù)塊。
Read() (rsn int64, d Data, err error)
// Write 會(huì)寫入一個(gè)數(shù)據(jù)塊。
Write(d Data) (wsn int64, err error)
// RSN 會(huì)獲取最后讀取的數(shù)據(jù)塊的序列號(hào)。
RSN() int64
// WSN 會(huì)獲取最后寫入的數(shù)據(jù)塊的序列號(hào)。
WSN() int64
// DataLen 會(huì)獲取數(shù)據(jù)塊的長(zhǎng)度。
DataLen() uint32
// Close 會(huì)關(guān)閉數(shù)據(jù)文件。
Close() error
}
// myDataFile 代表數(shù)據(jù)文件的實(shí)現(xiàn)類型。
type myDataFile struct {
f *os.File // 文件。
fmutex sync.RWMutex // 被用于文件的讀寫鎖。
woffset int64 // 寫操作需要用到的偏移量。
roffset int64 // 讀操作需要用到的偏移量。
wmutex sync.Mutex // 寫操作需要用到的互斥鎖。
rmutex sync.Mutex // 讀操作需要用到的互斥鎖。
dataLen uint32 // 數(shù)據(jù)塊長(zhǎng)度。
}
// NewDataFile 會(huì)新建一個(gè)數(shù)據(jù)文件的實(shí)例。
func NewDataFile(path string, dataLen uint32) (DataFile, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen == 0 {
return nil, errors.New("Invalid data length!")
}
df := &myDataFile{f: f, dataLen: dataLen}
return df, nil
}
func (df *myDataFile) Read() (rsn int64, d Data, err error) {
// 讀取并更新讀偏移量。
var offset int64
df.rmutex.Lock()
offset = df.roffset
df.roffset += int64(df.dataLen)
df.rmutex.Unlock()
//讀取一個(gè)數(shù)據(jù)塊。
rsn = offset / int64(df.dataLen)
bytes := make([]byte, df.dataLen)
for {
df.fmutex.RLock()
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.fmutex.RUnlock()
continue
}
df.fmutex.RUnlock()
return
}
d = bytes
df.fmutex.RUnlock()
return
}
}
func (df *myDataFile) Write(d Data) (wsn int64, err error) {
// 讀取并更新寫偏移量。
var offset int64
df.wmutex.Lock()
offset = df.woffset
df.woffset += int64(df.dataLen)
df.wmutex.Unlock()
//寫入一個(gè)數(shù)據(jù)塊。
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen) {
bytes = d[0:df.dataLen]
} else {
bytes = d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
_, err = df.f.Write(bytes)
return
}
func (df *myDataFile) RSN() int64 {
df.rmutex.Lock()
defer df.rmutex.Unlock()
return df.roffset / int64(df.dataLen)
}
func (df *myDataFile) WSN() int64 {
df.wmutex.Lock()
defer df.wmutex.Unlock()
return df.woffset / int64(df.dataLen)
}
func (df *myDataFile) DataLen() uint32 {
return df.dataLen
}
func (df *myDataFile) Close() error {
if df.f == nil {
return nil
}
return df.f.Close()
}
datafile1_test.go:
package v1
import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
)
func removeFile(path string) error {
file, err := os.Open(path)
if err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
file.Close()
return os.Remove(path)
}
func TestIDataFile(t *testing.T) {
t.Run("v1/all", func(t *testing.T) {
dataLen := uint32(3)
path2 := filepath.Join(os.TempDir(), "data_file_test_new.txt")
defer func() {
if err := removeFile(path2); err != nil {
t.Errorf("Open file error: %s\n", err)
}
}()
t.Run("New", func(t *testing.T) {
testNew(path2, dataLen, t)
})
path3 := filepath.Join(os.TempDir(), "data_file_test.txt")
defer func() {
if err := removeFile(path3); err != nil {
t.Fatalf("Open file error: %s\n", err)
}
}()
max := 100000
t.Run("WriteAndRead", func(t *testing.T) {
testRW(path3, dataLen, max, t)
})
})
}
func testNew(path string, dataLen uint32, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
if dataFile == nil {
t.Log("Unnormal data file!")
t.FailNow()
}
defer dataFile.Close()
if dataFile.DataLen() != dataLen {
t.Fatalf("Incorrect data length!")
}
}
func testRW(path string, dataLen uint32, max int, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
defer dataFile.Close()
var wg sync.WaitGroup
wg.Add(5)
// 寫入。
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
var prevWSN int64 = -1
for j := 0; j < max; j++ {
data := Data{
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
}
wsn, err := dataFile.Write(data)
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if prevWSN >= 0 && wsn <= prevWSN {
t.Fatalf("Incorect WSN %d! (lt %d)\n", wsn, prevWSN)
}
prevWSN = wsn
}
}()
}
// 讀取。
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
var prevRSN int64 = -1
for i := 0; i < max; i++ {
rsn, date, err := dataFile.Read()
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if date == nil {
t.Fatalf("Unnormal data!")
}
if prevRSN >= 0 && rsn <= prevRSN {
t.Fatalf("Incorect RSN %d! (lt %d)\n", rsn, prevRSN)
}
prevRSN = rsn
}
}()
}
wg.Wait()
}
版本二:
datafile2.go:
package v2
import (
"errors"
"io"
"os"
"sync"
)
// Data 代表數(shù)據(jù)的類型。
type Data []byte
// DataFile 代表數(shù)據(jù)文件的接口類型。
type DataFile interface {
// Read 會(huì)讀取一個(gè)數(shù)據(jù)塊。
Read() (rsn int64, d Data, err error)
// Write 會(huì)寫入一個(gè)數(shù)據(jù)塊。
Write(d Data) (wsn int64, err error)
// RSN 會(huì)獲取最后讀取的數(shù)據(jù)塊的序列號(hào)。
RSN() int64
// WSN 會(huì)獲取最后寫入的數(shù)據(jù)塊的序列號(hào)。
WSN() int64
// DataLen 會(huì)獲取數(shù)據(jù)塊的長(zhǎng)度。
DataLen() uint32
// Close 會(huì)關(guān)閉數(shù)據(jù)文件。
Close() error
}
// myDataFile 代表數(shù)據(jù)文件的實(shí)現(xiàn)類型。
type myDataFile struct {
f *os.File // 文件。
fmutex sync.RWMutex // 被用于文件的讀寫鎖。
rcond *sync.Cond //讀操作需要用到的條件變量
woffset int64 // 寫操作需要用到的偏移量。
roffset int64 // 讀操作需要用到的偏移量。
wmutex sync.Mutex // 寫操作需要用到的互斥鎖。
rmutex sync.Mutex // 讀操作需要用到的互斥鎖。
dataLen uint32 // 數(shù)據(jù)塊長(zhǎng)度。
}
// NewDataFile 會(huì)新建一個(gè)數(shù)據(jù)文件的實(shí)例。
func NewDataFile(path string, dataLen uint32) (DataFile, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen == 0 {
return nil, errors.New("Invalid data length!")
}
df := &myDataFile{f: f, dataLen: dataLen}
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df, nil
}
func (df *myDataFile) Read() (rsn int64, d Data, err error) {
// 讀取并更新讀偏移量。
var offset int64
df.rmutex.Lock()
offset = df.roffset
df.roffset += int64(df.dataLen)
df.rmutex.Unlock()
//讀取一個(gè)數(shù)據(jù)塊。
rsn = offset / int64(df.dataLen)
bytes := make([]byte, df.dataLen)
df.fmutex.RLock()
defer df.fmutex.RUnlock()
for {
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.rcond.Wait()
continue
}
return
}
d = bytes
return
}
}
func (df *myDataFile) Write(d Data) (wsn int64, err error) {
// 讀取并更新寫偏移量。
var offset int64
df.wmutex.Lock()
offset = df.woffset
df.woffset += int64(df.dataLen)
df.wmutex.Unlock()
//寫入一個(gè)數(shù)據(jù)塊。
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen) {
bytes = d[0:df.dataLen]
} else {
bytes = d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
_, err = df.f.Write(bytes)
df.rcond.Signal()
return
}
func (df *myDataFile) RSN() int64 {
df.rmutex.Lock()
defer df.rmutex.Unlock()
return df.roffset / int64(df.dataLen)
}
func (df *myDataFile) WSN() int64 {
df.wmutex.Lock()
defer df.wmutex.Unlock()
return df.woffset / int64(df.dataLen)
}
func (df *myDataFile) DataLen() uint32 {
return df.dataLen
}
func (df *myDataFile) Close() error {
if df.f == nil {
return nil
}
return df.f.Close()
}
datafile2_test.go:
package v2
import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
)
func removeFile(path string) error {
file, err := os.Open(path)
if err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
file.Close()
return os.Remove(path)
}
func TestIDataFile(t *testing.T) {
t.Run("v2/all", func(t *testing.T) {
dataLen := uint32(3)
path2 := filepath.Join(os.TempDir(), "data_file_test_new.txt")
defer func() {
if err := removeFile(path2); err != nil {
t.Errorf("Open file error: %s\n", err)
}
}()
t.Run("New", func(t *testing.T) {
testNew(path2, dataLen, t)
})
path3 := filepath.Join(os.TempDir(), "data_file_test.txt")
defer func() {
if err := removeFile(path3); err != nil {
t.Fatalf("Open file error: %s\n", err)
}
}()
max := 100000
t.Run("WriteAndRead", func(t *testing.T) {
testRW(path3, dataLen, max, t)
})
})
}
func testNew(path string, dataLen uint32, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
if dataFile == nil {
t.Log("Unnormal data file!")
t.FailNow()
}
defer dataFile.Close()
if dataFile.DataLen() != dataLen {
t.Fatalf("Incorrect data length!")
}
}
func testRW(path string, dataLen uint32, max int, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
defer dataFile.Close()
var wg sync.WaitGroup
wg.Add(5)
// 寫入。
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
var prevWSN int64 = -1
for j := 0; j < max; j++ {
data := Data{
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
}
wsn, err := dataFile.Write(data)
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if prevWSN >= 0 && wsn <= prevWSN {
t.Fatalf("Incorect WSN %d! (lt %d)\n", wsn, prevWSN)
}
prevWSN = wsn
}
}()
}
// 讀取。
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
var prevRSN int64 = -1
for i := 0; i < max; i++ {
rsn, date, err := dataFile.Read()
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if date == nil {
t.Fatalf("Unnormal data!")
}
if prevRSN >= 0 && rsn <= prevRSN {
t.Fatalf("Incorect RSN %d! (lt %d)\n", rsn, prevRSN)
}
prevRSN = rsn
}
}()
}
wg.Wait()
}
版本三
datafile3.go:
package v3
import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
)
// Data 代表數(shù)據(jù)的類型。
type Data []byte
// DataFile 代表數(shù)據(jù)文件的接口類型。
type DataFile interface {
// Read 會(huì)讀取一個(gè)數(shù)據(jù)塊。
Read() (rsn int64, d Data, err error)
// Write 會(huì)寫入一個(gè)數(shù)據(jù)塊。
Write(d Data) (wsn int64, err error)
// RSN 會(huì)獲取最后讀取的數(shù)據(jù)塊的序列號(hào)。
RSN() int64
// WSN 會(huì)獲取最后寫入的數(shù)據(jù)塊的序列號(hào)。
WSN() int64
// DataLen 會(huì)獲取數(shù)據(jù)塊的長(zhǎng)度。
DataLen() uint32
// Close 會(huì)關(guān)閉數(shù)據(jù)文件。
Close() error
}
// myDataFile 代表數(shù)據(jù)文件的實(shí)現(xiàn)類型。
type myDataFile struct {
f *os.File // 文件。
fmutex sync.RWMutex // 被用于文件的讀寫鎖。
rcond *sync.Cond // 讀操作需要用到的條件變量
woffset int64 // 寫操作需要用到的偏移量。
roffset int64 // 讀操作需要用到的偏移量。
dataLen uint32 // 數(shù)據(jù)塊長(zhǎng)度。
}
// NewDataFile 會(huì)新建一個(gè)數(shù)據(jù)文件的實(shí)例。
func NewDataFile(path string, dataLen uint32) (DataFile, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen == 0 {
return nil, errors.New("Invalid data length!")
}
df := &myDataFile{f: f, dataLen: dataLen}
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df, nil
}
func (df *myDataFile) Read() (rsn int64, d Data, err error) {
// 讀取并更新讀偏移量
var offset int64
for {
offset = atomic.LoadInt64(&df.roffset)
if atomic.CompareAndSwapInt64(&df.roffset, offset, (offset + int64(df.dataLen))) {
break
}
}
//讀取一個(gè)數(shù)據(jù)塊
rsn = offset / int64(df.dataLen)
bytes := make([]byte, df.dataLen)
df.fmutex.RLock()
defer df.fmutex.RUnlock()
for {
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.rcond.Wait()
continue
}
return
}
d = bytes
return
}
}
func (df *myDataFile) Write(d Data) (wsn int64, err error) {
// 讀取并更新寫偏移量
var offset int64
for {
offset = atomic.LoadInt64(&df.woffset)
if atomic.CompareAndSwapInt64(&df.woffset, offset, (offset + int64(df.dataLen))) {
break
}
}
//寫入一個(gè)數(shù)據(jù)塊
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen) {
bytes = d[0:df.dataLen]
} else {
bytes = d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
_, err = df.f.Write(bytes)
df.rcond.Signal()
return
}
func (df *myDataFile) RSN() int64 {
offset := atomic.LoadInt64(&df.roffset)
return offset / int64(df.dataLen)
}
func (df *myDataFile) WSN() int64 {
offset := atomic.LoadInt64(&df.woffset)
return offset / int64(df.dataLen)
}
func (df *myDataFile) DataLen() uint32 {
return df.dataLen
}
func (df *myDataFile) Close() error {
if df.f == nil {
return nil
}
return df.f.Close()
}
datafile3_test.go:
package v3
import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
)
func removeFile(path string) error {
file, err := os.Open(path)
if err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
file.Close()
return os.Remove(path)
}
func TestIDataFile(t *testing.T) {
t.Run("v3/all", func(t *testing.T) {
dataLen := uint32(3)
path2 := filepath.Join(os.TempDir(), "data_file_test_new.txt")
defer func() {
if err := removeFile(path2); err != nil {
t.Errorf("Open file error: %s\n", err)
}
}()
t.Run("New", func(t *testing.T) {
testNew(path2, dataLen, t)
})
path3 := filepath.Join(os.TempDir(), "data_file_test.txt")
defer func() {
if err := removeFile(path3); err != nil {
t.Fatalf("Open file error: %s\n", err)
}
}()
max := 100000
t.Run("WriteAndRead", func(t *testing.T) {
testRW(path3, dataLen, max, t)
})
})
}
func testNew(path string, dataLen uint32, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
if dataFile == nil {
t.Log("Unnormal data file!")
t.FailNow()
}
defer dataFile.Close()
if dataFile.DataLen() != dataLen {
t.Fatalf("Incorrect data length!")
}
}
func testRW(path string, dataLen uint32, max int, t *testing.T) {
t.Logf("New a data file (path: %s, dataLen: %d)...\n",
path, dataLen)
dataFile, err := NewDataFile(path, dataLen)
if err != nil {
t.Logf("Couldn't new a data file: %s", err)
t.FailNow()
}
defer dataFile.Close()
var wg sync.WaitGroup
wg.Add(5)
// 寫入。
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
var prevWSN int64 = -1
for j := 0; j < max; j++ {
data := Data{
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
byte(rand.Int31n(256)),
}
wsn, err := dataFile.Write(data)
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if prevWSN >= 0 && wsn <= prevWSN {
t.Fatalf("Incorect WSN %d! (lt %d)\n", wsn, prevWSN)
}
prevWSN = wsn
}
}()
}
// 讀取。
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
var prevRSN int64 = -1
for i := 0; i < max; i++ {
rsn, date, err := dataFile.Read()
if err != nil {
t.Fatalf("Unexpected writing error: %s\n", err)
}
if date == nil {
t.Fatalf("Unnormal data!")
}
if prevRSN >= 0 && rsn <= prevRSN {
t.Fatalf("Incorect RSN %d! (lt %d)\n", rsn, prevRSN)
}
prevRSN = rsn
}
}()
}
wg.Wait()
}