本文的完整代碼在github.com/hdt3213/godis/redis/client
通常 TCP 客戶端的通信模式都是阻塞式的: 客戶端發(fā)送請求 -> 等待服務端響應 -> 發(fā)送下一個請求。因為需要等待網絡傳輸數據,完成一次請求循環(huán)需要等待較多時間。
我們能否不等待服務端響應直接發(fā)送下一條請求呢?答案是肯定的。
TCP 作為全雙工協議可以同時進行上行和下行通信,不必擔心客戶端和服務端同時發(fā)包會導致沖突。
p.s. 打電話的時候兩個人同時講話就會沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺發(fā)送到收音機不能反向傳輸,這種方式稱為單工。
我們?yōu)槊恳粋€ tcp 連接分配了一個 goroutine 可以保證先收到的請求先先回復。另一個方面,tcp 協議會保證數據流的有序性,同一個 tcp 連接上先發(fā)送的請求服務端先接收,先回復的響應客戶端先收到。因此我們不必擔心混淆響應所對應的請求。
這種在服務端未響應時客戶端繼續(xù)向服務端發(fā)送請求的模式稱為 Pipeline 模式。因為減少等待網絡傳輸的時間,Pipeline 模式可以極大的提高吞吐量,減少所需使用的 tcp 鏈接數。
pipeline 模式的 redis 客戶端需要有兩個后臺協程程負責 tcp 通信,調用方通過 channel 向后臺協程發(fā)送指令,并阻塞等待直到收到響應,這是一個典型的異步編程模式。
我們先來定義 client 的結構:
type Client struct {
conn net.Conn // 與服務端的 tcp 連接
pendingReqs chan *Request // 等待發(fā)送的請求
waitingReqs chan *Request // 等待服務器響應的請求
ticker *time.Ticker // 用于觸發(fā)心跳包的計時器
addr string
ctx context.Context
cancelFunc context.CancelFunc
writing *sync.WaitGroup // 有請求正在處理不能立即停止,用于實現 graceful shutdown
}
type Request struct {
id uint64 // 請求id
args [][]byte // 上行參數
reply redis.Reply // 收到的返回值
heartbeat bool // 標記是否是心跳請求
waiting *wait.Wait // 調用協程發(fā)送請求后通過 waitgroup 等待請求異步處理完成
err error
}
調用者將請求發(fā)送給后臺協程,并通過 wait group 等待異步處理完成:
func (client *Client) Send(args [][]byte) redis.Reply {
request := request{
args: args,
heartbeat: false,
waiting: wait.Wait{},
}
request.waiting.Add(1)
client.working.Add(1)
defer client.working.Done()
client.pendingReqs - request // 請求入隊
timeout := request.waiting.WaitWithTimeout(maxWait) // 等待響應或者超時
if timeout {
return reply.MakeErrReply("server time out")
}
if request.err != nil {
return reply.MakeErrReply("request failed")
}
return request.reply
}
client 的核心部分是后臺的讀寫協程。先從寫協程開始:
// 寫協程入口
func (client *Client) handleWrite() {
for req := range client.pendingReqs {
client.doRequest(req)
}
}
// 發(fā)送請求
func (client *Client) doRequest(req *request) {
if req == nil || len(req.args) == 0 {
return
}
// 序列化請求
re := reply.MakeMultiBulkReply(req.args)
bytes := re.ToBytes()
_, err := client.conn.Write(bytes)
i := 0
// 失敗重試
for err != nil i 3 {
err = client.handleConnectionError(err)
if err == nil {
_, err = client.conn.Write(bytes)
}
i++
}
if err == nil {
// 發(fā)送成功等待服務器響應
client.waitingReqs - req
} else {
req.err = err
req.waiting.Done()
}
}
讀協程是我們熟悉的協議解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。
// 收到服務端的響應
func (client *Client) finishRequest(reply redis.Reply) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
logger.Error(err)
}
}()
request := -client.waitingReqs
if request == nil {
return
}
request.reply = reply
if request.waiting != nil {
request.waiting.Done()
}
}
// 讀協程是個 RESP 協議解析器
func (client *Client) handleRead() error {
ch := parser.ParseStream(client.conn)
for payload := range ch {
if payload.Err != nil {
client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
continue
}
client.finishRequest(payload.Data)
}
return nil
}
最后編寫 client 的構造器和啟動異步協程的代碼:
func MakeClient(addr string) (*Client, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return Client{
addr: addr,
conn: conn,
sendingReqs: make(chan *Request, chanSize),
waitingReqs: make(chan *Request, chanSize),
ctx: ctx,
cancelFunc: cancel,
writing: sync.WaitGroup{},
}, nil
}
func (client *Client) Start() {
client.ticker = time.NewTicker(10 * time.Second)
go client.handleWrite()
go func() {
err := client.handleRead()
logger.Warn(err)
}()
go client.heartbeat()
}
關閉 client 的時候記得等待請求完成:
func (client *Client) Close() {
// 先阻止新請求進入隊列
close(client.sendingReqs)
// 等待處理中的請求完成
client.writing.Wait()
// 釋放資源
_ = client.conn.Close() // 關閉與服務端的連接,連接關閉后讀協程會退出
client.cancelFunc() // 使用 context 關閉讀協程
close(client.waitingReqs) // 關閉隊列
}
測試一下:
func TestClient(t *testing.T) {
client, err := MakeClient("localhost:6379")
if err != nil {
t.Error(err)
}
client.Start()
result = client.Send([][]byte{
[]byte("SET"),
[]byte("a"),
[]byte("a"),
})
if statusRet, ok := result.(*reply.StatusReply); ok {
if statusRet.Status != "OK" {
t.Error("`set` failed, result: " + statusRet.Status)
}
}
result = client.Send([][]byte{
[]byte("GET"),
[]byte("a"),
})
if bulkRet, ok := result.(*reply.BulkReply); ok {
if string(bulkRet.Arg) != "a" {
t.Error("`get` failed, result: " + string(bulkRet.Arg))
}
}
}
Keep working, we will find a way out.This is Finley, welcome to join us.
到此這篇關于Golang 實現 Redis系列(六)如何實現 pipeline 模式的 redis 客戶端的文章就介紹到這了,更多相關Golang實現pipeline模式的redis客戶端內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Golang連接Redis數據庫的方法
- Golang使用lua腳本實現redis原子操作
- golang實現redis的延時消息隊列功能示例
- 在Golang中使用Redis的方法示例