Python實(shí)現(xiàn)異步IO的示例
前言
用阻塞 API 寫同步代碼最簡單,但一個(gè)線程同一時(shí)間只能處理一個(gè)請求,有限的線程數(shù)導(dǎo)致無法實(shí)現(xiàn)萬級別的并發(fā)連接,過多的線程切換也搶走了 CPU 的時(shí)間,從而降低了每秒能夠處理的請求數(shù)量。為了達(dá)到高并發(fā),你可能會選擇一個(gè)異步框架,用非阻塞 API 把業(yè)務(wù)邏輯打亂到多個(gè)回調(diào)函數(shù),通過多路復(fù)用與事件循環(huán)的方式實(shí)現(xiàn)高并發(fā)。
磁盤 IO 為例,描述了多線程中使用阻塞方法讀磁盤,2 個(gè)線程間的切換方式。那么,怎么才能實(shí)現(xiàn)高并發(fā)呢?
把上圖中本來由內(nèi)核實(shí)現(xiàn)的請求切換工作,交由用戶態(tài)的代碼來完成就可以了,異步化編程通過應(yīng)用層代碼實(shí)現(xiàn)了請求切換,降低了切換成本和內(nèi)存占用空間。異步化依賴于 IO 多路復(fù)用機(jī)制,比如 Linux 的 epoll 或者 Windows 上的 iocp,同時(shí),必須把阻塞方法更改為非阻塞方法,才能避免內(nèi)核切換帶來的巨大消耗。Nginx、Redis 等高性能服務(wù)都依賴異步化實(shí)現(xiàn)了百萬量級的并發(fā)。
下圖描述了異步 IO 的非阻塞讀和異步框架結(jié)合后,是如何切換請求的。
然而,寫異步化代碼很容易出錯(cuò)。因?yàn)樗凶枞瘮?shù),都需要通過非阻塞的系統(tǒng)調(diào)用拆分成兩個(gè)函數(shù)。雖然這兩個(gè)函數(shù)共同完成一個(gè)功能,但調(diào)用方式卻不同。第一個(gè)函數(shù)由你顯式調(diào)用,第二個(gè)函數(shù)則由多路復(fù)用機(jī)制調(diào)用。
這種方式違反了軟件工程的內(nèi)聚性原則,函數(shù)間同步數(shù)據(jù)也更復(fù)雜。特別是條件分支眾多、涉及大量系統(tǒng)調(diào)用時(shí),異步化的改造工作會非常困難。
Python如何實(shí)現(xiàn)異步調(diào)用
from flask import Flaskimport timeapp = Flask(__name__)@app.route(’/bar’)def bar(): time.sleep(1) return ’<h1>bar!</h1>’@app.route(’/foo’)def foo(): time.sleep(1) return ’<h1>foo!</h1>’if __name__ == ’__main__’: app.run(host=’127.0.0.1’,port=5555,debug=True)
采用同步的方式調(diào)用
import requestsimport timestarttime = time.time()print(requests.get(’http://127.0.0.1:5555/bar’).content)print(requests.get(’http://127.0.0.1:5555/foo’).content)print('消耗時(shí)間: ',time.time() -starttime)
b’<h1>bar!</h1>’b’<h1>foo!</h1>’消耗時(shí)間: 2.015509605407715
采樣異步的方式調(diào)用:
重點(diǎn):
1.將阻塞io改為非阻塞io;
2.多路復(fù)用io監(jiān)聽內(nèi)核事件,事件觸發(fā)通過回調(diào)函數(shù);
3.用戶態(tài)代碼采取事件循環(huán)的方式獲取事件,執(zhí)行事件的回調(diào)函數(shù);
import selectorsimport socketimport time# from asynrequest import ParserHttpclass asynhttp: def __init__(self): self.selecter = selectors.DefaultSelector() def get(self,url,optiondict = None): global reqcount reqcount += 1 s = socket.socket() s.setblocking(False) try: s.connect((’127.0.0.1’,5555)) except BlockingIOError: pass requset = ’GET %s HTTP/1.0rnrn’ % url callback = lambda : self.send(s,requset) self.selecter.register(s.fileno(),selectors.EVENT_WRITE,callback) def send(self,s,requset): self.selecter.unregister(s.fileno()) s.send(requset.encode()) chunks = [] callback = lambda: self.recv(s,chunks) self.selecter.register(s.fileno(),selectors.EVENT_READ,callback) def recv(self,s,chunks): self.selecter.unregister(s.fileno()) chunk = s.recv(1024) if chunk: chunks.append(chunk) callback = lambda: self.recv(s,chunks) self.selecter.register(s.fileno(), selectors.EVENT_READ, callback) else: global reqcount reqcount -= 1 request_first,request_headers,request_content,_ = ParserHttp.parser(b’’.join(chunks)) print('解析數(shù)據(jù):',request_first,request_headers,request_content) print((b’’.join(chunks)).decode()) return (b’’.join(chunks)).decode()starttime = time.time()reqcount = 0asynhttper = asynhttp()asynhttper.get(’/bar’)asynhttper.get(’/foo’)while reqcount: events = asynhttper.selecter.select() for event,mask in events: func = event.data func()print('消耗時(shí)間:' ,time.time() - starttime)
HTTP/1.0 200 OKContent-Type: text/html; charset=utf-8Content-Length: 13Server: Werkzeug/1.0.1 Python/3.7.7Date: Thu, 15 Oct 2020 03:28:16 GMT
<h1>bar!</h1>HTTP/1.0 200 OKContent-Type: text/html; charset=utf-8Content-Length: 13Server: Werkzeug/1.0.1 Python/3.7.7Date: Thu, 15 Oct 2020 03:28:16 GMT
<h1>foo!</h1>消耗時(shí)間: 1.0127637386322021
以上就是Python實(shí)現(xiàn)異步IO的示例的詳細(xì)內(nèi)容,更多關(guān)于python 異步IO的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. SpringBoot使用Captcha生成驗(yàn)證碼2. android studio實(shí)現(xiàn)簡單的計(jì)算器(無bug)3. JavaEE SpringMyBatis是什么? 它和Hibernate的區(qū)別及如何配置MyBatis4. Python 忽略文件名編碼的方法5. android 控件同時(shí)監(jiān)聽單擊和雙擊實(shí)例6. Java Media Framework 基礎(chǔ)教程7. springboot項(xiàng)目整合druid數(shù)據(jù)庫連接池的實(shí)現(xiàn)8. 解決vue頁面刷新,數(shù)據(jù)丟失的問題9. python 讀txt文件,按‘,’分割每行數(shù)據(jù)操作10. python logging.info在終端沒輸出的解決
