隨著互聯(lián)網(wǎng)的發(fā)展,在處理流量的方法也不僅僅為 first-come,first-served,而在共享網(wǎng)絡中實現(xiàn)流量管理的基本機制就是排隊。而公平算法則是實現(xiàn)在優(yōu)先級隊列中基于哪些策略來排隊的”公平隊列“。Token Bucket
則是為公平排隊提供了替代方案。Fair Queue 與 Token Bucket的區(qū)別主要在,對于Fair Queue來講,如果請求者目前空閑,Queue會將該請求者的帶寬分配給其他請求者;而 Token Bucket 則是分配給請求者的帶寬是帶寬的上限。
創(chuàng)新互聯(lián)專注于瑤海企業(yè)網(wǎng)站建設,響應式網(wǎng)站開發(fā),商城網(wǎng)站開發(fā)?,幒>W(wǎng)站建設公司,為瑤海等地區(qū)提供建站服務。全流程定制網(wǎng)站開發(fā),專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務
通過例子了解算法原理
假設出站帶寬是 4個數(shù)據(jù)包/ms,此時有一個需求為,為一個特定的發(fā)送端 A 來分配 1個數(shù)據(jù)包/ms的帶寬。此時可以使用公平排隊的方法分給發(fā)送 A 25%的帶寬。
此時存在的問題是我們希望可以靈活地允許 A 的數(shù)據(jù)包以無規(guī)則的時間間隔發(fā)送。例如假設 A 在每個數(shù)據(jù)包發(fā)送后等待1毫秒后再開始下一個數(shù)據(jù)包的發(fā)送。
顯然sence2是合理的,這個場景的解決方法就是令牌桶算法,規(guī)定 A 的配額,允許指定平均速率和突發(fā)容量。當數(shù)據(jù)包不符合令牌桶規(guī)范,那么就認為其不合理,此時會做出一下相應:
delay 被稱為 整形 shaping
, shaping 是指在某個時間間隔內(nèi)發(fā)送超過 Bc(Committed Burst)的大小,Bc 在這里指桶的尺寸。由于數(shù)據(jù)流量是突發(fā)性的,當在一段時間內(nèi)不活動后,再次激活后的在一個間隔內(nèi)發(fā)送的數(shù)量大于 Bc ,那么額外的流量被稱為Be (burst excess)。
將流量丟棄或標記超額流量,保持在一個流量速率限制稱為 管制 policing
。
令牌桶的定義是指,有一個桶,以穩(wěn)定的速度填充令牌;桶中的任何一個溢出都會被丟棄。當要發(fā)送一個數(shù)據(jù)包,需要能夠從桶中取出一個令牌;如果桶是空的那么此時數(shù)據(jù)包是不合規(guī)的數(shù)據(jù)包,必須進行 delay
, drop
, mark
操作。如果桶是滿的,則會發(fā)送與桶容量相對應的突發(fā)(短時間內(nèi)的高帶寬傳輸),這是桶是空的。
令牌桶的規(guī)范:\(TB(r,B_{max})\)
那么公式則表示,桶以指定的速率填充令牌,最大為 \(B_{max}\) 。這就說明了為了使大小為 S 的數(shù)據(jù)包合規(guī),桶內(nèi)必須至少有 S 個令牌,即 \(B \ge S\),否則數(shù)據(jù)包不合規(guī),在發(fā)送時,桶為 \(B=B-S\)
場景1:假設令牌桶規(guī)范為 \(TB(\frac{1}{3}\ packet/ms, 4\ packet)\),桶最初是滿的,數(shù)據(jù)包在以下時間到達 [0, 0, 0, 2, 3, 6, 9, 12]
在處理完所有 T=0
的數(shù)據(jù)包后,桶中還剩 1 個令牌。到第四個數(shù)據(jù)包 T=2
到達時,桶內(nèi)已經(jīng)有1個令牌 + \(\frac{2}{3}\) 個令牌;當發(fā)送完第四個數(shù)據(jù)包時,桶內(nèi)令牌數(shù)為 \(\frac{2}{3}\) 。到 T=3
數(shù)據(jù)包時,桶內(nèi)令牌為1,滿足發(fā)送第 5 個數(shù)據(jù)包。萬松完成后桶是空的,在后面 6 9 12時,都滿足3/ms 一個數(shù)據(jù)包,都可以發(fā)送成功
場景2:另外一個實例,在同樣的令牌桶規(guī)范下 \(TB(\frac{1}{3}, 4)\),數(shù)據(jù)包到達時間為 [0, 0, 0, 0, 12, 12, 12, 12, 24, 24, 24, 24]
,可以看到在這個場景下,數(shù)據(jù)到達為3個突發(fā),每個突發(fā)4個數(shù)據(jù)包,此時每次發(fā)送完成后桶被清空,當再次填滿時需要12ms,此時另外一組突發(fā)達。故這組數(shù)據(jù)是合規(guī)的。、
場景3:在同樣的令牌桶規(guī)范下 \(TB(\frac{1}{3}, 4)\),數(shù)據(jù)包到達時間為 [0, 1, 2, 3, 4, 5]
, 這組數(shù)據(jù)是不合規(guī)的
用表格形式表示如下:
數(shù)據(jù)包到達時間 | 0 | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|---|
發(fā)送前桶內(nèi)令牌 | 4 | 3 \(\frac{1}{3}\) | 2 \(\frac{2}{3}\) | 2 | 1 \(\frac{1}{3}\) | \(\frac{2}{3}\) |
發(fā)送后桶內(nèi)令牌 | 3 | 2 \(\frac{1}{3}\) | 1 \(\frac{2}{3}\) | 1 | \(\frac{1}{3}\) | \(\frac{2}{3}\) |
如果一個數(shù)據(jù)包在桶中沒有足夠的令牌來發(fā)送它時到達,可以進行整形或管制,整形使數(shù)據(jù)包等到足夠的令牌積累。管制會丟棄數(shù)據(jù)包?;蛘甙l(fā)送方可以立即發(fā)送數(shù)據(jù)包,但將其標記為不合規(guī)。
漏桶 (leaky bucket)是一種臨時存儲可變數(shù)量的請求并將它們組織成設定速率輸出的數(shù)據(jù)包的方法。漏桶的概念與令牌桶比起是相反的,漏桶可以理解為是一個具有恒定服務時間的隊列。
由下圖可以看出,漏桶的概念是一個底部有孔的桶。無論水進入桶的速度是多少,它都會以恒定的速度通過孔從桶中泄漏出來。如果桶中沒有水,則流速為零,如果桶已滿,則多余的水溢出并丟失。
和令牌桶一樣,漏桶用于流量整形和流量管制
Leaky | Token |
---|---|
桶中存放的是所有到達的數(shù)據(jù)包,必須入桶 | 桶中存放的是定期生成的令牌 |
桶以恒定速率泄漏 | 桶有最大容量 \(B_{max}\) |
突發(fā)流量入桶轉(zhuǎn)換為恒定流量發(fā)送 | 發(fā)送數(shù)據(jù)包需要小號對應的token |
token較leaky的優(yōu)勢:
在golang中,內(nèi)置的 rate
包實現(xiàn)了一個令牌桶算法,通過 rate.NewLimiter(r,B)
進行構造。與公式\(TB(r,B_{max})\) 意思相同。
type Limiter struct {
limit Limit // 向桶中放置令牌的速率
burst int // 桶的容量
mu sync.Mutex
tokens float64 // 可用令牌容量
last time.Time // 上次放入token的時間
lastEvent time.Time
}
Limiter中帶有三種方法, Allow
、Reserve
、Wait
分別表示Token Bucket中的 shaping
和 policing
:
drop
delay
Reserve()
返回了 ReserveN(time.Now(), 1)
ReserveN()
無論如何都會返回一個 Reservation,指定了調(diào)用者在 n 個事件發(fā)生之前必須等待多長時間。Delay()
方法返回了需要等待的時間,如果時間為0則不需要等待Cancel()
將取消等待wait/waitN
reserveN()
拿到token是合規(guī)的,并消耗掉tokenAllowN 為截止到某一時刻,當前桶內(nèi)桶中數(shù)目是否至少為 n 個,滿足則返回 true,同時從桶中消費 n 個 token。反之不消費 Token,false。
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok // 由于僅需要一個合規(guī)否,顧合規(guī)的通過,不合規(guī)的丟棄
}
reserveN()
是三個行為的核心,AllowN中指定的為 0 ,因為 maxFutureReserve
是最大的等待時間,AllowN給定的是0,即如果突發(fā)大的情況下丟棄額外的 Bc。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 這里拿到的是now,上次更新token時間和桶內(nèi)token數(shù)量
now, last, tokens := lim.advance(now)
// 計算剩余的token
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 確定是否合規(guī),n是token
// token 的數(shù)量要小于桶的容量,并且 等待時間小于最大等待時間
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
在reserveN中調(diào)用了一個 advance()
函數(shù),
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) { // 計算上次放入token是否在傳入now之前
last = now
}
// 當 last 很舊時,避免在下面進行 delta 溢出。
// maxElapsed 計算裝滿需要多少時間
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last) // 上次裝入到現(xiàn)在的時差
if elapsed > maxElapsed { // 上次如果放入token時間超長,就讓他與裝滿時間相等
elapsed = maxElapsed // 即,讓桶為滿的
}
// 裝桶的動作,下面函數(shù)表示,elapsed時間內(nèi)可以生成多少個token
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta // 當前的token
if burst := float64(lim.burst); tokens > burst {
tokens = burst // 這里表示token溢出,讓他裝滿就好
}
return now, last, tokens
}
reserveN()
拿到token是合規(guī)的,并消耗掉tokenfunc (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
if n > lim.burst && lim.limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// 外部已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// 三個方法的核心,這里給定了deatline
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
在 rate.limiter
中,支持調(diào)整速率和桶大小,這樣就可以根據(jù)現(xiàn)有環(huán)境和條件,來動態(tài)的改變 Token生成速率和桶容量
SetLimit(Limit)
更改生成 Token 的速率SetBurst(int)
改變桶容量package main
import (
"log"
"strconv"
"time"
"golang.org/x/time/rate"
)
func main() {
timeLayout := "2006-01-02:15:04:05.0000"
limiter := rate.NewLimiter(1, 5) // BT(1,5)
log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
length := 20 // 一共請求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string, r *rate.Limiter) {
err := limiter.Allow()
if !err {
ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
}
time.Sleep(time.Duration(5) * time.Millisecond)
ch <- "Task-" + taskId + " run success " + time.Now().Format(timeLayout)
return
}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
}
for _, ch := range chs {
log.Println("task start at " + <-ch)
}
}
通過執(zhí)行結果可以看出,在突發(fā)為20的情況下,allow僅允許了獲得token的事件執(zhí)行,,這種場景下實現(xiàn)了流量整形的特性。
package main
import (
"context"
"log"
"strconv"
"time"
"golang.org/x/time/rate"
)
func main() {
timeLayout := "2006-01-02:15:04:05.0000"
limiter := rate.NewLimiter(1, 5) // BT(1,5)
log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
length := 20 // 一共請求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string, r *rate.Limiter) {
err := limiter.Wait(context.TODO())
if err != nil {
ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
}
ch <- "Task-" + taskId + " run success " + time.Now().Format(timeLayout)
return
}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
}
for _, ch := range chs {
log.Println("task start at " + <-ch)
}
}
結果可以看出,在大突發(fā)的情況下,在拿到token的任務會立即執(zhí)行,沒有拿到token的會等待拿到token后繼續(xù)執(zhí)行,這種場景下實現(xiàn)了流量管制的特性
Reference
tokenbucket
QoS Policing