方法用錯了, 首先不能是DataFrame的applymap方法 這個方法是對每一個元素進(jìn)行處理的
創(chuàng)新互聯(lián)公司服務(wù)項目包括連平網(wǎng)站建設(shè)、連平網(wǎng)站制作、連平網(wǎng)頁制作以及連平網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,連平網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到連平省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
其次, 處理函數(shù)(你的func_wd)是接受一個值, 返回一個值, 不是接受一堆值然后循環(huán)
def func_wd(x) 這個x就是wd那一列中的某一個值, 里面直接分支返回就行了:
在map這個方法的時候, 用Series的apply:
data['wd'] = data['wd'].apply(fuc_wd)
這樣就行了
Queue 叫隊列,是數(shù)據(jù)結(jié)構(gòu)中的一種,基本上所有成熟的編程語言都內(nèi)置了對 Queue 的支持。
Python 中的 Queue 模塊實現(xiàn)了多生產(chǎn)者和多消費者模型,當(dāng)需要在多線程編程中非常實用。而且該模塊中的 Queue 類實現(xiàn)了鎖原語,不需要再考慮多線程安全問題。
該模塊內(nèi)置了三種類型的 Queue,分別是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它們?nèi)齻€的區(qū)別僅僅是取出時的順序不一致而已。
Queue 是一個 FIFO 隊列,任務(wù)按照添加的順序被取出。
LifoQueue 是一個 LIFO 隊列,類似堆棧,后添加的任務(wù)先被取出。
PriorityQueue 是一個優(yōu)先級隊列,隊列里面的任務(wù)按照優(yōu)先級排序,優(yōu)先級高的先被取出。
如你所見,就是上面所說的三種不同類型的內(nèi)置隊列,其中 maxsize 是個整數(shù),用于設(shè)置可以放入隊列中的任務(wù)數(shù)的上限。當(dāng)達(dá)到這個大小的時候,插入操作將阻塞至隊列中的任務(wù)被消費掉。如果 maxsize 小于等于零,則隊列尺寸為無限大。
向隊列中添加任務(wù),直接調(diào)用 put() 函數(shù)即可
put() 函數(shù)完整的函數(shù)簽名如下 Queue.put(item, block=True, timeout=None) ,如你所見,該函數(shù)有兩個可選參數(shù)。
默認(rèn)情況下,在隊列滿時,該函數(shù)會一直阻塞,直到隊列中有空余的位置可以添加任務(wù)為止。如果 timeout 是正數(shù),則最多阻塞 timeout 秒,如果這段時間內(nèi)還沒有空余的位置出來,則會引發(fā) Full 異常。
當(dāng) block 為 false 時,timeout 參數(shù)將失效。同時如果隊列中沒有空余的位置可添加任務(wù)則會引發(fā) Full 異常,否則會直接把任務(wù)放入隊列并返回,不會阻塞。
另外,還可以通過 Queue.put_nowait(item) 來添加任務(wù),相當(dāng)于 Queue.put(item, False) ,不再贅述。同樣,在隊列滿時,該操作會引發(fā) Full 異常。
從隊列中獲取任務(wù),直接調(diào)用 get() 函數(shù)即可。
與 put() 函數(shù)一樣, get() 函數(shù)也有兩個可選參數(shù),完整簽名如下 Queue.get(block=True, timeout=None) 。
默認(rèn)情況下,當(dāng)隊列空時調(diào)用該函數(shù)會一直阻塞,直到隊列中有任務(wù)可獲取為止。如果 timeout 是正數(shù),則最多阻塞 timeout 秒,如果這段時間內(nèi)還沒有任務(wù)可獲取,則會引發(fā) Empty 異常。
當(dāng) block 為 false 時,timeout 參數(shù)將失效。同時如果隊列中沒有任務(wù)可獲取則會立刻引發(fā) Empty 異常,否則會直接獲取一個任務(wù)并返回,不會阻塞。
另外,還可以通過 Queue.get_nowait() 來獲取任務(wù),相當(dāng)于 Queue.get(False) ,不再贅述。同樣,在隊列為空時,該操作會引發(fā) Empty 異常。
Queue.qsize() 函數(shù)返回隊列的大小。注意這個大小不是精確的,qsize() 0 不保證后續(xù)的 get() 不被阻塞,同樣 qsize() maxsize 也不保證 put() 不被阻塞。
如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證后續(xù)調(diào)用的 put() 不被阻塞。類似的,如果 empty() 返回 False ,也不保證后續(xù)調(diào)用的 get() 不被阻塞。
如果隊列是滿的返回 True ,否則返回 False 。如果 full() 返回 True 不保證后續(xù)調(diào)用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證后續(xù)調(diào)用的 put() 不被阻塞。
queue.Queue() 是 FIFO 隊列,出隊順序跟入隊順序是一致的。
queue.LifoQueue() 是 LIFO 隊列,出隊順序跟入隊順序是完全相反的,類似于棧。
優(yōu)先級隊列中的任務(wù)順序跟放入時的順序是無關(guān)的,而是按照任務(wù)的大小來排序,最小值先被取出。那任務(wù)比較大小的規(guī)則是怎么樣的呢。
注意,因為列表的比較對規(guī)則是按照下標(biāo)順序來比較的,所以在沒有比較出大小之前 ,隊列中所有列表對應(yīng)下標(biāo)位置的元素類型要一致。
好比 [2,1] 和 ["1","b"] 因為第一個位置的元素類型不一樣,所以是沒有辦法比較大小的,所以也就放入不了優(yōu)先級隊列。
然而對于 [2,1] 和 [1,"b"] 來說即使第二個元素的類型不一致也是可以放入優(yōu)先級隊列的,因為只需要比較第一個位置元素的大小就可以比較出結(jié)果了,就不需要比較第二個位置元素的大小了。
但是對于 [2,1] 和 1 [2,"b"] 來說,則同樣不可以放入優(yōu)先級隊列,因為需要比較第二個位置的元素才可以比較出結(jié)果,然而第二個位置的元素類型是不一致的,無法比較大小。
綜上,也就是說, 直到在比較出結(jié)果之前,對應(yīng)下標(biāo)位置的元素類型都是需要一致的 。
下面我們自定義一個動物類型,希望按照年齡大小來做優(yōu)先級排序。年齡越小優(yōu)先級越高。
本章節(jié)介紹了隊列以及其常用操作。因為隊列默認(rèn)實現(xiàn)了鎖原語,因此在多線程編程中就不需要再考慮多線程安全問題了,對于程序員來說相當(dāng)友好了。
Python實現(xiàn)簡單多線程任務(wù)隊列
最近我在用梯度下降算法繪制神經(jīng)網(wǎng)絡(luò)的數(shù)據(jù)時,遇到了一些算法性能的問題。梯度下降算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當(dāng)網(wǎng)絡(luò)請求 plot.ly 繪圖時會阻塞等待返回,于是也會影響到其他的梯度下降函數(shù)的執(zhí)行速度。
一種解決辦法是每調(diào)用一次 plotly.write 函數(shù)就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務(wù)隊列)一樣大而全的任務(wù)隊列框架,因為框架對于我的這點需求來說太重了,并且我的繪圖也并不需要 redis 來持久化數(shù)據(jù)。
那用什么辦法解決呢?我在 python 中寫了一個很小的任務(wù)隊列,它可以在一個單獨的線程中調(diào)用 plotly.write函數(shù)。下面是程序代碼。
fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數(shù)量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數(shù)量不等的參數(shù),**kwargs 可以傳遞命名參數(shù)。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們?yōu)槊總€ worker 創(chuàng)建一個線程,然后在后臺刪除。
下面是 worker 函數(shù)的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數(shù)獲取隊列頂端的任務(wù),并根據(jù)輸入?yún)?shù)運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”
Blokkah 是我們要做的任務(wù)名稱。隊列已經(jīng)緩存在內(nèi)存中,并且沒有執(zhí)行很多任務(wù)。下面的步驟是把主隊列當(dāng)做單獨的進(jìn)程來運行,這樣主程序退出以及執(zhí)行數(shù)據(jù)庫持久化時,隊列任務(wù)不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務(wù)寫成像工作隊列這樣復(fù)雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()