真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

etcd實現(xiàn)分布式鎖

轉(zhuǎn)載自:etcd實現(xiàn)分布式鎖

三臺網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站設(shè)計等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)公司于2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。

當(dāng)并發(fā)的訪問共享資源的時候,如果沒有加鎖的話,無法保證共享資源安全性和正確性。這個時候就需要用到鎖

1、需要具備的特性

  1. 需要保證互斥訪問(分布式環(huán)境需要保證不同節(jié)點、不同線程的互斥訪問)
  2. 需要有超時機制,防止鎖意外未釋放,其他節(jié)點無法獲取到鎖;也要保證任務(wù)能夠正常執(zhí)行完成,不能超時了任務(wù)還沒結(jié)束,導(dǎo)致任務(wù)執(zhí)行一般被釋放鎖
  3. 需要有阻塞和非阻塞兩種請求鎖的接口

2、本地鎖

當(dāng)業(yè)務(wù)執(zhí)行在同一個線程內(nèi),也就是我初始化一個本地鎖,其他請求也認這把鎖。一般是服務(wù)部署在單機環(huán)境下。

我們可以看下下面的例子,開1000個goroutine并發(fā)的給Counter做自增操作,結(jié)果會是什么樣的呢?

package main

import (
	"fmt"
	"sync"
)

var sg sync.WaitGroup

type Counter struct {
	count int
}

// 自增操作
func (m *Counter) Incr() {
	m.count++
}

// 獲取總數(shù)
func (m *Counter) Count() int {
	return m.count
}

func main() {
	c := &Counter{}
	for i := 0; i < 1000; i++ {
		sg.Add(1)
    // 模擬并發(fā)請求
		go func() {
			c.Incr()
			sg.Done()
		}()
	}
	sg.Wait()

	fmt.Println(c.Count())
}

結(jié)果是count的數(shù)量并不是預(yù)想中的1000,而是下面這樣,每次打印出的結(jié)果都不一樣,但是接近1000

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
953
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
982
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
984

出現(xiàn)這個問題的原因就是沒有給自增操作加鎖

下面我們修改代碼如下,在Incr中加上go的mutex互斥鎖

package main

import (
	"fmt"
	"sync"
)

var sg sync.WaitGroup

type Counter struct {
	count int
	mu    sync.Mutex
}

func (m *Counter) Incr() {
  // 每次寫之前先加鎖,寫完之后釋放鎖
	m.mu.Lock()
	defer m.mu.Unlock()
	m.count++
}

func (m *Counter) Count() int {
	return m.count
}

func main() {
	c := &Counter{}
	for i := 0; i < 1000; i++ {
		sg.Add(1)
		go func() {
			c.Incr()
			sg.Done()
		}()
	}
	sg.Wait()

	fmt.Println(c.Count())
}

可以看到現(xiàn)在count正常輸出1000了

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000

3、etcd分布式鎖

簡單部署一個etcd集群

├── docker-compose.yml
├── etcd
│   └── Dockerfile

Dockerfile文件內(nèi)容

FROM bitnami/etcd:latest

LABEL maintainer="liuyuede123 "

Docker-compose.yml內(nèi)容

version: '3.5'
# 網(wǎng)絡(luò)配置
networks:
  backend:
    driver: bridge

# 服務(wù)容器配置
services:
  etcd1:                                  # 自定義容器名稱
    build:
      context: etcd                    # 指定構(gòu)建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd1
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports:                               # 設(shè)置端口映射
      - ":2379"
      - ":2380"
    networks:
      - backend
    restart: always

  etcd2: # 自定義容器名稱
    build:
      context: etcd                    # 指定構(gòu)建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd2
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports: # 設(shè)置端口映射
      - ":2379"
      - ":2380"
    networks:
      - backend
    restart: always

  etcd3: # 自定義容器名稱
    build:
      context: etcd                    # 指定構(gòu)建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd3
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports: # 設(shè)置端口映射
      - ":2379"
      - ":2380"
    networks:
      - backend
    restart: always

執(zhí)行docker-compose up -d啟動etcd服務(wù),可以看到docker中已經(jīng)啟動了3個服務(wù)

實現(xiàn)互斥訪問

package main

import (
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"sync"
)

var sg sync.WaitGroup

type Counter struct {
	count int
}

func (m *Counter) Incr() {
	m.count++
}

func (m *Counter) Count() int {
	return m.count
}

func main() {
	endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
  // 初始化etcd客戶端
	client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	counter := &Counter{}

	sg.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
      // 這里會生成租約,默認是60秒
			session, err := concurrency.NewSession(client)
			if err != nil {
				panic(err)
			}
			defer session.Close()

			locker := concurrency.NewLocker(session, "/my-test-lock")
			locker.Lock()
			counter.Incr()
			locker.Unlock()
			sg.Done()
		}()
	}
	sg.Wait()

	fmt.Println("count:", counter.Count())
}

執(zhí)行結(jié)果:

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100
 user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100

實現(xiàn)超時機制

當(dāng)某個客戶端持有鎖時,由于某些原因?qū)е骆i未釋放,就會導(dǎo)致這個客戶端一直持有這把鎖,其他客戶端一直獲取不到鎖。所以需要分布式鎖實現(xiàn)超時機制,當(dāng)鎖未釋放時,會因為etcd的租約會到期而釋放鎖。當(dāng)業(yè)務(wù)正常處理時,租約到期之前會繼續(xù)續(xù)約,知道業(yè)務(wù)處理完畢釋放鎖。

package main

import (
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"sync"
	"time"
)

var sg sync.WaitGroup

type Counter struct {
	count int
}

func (m *Counter) Incr() {
	m.count++
}

func (m *Counter) Count() int {
	return m.count
}

func main() {
	endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
	client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	counter := &Counter{}

	session, err := concurrency.NewSession(client)
	if err != nil {
		panic(err)
	}
	defer session.Close()

	locker := concurrency.NewLocker(session, "/my-test-lock")
	fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
	locker.Lock()
	fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
  // 模擬業(yè)務(wù)
	time.Sleep(100 * time.Second)
	counter.Incr()
	locker.Unlock()
	fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))

	fmt.Println("count:", counter.Count())
}

命令行開2個窗口,第一個窗口執(zhí)行程序并獲取鎖,之后模擬意外退出并沒有調(diào)用unlock方法

go run main.go
locking... 2022-09-03 23:41:48 # 租約生成時間
locked... 2022-09-03 23:41:48
^Csignal: interrupt

第二個窗口,在第一個窗口退出之前嘗試獲取鎖,此時是阻塞狀態(tài)。第一個窗口退出之后由于租約還沒到期,第二個窗口還是獲取鎖的狀態(tài)。等到第一個窗口租約到期(默認60秒),第二個獲取鎖成功

locking... 2022-09-03 23:41:52
locked... 2022-09-03 23:42:48 # 第一個租約60秒到期,獲取鎖成功
released... 2022-09-03 23:44:28
count: 1

實現(xiàn)阻塞和非阻塞接口

上面的例子中已經(jīng)實現(xiàn)了阻塞接口,即當(dāng)前有獲取到鎖的請求,則其他請求阻塞等待鎖釋放

非阻塞的方式就是嘗試獲取鎖,如果失敗立即返回。etcd中是實現(xiàn)了tryLock方法

// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {

具體看下面的例子

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"sync"
	"time"
)

var sg sync.WaitGroup

type Counter struct {
	count int
}

func (m *Counter) Incr() {
	m.count++
}

func (m *Counter) Count() int {
	return m.count
}

func main() {
	endpoints := []string{"http://127.0.0.1:", "http://127.0.0.1:", "http://127.0.0.1:"}
	client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	counter := &Counter{}

	session, err := concurrency.NewSession(client)
	if err != nil {
		panic(err)
	}
	defer session.Close()

  // 此處使用newMutex初始化
	locker := concurrency.NewMutex(session, "/my-test-lock")
	fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
	err = locker.TryLock(context.Background())
  // 獲取鎖失敗就拋錯
	if err != nil {
		fmt.Println("lock failed", err)
		return
	}
	fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
	time.Sleep(100 * time.Second)
	counter.Incr()
	err = locker.Unlock(context.Background())
	if err != nil {
		fmt.Println("unlock failed", err)
		return
	}
	fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))

	fmt.Println("count:", counter.Count())
}

窗口1、窗口2執(zhí)行結(jié)果

go run main.go
locking... 2022-09-04 00:00:21
locked... 2022-09-04 00:00:21
released... 2022-09-04 00:02:01
count: 1
go run main.go
locking... 2022-09-04 00:00:27
lock failed mutex: Locked by another session

當(dāng)前名稱:etcd實現(xiàn)分布式鎖
當(dāng)前網(wǎng)址:http://weahome.cn/article/dsoicdo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部