在開(kāi)發(fā)高并發(fā)系統(tǒng)時(shí),我們可能會(huì)遇到接口訪(fǎng)問(wèn)頻次過(guò)高,為了保證系統(tǒng)的高可用和穩(wěn)定性,這時(shí)候就需要做流量限制,你可能是用的 Nginx
這種來(lái)控制請(qǐng)求,也可能是用了一些流行的類(lèi)庫(kù)實(shí)現(xiàn)。限流是高并發(fā)系統(tǒng)的一大殺器,在設(shè)計(jì)限流算法之前我們先來(lái)了解一下它們是什么。
限流
的目的是通過(guò)對(duì)并發(fā)訪(fǎng)問(wèn)請(qǐng)求進(jìn)行限速,或者對(duì)一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求進(jìn)行限速來(lái)保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)、排隊(duì)或等待、降級(jí)等處理。通過(guò)對(duì)并發(fā)(或者一定時(shí)間窗口內(nèi))請(qǐng)求進(jìn)行限速來(lái)保護(hù)系統(tǒng),一旦達(dá)到限制速率則拒絕服務(wù)(定向到錯(cuò)誤頁(yè)或告知資源沒(méi)有了)、排隊(duì)等待(比如秒殺、評(píng)論、下單)、降級(jí)(返回兜底數(shù)據(jù)或默認(rèn)數(shù)據(jù))。
如 圖:
自己魔改出來(lái)的漫畫(huà)
如圖上的漫畫(huà),在某個(gè)時(shí)間段流量上來(lái)了,服務(wù)的接口訪(fǎng)問(wèn)頻率可能會(huì)非???,如果我們沒(méi)有對(duì)接口訪(fǎng)問(wèn)頻次做限制可能會(huì)導(dǎo)致服務(wù)器無(wú)法承受過(guò)高的壓力掛掉,這時(shí)候也可能會(huì)產(chǎn)生數(shù)據(jù)丟失,所以就要對(duì)其進(jìn)行限流處理。
限流算法就可以幫助我們?nèi)タ刂泼總€(gè)接口或程序的函數(shù)被調(diào)用頻率,它有點(diǎn)兒像保險(xiǎn)絲,防止系統(tǒng)因?yàn)槌^(guò)訪(fǎng)問(wèn)頻率或并發(fā)量而引起癱瘓。我們可能在調(diào)用某些第三方的接口的時(shí)候會(huì)看到類(lèi)似這樣的響應(yīng)頭:
X-RateLimit-Limit: 60 //每秒60次請(qǐng)求 X-RateLimit-Remaining: 22 //當(dāng)前還剩下多少次 X-RateLimit-Reset: 1612184024 //限制重置時(shí)間
上面的 HTTP Response
是通過(guò)響應(yīng)頭告訴調(diào)用方服務(wù)端的限流頻次是怎樣的,保證后端的接口訪(fǎng)問(wèn)上限。為了解決限流問(wèn)題出現(xiàn)了很多的算法,它們都有不同的用途,通常的策略就是拒絕超出的請(qǐng)求,或者讓超出的請(qǐng)求排隊(duì)等待。
一般來(lái)說(shuō),限流的常用處理手段有:
計(jì)數(shù)器是一種最簡(jiǎn)單限流算法,其原理就是:
在一段時(shí)間間隔內(nèi),對(duì)請(qǐng)求進(jìn)行計(jì)數(shù),與閥值進(jìn)行比較判斷是否需要限流,一旦到了時(shí)間臨界點(diǎn),將計(jì)數(shù)器清零。
這個(gè)就像你去坐車(chē)一樣,車(chē)廂規(guī)定了多少個(gè)位置,滿(mǎn)了就不讓上車(chē)了,不然就是超載了,被交警叔叔抓到了就要罰款的,如果我們的系統(tǒng)那就不是罰款的事情了,可能直接崩掉了。
count
,當(dāng)過(guò)來(lái)一個(gè)請(qǐng)求我就將這個(gè)數(shù)+1
,同時(shí)記錄請(qǐng)求時(shí)間。count
的計(jì)數(shù)值是否超過(guò)設(shè)定的頻次,以及當(dāng)前請(qǐng)求的時(shí)間和第一次請(qǐng)求時(shí)間是否在 1
分鐘內(nèi)。1
分鐘內(nèi)并且超過(guò)設(shè)定的頻次則證明請(qǐng)求過(guò)多,后面的請(qǐng)求就拒絕掉。count
值還在限流范圍內(nèi),就重置 count
。代碼實(shí)現(xiàn):
package main import ( "log" "sync" "time" ) type Counter struct { rate int //計(jì)數(shù)周期內(nèi)最多允許的請(qǐng)求數(shù) begin time.Time //計(jì)數(shù)開(kāi)始時(shí)間 cycle time.Duration //計(jì)數(shù)周期 count int //計(jì)數(shù)周期內(nèi)累計(jì)收到的請(qǐng)求數(shù) lock sync.Mutex } func (l *Counter) Allow() bool { l.lock.Lock() defer l.lock.Unlock() if l.count == l.rate-1 { now := time.Now() if now.Sub(l.begin) >= l.cycle { //速度允許范圍內(nèi), 重置計(jì)數(shù)器 l.Reset(now) return true } else { return false } } else { //沒(méi)有達(dá)到速率限制,計(jì)數(shù)加1 l.count++ return true } } func (l *Counter) Set(r int, cycle time.Duration) { l.rate = r l.begin = time.Now() l.cycle = cycle l.count = 0 } func (l *Counter) Reset(t time.Time) { l.begin = t l.count = 0 } func main() { var wg sync.WaitGroup var lr Counter lr.Set(3, time.Second) // 1s內(nèi)最多請(qǐng)求3次 for i := 0; i 10; i++ { wg.Add(1) log.Println("創(chuàng)建請(qǐng)求:", i) go func(i int) { if lr.Allow() { log.Println("響應(yīng)請(qǐng)求:", i) } wg.Done() }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() }
OutPut:
2021/02/01 21:16:12 創(chuàng)建請(qǐng)求: 0
2021/02/01 21:16:12 響應(yīng)請(qǐng)求: 0
2021/02/01 21:16:12 創(chuàng)建請(qǐng)求: 1
2021/02/01 21:16:12 響應(yīng)請(qǐng)求: 1
2021/02/01 21:16:12 創(chuàng)建請(qǐng)求: 2
2021/02/01 21:16:13 創(chuàng)建請(qǐng)求: 3
2021/02/01 21:16:13 創(chuàng)建請(qǐng)求: 4
2021/02/01 21:16:13 創(chuàng)建請(qǐng)求: 5
2021/02/01 21:16:13 響應(yīng)請(qǐng)求: 5
2021/02/01 21:16:13 創(chuàng)建請(qǐng)求: 6
2021/02/01 21:16:13 響應(yīng)請(qǐng)求: 6
2021/02/01 21:16:13 創(chuàng)建請(qǐng)求: 7
2021/02/01 21:16:13 響應(yīng)請(qǐng)求: 7
2021/02/01 21:16:14 創(chuàng)建請(qǐng)求: 8
2021/02/01 21:16:14 創(chuàng)建請(qǐng)求: 9
可以看到我們?cè)O(shè)置的是每200ms
創(chuàng)建一個(gè)請(qǐng)求,明顯高于1
秒最多3
個(gè)請(qǐng)求的限制,運(yùn)行起來(lái)之后發(fā)現(xiàn)編號(hào)為 2、3、4、8、9
的請(qǐng)求被丟棄,說(shuō)明限流成功。
那么問(wèn)題來(lái)了,如果有個(gè)需求對(duì)于某個(gè)接口 /query
每分鐘最多允許訪(fǎng)問(wèn) 200 次,假設(shè)有個(gè)用戶(hù)在第 59 秒的最后幾毫秒瞬間發(fā)送 200 個(gè)請(qǐng)求,當(dāng) 59 秒結(jié)束后 Counter
清零了,他在下一秒的時(shí)候又發(fā)送 200 個(gè)請(qǐng)求。那么在 1 秒鐘內(nèi)這個(gè)用戶(hù)發(fā)送了 2 倍的請(qǐng)求,這個(gè)是符合我們的設(shè)計(jì)邏輯的,這也是計(jì)數(shù)器方法的設(shè)計(jì)缺陷,系統(tǒng)可能會(huì)承受惡意用戶(hù)的大量請(qǐng)求,甚至擊穿系統(tǒng)。
如下圖:
這種方法雖然簡(jiǎn)單,但也有個(gè)大問(wèn)題就是沒(méi)有很好的處理單位時(shí)間的邊界。
滑動(dòng)窗口
是針對(duì)計(jì)數(shù)器存在的臨界點(diǎn)缺陷,所謂 滑動(dòng)窗口(Sliding window)
是一種流量控制技術(shù),這個(gè)詞出現(xiàn)在 TCP
協(xié)議中。滑動(dòng)窗口
把固定時(shí)間片進(jìn)行劃分,并且隨著時(shí)間的流逝,進(jìn)行移動(dòng),固定數(shù)量的可以移動(dòng)的格子,進(jìn)行計(jì)數(shù)并判斷閥值。
如 圖:
上圖中我們用紅色的虛線(xiàn)代表一個(gè)時(shí)間窗口(一分鐘
),每個(gè)時(shí)間窗口有 6
個(gè)格子,每個(gè)格子是 10
秒鐘。每過(guò) 10
秒鐘時(shí)間窗口向右移動(dòng)一格,可以看紅色箭頭的方向。我們?yōu)槊總€(gè)格子都設(shè)置一個(gè)獨(dú)立的計(jì)數(shù)器 Counter
,假如一個(gè)請(qǐng)求在 0:45
訪(fǎng)問(wèn)了那么我們將第五個(gè)格子的計(jì)數(shù)器 +1
(也是就是 0:40~0:50
),在判斷限流的時(shí)候需要把所有格子的計(jì)數(shù)加起來(lái)和設(shè)定的頻次進(jìn)行比較即可。
那么滑動(dòng)窗口如何解決我們上面遇到的問(wèn)題呢?來(lái)看下面的圖:
當(dāng)用戶(hù)在0:59
秒鐘發(fā)送了 200
個(gè)請(qǐng)求就會(huì)被第六個(gè)格子的計(jì)數(shù)器記錄 +200
,當(dāng)下一秒的時(shí)候時(shí)間窗口向右移動(dòng)了一個(gè),此時(shí)計(jì)數(shù)器已經(jīng)記錄了該用戶(hù)發(fā)送的 200
個(gè)請(qǐng)求,所以再發(fā)送的話(huà)就會(huì)觸發(fā)限流,則拒絕新的請(qǐng)求。
其實(shí)計(jì)數(shù)器就是滑動(dòng)窗口啊,只不過(guò)只有一個(gè)格子而已,所以想讓限流做的更精確只需要?jiǎng)澐指嗟母褡泳涂梢粤?,為了更精確我們也不知道到底該設(shè)置多少個(gè)格子,格子的數(shù)量影響著滑動(dòng)窗口算法的精度,依然有時(shí)間片的概念,無(wú)法根本解決臨界點(diǎn)問(wèn)題
。
相關(guān)算法實(shí)現(xiàn) github.com/RussellLuo/slidingwindow
漏桶算法(Leaky Bucket)
,原理就是一個(gè)固定容量的漏桶,按照固定速率流出水滴。用過(guò)水龍頭都知道,打開(kāi)龍頭開(kāi)關(guān)水就會(huì)流下滴到水桶里,而漏桶指的是水桶下面有個(gè)漏洞可以出水。如果水龍頭開(kāi)的特別大那么水流速就會(huì)過(guò)大,這樣就可能導(dǎo)致水桶的水滿(mǎn)了然后溢出。
如 圖:
一個(gè)固定容量的桶,有水流進(jìn)來(lái),也有水流出去。對(duì)于流進(jìn)來(lái)的水來(lái)說(shuō),我們無(wú)法預(yù)計(jì)一共有多少水會(huì)流進(jìn)來(lái),也無(wú)法預(yù)計(jì)水流的速度。但是對(duì)于流出去的水來(lái)說(shuō),這個(gè)桶可以固定水流出的速率(處理速度
),從而達(dá)到 流量整形
和 流量控制
的效果。
代碼實(shí)現(xiàn):
type LeakyBucket struct { rate float64 //固定每秒出水速率 capacity float64 //桶的容量 water float64 //桶中當(dāng)前水量 lastLeakMs int64 //桶上次漏水時(shí)間戳 ms lock sync.Mutex } func (l *LeakyBucket) Allow() bool { l.lock.Lock() defer l.lock.Unlock() now := time.Now().UnixNano() / 1e6 eclipse := float64((now - l.lastLeakMs)) * l.rate / 1000 //先執(zhí)行漏水 l.water = l.water - eclipse //計(jì)算剩余水量 l.water = math.Max(0, l.water) //桶干了 l.lastLeakMs = now if (l.water + 1) l.capacity { // 嘗試加水,并且水還未滿(mǎn) l.water++ return true } else { // 水滿(mǎn),拒絕加水 return false } } func (l *LeakyBucket) Set(r, c float64) { l.rate = r l.capacity = c l.water = 0 l.lastLeakMs = time.Now().UnixNano() / 1e6 }
漏桶算法有以下特點(diǎn):
漏桶限制的是常量流出速率(即流出速率是一個(gè)固定常量值),所以最大的速率就是出水的速率,不能出現(xiàn)突發(fā)流量。
令牌桶算法(Token Bucket)
是網(wǎng)絡(luò)流量整形(Traffic Shaping)
和速率限制(Rate Limiting)
中最常使用的一種算法。典型情況下,令牌桶算法用來(lái)控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。
我們有一個(gè)固定的桶,桶里存放著令牌(token)
。一開(kāi)始桶是空的,系統(tǒng)按固定的時(shí)間(rate)
往桶里添加令牌,直到桶里的令牌數(shù)滿(mǎn),多余的請(qǐng)求會(huì)被丟棄。當(dāng)請(qǐng)求來(lái)的時(shí)候,從桶里移除一個(gè)令牌,如果桶是空的則拒絕請(qǐng)求或者阻塞。
實(shí)現(xiàn)代碼:
type TokenBucket struct { rate int64 //固定的token放入速率, r/s capacity int64 //桶的容量 tokens int64 //桶中當(dāng)前token數(shù)量 lastTokenSec int64 //桶上次放token的時(shí)間戳 s lock sync.Mutex } func (l *TokenBucket) Allow() bool { l.lock.Lock() defer l.lock.Unlock() now := time.Now().Unix() l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌 if l.tokens > l.capacity { l.tokens = l.capacity } l.lastTokenSec = now if l.tokens > 0 { // 還有令牌,領(lǐng)取令牌 l.tokens-- return true } else { // 沒(méi)有令牌,則拒絕 return false } } func (l *TokenBucket) Set(r, c int64) { l.rate = r l.capacity = c l.tokens = 0 l.lastTokenSec = time.Now().Unix() }
令牌桶有以下特點(diǎn):
B
個(gè)令牌,當(dāng)桶滿(mǎn)時(shí),新添加的令牌被丟棄或拒絕N
個(gè),則不會(huì)刪除令牌,且請(qǐng)求將被限流(丟棄或阻塞等待)令牌桶限制的是平均流入速率(允許突發(fā)請(qǐng)求,只要有令牌就可以處理,支持一次拿3個(gè)令牌,4個(gè)令牌...),并允許一定程度突發(fā)流量。
目前常用的是令牌桶
這種,本文介紹了幾種常見(jiàn)的限流算法實(shí)現(xiàn)
到此這篇關(guān)于Go實(shí)現(xiàn)各類(lèi)限流的文章就介紹到這了,更多相關(guān)Go實(shí)現(xiàn)各類(lèi)限流內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:雞西 吐魯番 銅川 梅河口 蘭州 欽州 汕頭 重慶
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Go實(shí)現(xiàn)各類(lèi)限流的方法》,本文關(guān)鍵詞 實(shí)現(xiàn),各類(lèi),限,流的,方法,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。