改進(jìn)之前
目前創(chuàng)新互聯(lián)公司已為數(shù)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、網(wǎng)站托管、服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、肥鄉(xiāng)網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
之前,我的查詢步驟很簡(jiǎn)單,就是:
前端提交查詢請(qǐng)求 -- 建立數(shù)據(jù)庫(kù)連接 -- 新建游標(biāo) -- 執(zhí)行命令 -- 接受結(jié)果 -- 關(guān)閉游標(biāo)、連接
這幾大步驟的順序執(zhí)行。
這里面當(dāng)然問(wèn)題很大:
建立數(shù)據(jù)庫(kù)連接實(shí)際上就是新建一個(gè)套接字。這是進(jìn)程間通信的幾種方法里,開(kāi)銷最大的了。
在“執(zhí)行命令”和“接受結(jié)果”兩個(gè)步驟中,線程在阻塞在數(shù)據(jù)庫(kù)內(nèi)部的運(yùn)行過(guò)程中,數(shù)據(jù)庫(kù)連接和游標(biāo)都處于閑置狀態(tài)。
這樣一來(lái),每一次查詢都要順序的新建數(shù)據(jù)庫(kù)連接,都要阻塞在數(shù)據(jù)庫(kù)返回結(jié)果的過(guò)程中。當(dāng)前端提交大量查詢請(qǐng)求時(shí),查詢效率肯定是很低的。
第一次改進(jìn)
之前的模塊里,問(wèn)題最大的就是第一步——建立數(shù)據(jù)庫(kù)連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復(fù)服用這個(gè)連接就好了。
所以,首先應(yīng)該把數(shù)據(jù)庫(kù)查詢模塊作為一個(gè)單獨(dú)的守護(hù)進(jìn)程去執(zhí)行,而前端app作為主進(jìn)程響應(yīng)用戶的點(diǎn)擊操作。那么兩條進(jìn)程怎么傳遞消息呢?翻了幾天Python文檔,終于構(gòu)思出來(lái):用隊(duì)列queue作為生產(chǎn)者(web前端)向消費(fèi)者(數(shù)據(jù)庫(kù)后端)傳遞任務(wù)的渠道。生產(chǎn)者,會(huì)與SQL命令一起,同時(shí)傳遞一個(gè)管道pipe的連接對(duì)象,作為任務(wù)完成后,回傳結(jié)果的渠道。確保,任務(wù)的接收方與發(fā)送方保持一致。
作為第二個(gè)問(wèn)題的解決方法,可以使用線程池來(lái)并發(fā)獲取任務(wù)隊(duì)列中的task,然后執(zhí)行命令并回傳結(jié)果。
第二次改進(jìn)
第一次改進(jìn)的效果還是很明顯的,不用任何測(cè)試手段。直接點(diǎn)擊頁(yè)面鏈接,可以很直觀地感覺(jué)到反應(yīng)速度有很明顯的加快。
但是對(duì)于第二個(gè)問(wèn)題,使用線程池還是有些欠妥當(dāng)。因?yàn)椋珻Python解釋器存在GIL問(wèn)題,所有線程實(shí)際上都在一個(gè)解釋器進(jìn)程里調(diào)度。線程稍微開(kāi)多一點(diǎn),解釋器進(jìn)程就會(huì)頻繁的切換線程,而線程切換的開(kāi)銷也不小。線程多一點(diǎn),甚至?xí)霈F(xiàn)“抖動(dòng)”問(wèn)題(也就是剛剛喚醒一個(gè)線程,就進(jìn)入掛起狀態(tài),剛剛換到棧幀或內(nèi)存的上下文,又被換回內(nèi)存或者磁盤),效率大大降低。也就是說(shuō),線程池的并發(fā)量很有限。
試過(guò)了多進(jìn)程、多線程,只能在單個(gè)線程里做文章了。
Python中的asyncio庫(kù)
Python里有大量的協(xié)程庫(kù)可以實(shí)現(xiàn)單線程內(nèi)的并發(fā)操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫(kù)同樣可以實(shí)現(xiàn)協(xié)程并發(fā)。asyncio庫(kù)大大降低了Python中協(xié)程的實(shí)現(xiàn)難度,就像定義普通函數(shù)那樣就可以了,只是要在def前面多加一個(gè)async關(guān)鍵詞。async def函數(shù)中,需要阻塞在其他async def函數(shù)的位置前面可以加上await關(guān)鍵詞。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函數(shù)的執(zhí)行稍微麻煩點(diǎn)。需要首先獲取一個(gè)loop對(duì)象,然后由這個(gè)對(duì)象代為執(zhí)行async def函數(shù)。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在執(zhí)行execute(task)函數(shù)時(shí),如果遇到await關(guān)鍵字,就會(huì)暫時(shí)掛起當(dāng)前協(xié)程,轉(zhuǎn)而去執(zhí)行其他阻塞在await關(guān)鍵詞的協(xié)程,從而實(shí)現(xiàn)協(xié)程并發(fā)。
不過(guò)需要注意的是,run_until_complete()函數(shù)本身是一個(gè)阻塞函數(shù)。也就是說(shuō),當(dāng)前線程會(huì)等候一個(gè)run_until_complete()函數(shù)執(zhí)行完畢之后,才會(huì)繼續(xù)執(zhí)行下一部函數(shù)。所以下面這段代碼并不能并發(fā)執(zhí)行。
for task in task_list:
loop.run_until_complete(task)
對(duì)與這個(gè)問(wèn)題,asyncio庫(kù)也有相應(yīng)的解決方案:gather函數(shù)。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
當(dāng)然了,async def函數(shù)的執(zhí)行并不只有這兩種解決方案,還有call_soon與run_forever的配合執(zhí)行等等,更多內(nèi)容還請(qǐng)參考官方文檔。
Python下的I/O多路復(fù)用
協(xié)程,實(shí)際上,也存在上下文切換,只不過(guò)開(kāi)銷很輕微。而I/O多路復(fù)用則完全不存在這個(gè)問(wèn)題。
目前,Linux上比較火的I/O多路復(fù)用API要算epoll了。Tornado,就是通過(guò)調(diào)用C語(yǔ)言封裝的epoll庫(kù),成功解決了C10K問(wèn)題(當(dāng)然還有Pypy的功勞)。
在Linux里查文檔,可以看到epoll只有三類函數(shù),調(diào)用起來(lái)比較方便易懂。
創(chuàng)建epoll對(duì)象,并返回其對(duì)應(yīng)的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制監(jiān)聽(tīng)事件。第一個(gè)參數(shù)epfd就對(duì)應(yīng)于前面命令創(chuàng)建的epoll對(duì)象的文件描述符;第二個(gè)參數(shù)表示該命令要執(zhí)行的動(dòng)作:監(jiān)聽(tīng)事件的新增、修改或者刪除;第三個(gè)參數(shù),是要監(jiān)聽(tīng)的文件對(duì)應(yīng)的描述符;第四個(gè),代表要監(jiān)聽(tīng)的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。這是一個(gè)阻塞函數(shù),調(diào)用者會(huì)等候內(nèi)核通知所注冊(cè)的事件被觸發(fā)。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select庫(kù)里:
select.epoll()對(duì)應(yīng)于第一類創(chuàng)建函數(shù);
epoll.register(),epoll.unregister(),epoll.modify()均是對(duì)控制函數(shù)epoll_ctl的封裝;
epoll.poll()則是對(duì)等候函數(shù)epoll_wait的封裝。
Python里epoll相關(guān)API的最大問(wèn)題應(yīng)該是在epoll.poll()。相比于其所封裝的epoll_wait,用戶無(wú)法手動(dòng)指定要等候的事件,也就是后者的第二個(gè)參數(shù)struct epoll_event *events。沒(méi)法實(shí)現(xiàn)精確控制。因此只能使用替代方案:select.select()函數(shù)。
根據(jù)Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對(duì)Unix系統(tǒng)中select函數(shù)的直接調(diào)用,與C語(yǔ)言API的傳參很接近。前三個(gè)參數(shù)都是列表,其中的元素都是要注冊(cè)到內(nèi)核的文件描述符。如果想用自定義類,就要確保實(shí)現(xiàn)了fileno()方法。
其分別對(duì)應(yīng)于:
rlist: 等候直到可讀
wlist: 等候直到可寫(xiě)
xlist: 等候直到異常。這個(gè)異常的定義,要查看系統(tǒng)文檔。
select.select(),類似于epoll.poll(),先注冊(cè)文件和事件,然后保持等候內(nèi)核通知,是阻塞函數(shù)。
實(shí)際應(yīng)用
Psycopg2庫(kù)支持對(duì)異步和協(xié)程,但和一般情況下的用法略有區(qū)別。普通數(shù)據(jù)庫(kù)連接支持不同線程中的不同游標(biāo)并發(fā)查詢;而異步連接則不支持不同游標(biāo)的同時(shí)查詢。所以異步連接的不同游標(biāo)之間必須使用I/O復(fù)用方法來(lái)協(xié)調(diào)調(diào)度。
所以,我的大致實(shí)現(xiàn)思路是這樣的:首先并發(fā)執(zhí)行大量協(xié)程,從任務(wù)隊(duì)列中提取任務(wù),再向連接池請(qǐng)求連接,創(chuàng)建游標(biāo),然后執(zhí)行命令,并返回結(jié)果。在獲取游標(biāo)和接受查詢結(jié)果之前,均要阻塞等候內(nèi)核通知連接可用。
其中,連接池返回連接時(shí),會(huì)根據(jù)引用連接的協(xié)程數(shù)量,返回負(fù)載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。
我的代碼位于:bottle-blog/dbservice.py
存在問(wèn)題
當(dāng)然了,這個(gè)流程目前還一些問(wèn)題。
首先就是每次輪詢拿到任務(wù)之后,都會(huì)走這么一個(gè)流程。
獲取連接 -- 新建游標(biāo) -- 執(zhí)行任務(wù) -- 關(guān)閉游標(biāo) -- 取消連接引用
本來(lái),最好的情況應(yīng)該是:在輪詢之前,就建好游標(biāo);在輪詢時(shí),直接等候內(nèi)核通知,執(zhí)行相應(yīng)任務(wù)。這樣可以減少輪詢時(shí)的任務(wù)量。但是如果協(xié)程提前對(duì)應(yīng)好連接,那就不能保證在獲取任務(wù)時(shí),保持各連接負(fù)載均衡了。
所以這一塊,還有工作要做。
還有就是epoll沒(méi)能用上,有些遺憾。
以后打算寫(xiě)點(diǎn)C語(yǔ)言的內(nèi)容,或者用Python/C API,或者用Ctypes包裝共享庫(kù),來(lái)實(shí)現(xiàn)epoll的調(diào)用。
最后,請(qǐng)?jiān)试S我吐槽一下Python的epoll相關(guān)文檔:簡(jiǎn)直太弱了!??!必須看源碼才能弄清楚功能。
1、使用os.system("cmd")
這是最簡(jiǎn)單的一種方法,其執(zhí)行過(guò)程中會(huì)輸出顯示cmd命令執(zhí)行的信息。
例如:print os.system("mkdir test") 輸出:0
可以看到結(jié)果打印出0,表示命令執(zhí)行成功;否則表示失?。ㄔ俅螆?zhí)行該命令,輸出:子目錄或文件 test 已經(jīng)存在。1)。
2、使用os.popen("cmd")
通過(guò)os.popen()返回的是 file read 的對(duì)象,對(duì)其進(jìn)行讀取read()操作可以看到執(zhí)行的輸出
例如:print os.popen("adb shell ls /sdcard/ | findstr aa.png").read() 輸出:aa.png(若aa.png存在,否則輸出為空)
3、subprocess.Popen("cmd")
subprocess模塊被推薦用來(lái)替換一些老的模塊和函數(shù),如:os.system、os.spawn*、os.popen*等
subprocess模塊目的是 啟動(dòng)一個(gè)新的進(jìn)程并與之通信 ,最常用是定義類Popen,使用Popen可以創(chuàng)建進(jìn)程,并與進(jìn)程進(jìn)行復(fù)雜的交互。其函數(shù)原型為:
classsubprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
Popen非常強(qiáng)大,支持多種參數(shù)和模式,通過(guò)其構(gòu)造函數(shù)可以看到支持很多參數(shù)。但Popen函數(shù)存在缺陷在于, 它是一個(gè)阻塞的方法 ,如果運(yùn)行cmd命令時(shí)產(chǎn)生內(nèi)容非常多,函數(shù)就容易阻塞。另一點(diǎn), Popen方法也不會(huì)打印出cmd的執(zhí)行信息 。
以下羅列常用到的參數(shù):
args :這個(gè)參數(shù)必須是 字符串 或者是一個(gè)由 字符串成員的列表 。其中如果是一個(gè)字符串列表的話,那第一個(gè)成員為要運(yùn)行的程序的路徑以及程序名稱;從第二個(gè)成員開(kāi)始到最后一個(gè)成員為運(yùn)行這個(gè)程序需要輸入的參數(shù)。這與popen中是一樣的。
bufsize: 一般使用比較少,略過(guò)。
executable: 指定要運(yùn)行的程序,這個(gè)一般很少用到,因?yàn)橐付ㄟ\(yùn)行的程序在args中已經(jīng)指定了。 stdin,stdout?,stderr: 分別代表程序的標(biāo)準(zhǔn)輸入、標(biāo)準(zhǔn)輸出、標(biāo)準(zhǔn)錯(cuò)誤處理??梢赃x擇的值有 PIPE , 已經(jīng)存在的打開(kāi)的文件對(duì)象 和 NONE 。若stdout是文件對(duì)象的話,要確保文件對(duì)象是處于打開(kāi)狀態(tài)。
shell:shell參數(shù)根據(jù)要執(zhí)行的命令情況來(lái)定,如果將參數(shù)shell設(shè)為True,executable將指定程序使用的shell。在windows平臺(tái)下,默認(rèn)的shell由COMSPEC環(huán)境變量來(lái)指定。
用gevent啊,協(xié)程方案,
通過(guò)語(yǔ)句from gevent import monkey; monkey.patch_socket()對(duì)IO函數(shù)打補(bǔ)丁,就可以設(shè)置為阻塞
如果是python3的話,還可以用asyncio,一個(gè)已經(jīng)加入標(biāo)準(zhǔn)庫(kù)的協(xié)程方案
協(xié)程就是異步回調(diào)的語(yǔ)法糖,用同步的寫(xiě)法實(shí)現(xiàn)異步的效果,你值得擁有
上篇文章 對(duì)logging做了基本介紹,我們可以使用logging來(lái)做日志的簡(jiǎn)單記錄。但實(shí)際項(xiàng)目應(yīng)用時(shí),我們一般會(huì)根據(jù)自身需要對(duì)其做二次封裝(loggingV2),然后在其他python文件中, 先import申明后直接調(diào)用。
廢話不多說(shuō),下面給幾個(gè)二次封裝的簡(jiǎn)單示例:
示例一:
loggingV2.py - 封裝
logMain.py - 應(yīng)用
示例二:
對(duì)上述示例進(jìn)行 模塊化封裝 ,如下log.py
則任何聲明了log模塊的python文件都可以調(diào)用logging日志系統(tǒng),如下logMain.py
示例三:
對(duì)上述示例進(jìn)行 定制化封裝 ,如下myLog.py
需求:
1)同時(shí)實(shí)現(xiàn)終端顯示與日志文件保存
2)日志文件名除日期外,增加顯示時(shí)間,精確到秒
3)日志輸出級(jí)別可配置
4)日志保存路徑與文件名可配置
5)日志跨天(或者小時(shí)/分鐘),另生成新文件保存
改寫(xiě)logMain.py,如下:
示例四:
對(duì)上述示例進(jìn)行 異步線程封裝 ,如下myThreadLog.py
需求:
1)獨(dú)立線程處理日志,不影響主程序性能
2)使用隊(duì)列異步處理日志記錄
繼續(xù)改寫(xiě)logMain.py,如下:
注意 - 線程相關(guān)操作函數(shù)(如下):
1.threading.Thread() — 創(chuàng)建線程并初始化線程,可以為線程傳遞參數(shù)
2.threading.enumerate() — 返回一個(gè)包含正在運(yùn)行的線程的list
3.threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果
4.Thread.start() — 啟動(dòng)線程
5.Thread.join() — 阻塞函數(shù),一直等到線程結(jié)束
6.Thread.isAlive() — 返回線程活動(dòng)狀態(tài)
7.Thread.setName() — 設(shè)置線程名
8.Thread.getName() — 獲取線程名
9.Thread.setDaemon() —?設(shè)置為后臺(tái)線程,這里默認(rèn)是False,設(shè)置為True之后則主線程不會(huì)再等待子線程結(jié)束才結(jié)束,而是主線程結(jié)束意味程序退出,子線程也立即結(jié)束,注意調(diào)用時(shí)必須設(shè)置在start()之前;
10.除了以上常用函數(shù),線程還經(jīng)常與互斥鎖Lock/事件Event/信號(hào)量Condition/隊(duì)列Queue等函數(shù)配合使用