總結(jié)以下:ØMQ (ZeroMQ) 是一個(gè)基于消息隊(duì)列的多線(xiàn)程網(wǎng)絡(luò)庫(kù),它封裝了網(wǎng)絡(luò)通信、消息隊(duì)列、線(xiàn)程調(diào)度等功能,向上層提供簡(jiǎn)潔的API,應(yīng)用程序通過(guò)加載庫(kù)文件,調(diào)用API函數(shù)來(lái)實(shí)現(xiàn)高性能網(wǎng)絡(luò)通信。
看起來(lái)有些抽象,下面我們結(jié)合ZeroMQ 的 Python 封裝———— pyzmp,用實(shí)例看一下ZeroMQ的三種最基本的工作模式。
安裝方法
pip install pyzmq
查看是否安裝成功
>>> import zmq >>> print(zmq.__version__) 22.0.3
#client.py import zmq context = zmq.Context() # Socket to talk to server print("Connecting to hello world server…") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send(b"Hello") # Get the reply. message = socket.recv() print(f"Received reply [ {message} ]")
#server.py import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) # Send reply back to client socket.send(b"World")
python client.py Connecting to hello world server… Received reply [ b'World' ]
python server.py Received request: b'Hello'
可以試一下,多運(yùn)行幾個(gè)client.py,看看情況是什么樣的。
這里直接引用官方文檔的例子:
發(fā)布者:類(lèi)似于一個(gè)天氣更新服務(wù)器,向訂閱者發(fā)送天氣更新,內(nèi)容包括郵政編碼、溫度、濕度等信息
#Publisher.py import zmq from random import randrange context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
訂閱者:它監(jiān)聽(tīng)發(fā)布者更新的數(shù)據(jù)流,過(guò)濾只接收與特定郵政編碼相關(guān)的天氣信息,默認(rèn)接收接收10條數(shù)據(jù)
#Subscribe.py import sys import zmq # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") # Subscribe to zipcode, default is NYC, 10001 zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" # Python 2 - ascii bytes to unicode str if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) # Process 5 updates total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print( "Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)) )
ventilator 使用的是 SOCKET_PUSH,將任務(wù)分發(fā)到 Worker 節(jié)點(diǎn)上。Worker 節(jié)點(diǎn)上,使用 SOCKET_PULL 從上游接受任務(wù),并使用 SOCKET_PUSH 將結(jié)果匯集到 Sink。值得注意的是,任務(wù)的分發(fā)的時(shí)候也同樣有一個(gè)負(fù)載均衡的路由功能,worker 可以隨時(shí)自由加入,ventilator 可以均衡將任務(wù)分發(fā)出去。
Push/Pull模式還是蠻常用的,這里我們主要測(cè)試一下它的負(fù)載均衡。
# ventilator.py import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557") while True: socket.send(b"test") print("已發(fā)送") time.sleep(1)
# worker.py import zmq context = zmq.Context() recive = context.socket(zmq.PULL) recive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558') while True: data = recive.recv() print("work1 正在轉(zhuǎn)發(fā)...") sender.send(data)
# sink.py import zmq import sys context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558") while True: response = socket.recv() print("response: %s" % response)
打開(kāi)4個(gè)Terminal,分別運(yùn)行
python sink.py python worker.py python worker.py python ventilator.py
消息模型可以根據(jù)需要組合使用,后續(xù)的代理模式和路由模式等都是在三種基本模式上面的擴(kuò)展或變異。
到此這篇關(guān)于Python網(wǎng)絡(luò)編程之ZeroMQ知識(shí)總結(jié)的文章就介紹到這了,更多相關(guān)Python ZeroMQ知識(shí)總結(jié)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:克拉瑪依 金華 赤峰 臨汾 貴州 陽(yáng)泉 日照 雙鴨山
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Python網(wǎng)絡(luò)編程之ZeroMQ知識(shí)總結(jié)》,本文關(guān)鍵詞 Python,網(wǎng)絡(luò)編程,之,ZeroMQ,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。