轉(zhuǎn)載自:etcd實現(xiàn)分布式鎖
三臺網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站設(shè)計等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)公司于2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。
當(dāng)并發(fā)的訪問共享資源的時候,如果沒有加鎖的話,無法保證共享資源安全性和正確性。這個時候就需要用到鎖
當(dāng)業(yè)務(wù)執(zhí)行在同一個線程內(nèi),也就是我初始化一個本地鎖,其他請求也認這把鎖。一般是服務(wù)部署在單機環(huán)境下。
我們可以看下下面的例子,開1000個goroutine并發(fā)的給Counter做自增操作,結(jié)果會是什么樣的呢?
package main
import (
"fmt"
"sync"
)
var sg sync.WaitGroup
type Counter struct {
count int
}
// 自增操作
func (m *Counter) Incr() {
m.count++
}
// 獲取總數(shù)
func (m *Counter) Count() int {
return m.count
}
func main() {
c := &Counter{}
for i := 0; i < 1000; i++ {
sg.Add(1)
// 模擬并發(fā)請求
go func() {
c.Incr()
sg.Done()
}()
}
sg.Wait()
fmt.Println(c.Count())
}
結(jié)果是count的數(shù)量并不是預(yù)想中的1000,而是下面這樣,每次打印出的結(jié)果都不一樣,但是接近1000
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
953
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
982
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
984
出現(xiàn)這個問題的原因就是沒有給自增操作加鎖
下面我們修改代碼如下,在Incr中加上go的mutex互斥鎖
package main
import (
"fmt"
"sync"
)
var sg sync.WaitGroup
type Counter struct {
count int
mu sync.Mutex
}
func (m *Counter) Incr() {
// 每次寫之前先加鎖,寫完之后釋放鎖
m.mu.Lock()
defer m.mu.Unlock()
m.count++
}
func (m *Counter) Count() int {
return m.count
}
func main() {
c := &Counter{}
for i := 0; i < 1000; i++ {
sg.Add(1)
go func() {
c.Incr()
sg.Done()
}()
}
sg.Wait()
fmt.Println(c.Count())
}
可以看到現(xiàn)在count正常輸出1000了
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
1000
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
1000
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
1000
├── docker-compose.yml
├── etcd
│ └── Dockerfile
Dockerfile文件內(nèi)容
FROM bitnami/etcd:latest
LABEL maintainer="liuyuede123 "
Docker-compose.yml內(nèi)容
version: '3.5'
# 網(wǎng)絡(luò)配置
networks:
backend:
driver: bridge
# 服務(wù)容器配置
services:
etcd1: # 自定義容器名稱
build:
context: etcd # 指定構(gòu)建使用的 Dockerfile 文件
environment:
- TZ=Asia/Shanghai
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_NAME=etcd1
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379
- ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
- ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
- ETCD_INITIAL_CLUSTER_STATE=new
ports: # 設(shè)置端口映射
- ":2379"
- ":2380"
networks:
- backend
restart: always
etcd2: # 自定義容器名稱
build:
context: etcd # 指定構(gòu)建使用的 Dockerfile 文件
environment:
- TZ=Asia/Shanghai
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_NAME=etcd2
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379
- ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
- ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
- ETCD_INITIAL_CLUSTER_STATE=new
ports: # 設(shè)置端口映射
- ":2379"
- ":2380"
networks:
- backend
restart: always
etcd3: # 自定義容器名稱
build:
context: etcd # 指定構(gòu)建使用的 Dockerfile 文件
environment:
- TZ=Asia/Shanghai
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_NAME=etcd3
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379
- ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
- ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
- ETCD_INITIAL_CLUSTER_STATE=new
ports: # 設(shè)置端口映射
- ":2379"
- ":2380"
networks:
- backend
restart: always
執(zhí)行docker-compose up -d
啟動etcd服務(wù),可以看到docker中已經(jīng)啟動了3個服務(wù)
package main
import (
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"sync"
)
var sg sync.WaitGroup
type Counter struct {
count int
}
func (m *Counter) Incr() {
m.count++
}
func (m *Counter) Count() int {
return m.count
}
func main() {
endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
// 初始化etcd客戶端
client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
fmt.Println(err)
return
}
defer client.Close()
counter := &Counter{}
sg.Add(100)
for i := 0; i < 100; i++ {
go func() {
// 這里會生成租約,默認是60秒
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock")
locker.Lock()
counter.Incr()
locker.Unlock()
sg.Done()
}()
}
sg.Wait()
fmt.Println("count:", counter.Count())
}
執(zhí)行結(jié)果:
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
count: 100
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
count: 100
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go
count: 100
當(dāng)某個客戶端持有鎖時,由于某些原因?qū)е骆i未釋放,就會導(dǎo)致這個客戶端一直持有這把鎖,其他客戶端一直獲取不到鎖。所以需要分布式鎖實現(xiàn)超時機制,當(dāng)鎖未釋放時,會因為etcd的租約會到期而釋放鎖。當(dāng)業(yè)務(wù)正常處理時,租約到期之前會繼續(xù)續(xù)約,知道業(yè)務(wù)處理完畢釋放鎖。
package main
import (
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"sync"
"time"
)
var sg sync.WaitGroup
type Counter struct {
count int
}
func (m *Counter) Incr() {
m.count++
}
func (m *Counter) Count() int {
return m.count
}
func main() {
endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
fmt.Println(err)
return
}
defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
defer session.Close()
locker := concurrency.NewLocker(session, "/my-test-lock")
fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
locker.Lock()
fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
// 模擬業(yè)務(wù)
time.Sleep(100 * time.Second)
counter.Incr()
locker.Unlock()
fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())
}
命令行開2個窗口,第一個窗口執(zhí)行程序并獲取鎖,之后模擬意外退出并沒有調(diào)用unlock方法
go run main.go
locking... 2022-09-03 23:41:48 # 租約生成時間
locked... 2022-09-03 23:41:48
^Csignal: interrupt
第二個窗口,在第一個窗口退出之前嘗試獲取鎖,此時是阻塞狀態(tài)。第一個窗口退出之后由于租約還沒到期,第二個窗口還是獲取鎖的狀態(tài)。等到第一個窗口租約到期(默認60秒),第二個獲取鎖成功
locking... 2022-09-03 23:41:52
locked... 2022-09-03 23:42:48 # 第一個租約60秒到期,獲取鎖成功
released... 2022-09-03 23:44:28
count: 1
上面的例子中已經(jīng)實現(xiàn)了阻塞接口,即當(dāng)前有獲取到鎖的請求,則其他請求阻塞等待鎖釋放
非阻塞的方式就是嘗試獲取鎖,如果失敗立即返回。etcd中是實現(xiàn)了tryLock方法
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
具體看下面的例子
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"sync"
"time"
)
var sg sync.WaitGroup
type Counter struct {
count int
}
func (m *Counter) Incr() {
m.count++
}
func (m *Counter) Count() int {
return m.count
}
func main() {
endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
fmt.Println(err)
return
}
defer client.Close()
counter := &Counter{}
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
defer session.Close()
// 此處使用newMutex初始化
locker := concurrency.NewMutex(session, "/my-test-lock")
fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
err = locker.TryLock(context.Background())
// 獲取鎖失敗就拋錯
if err != nil {
fmt.Println("lock failed", err)
return
}
fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
time.Sleep(100 * time.Second)
counter.Incr()
err = locker.Unlock(context.Background())
if err != nil {
fmt.Println("unlock failed", err)
return
}
fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("count:", counter.Count())
}
窗口1、窗口2執(zhí)行結(jié)果
go run main.go
locking... 2022-09-04 00:00:21
locked... 2022-09-04 00:00:21
released... 2022-09-04 00:02:01
count: 1
go run main.go
locking... 2022-09-04 00:00:27
lock failed mutex: Locked by another session