真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

python多線程與隊(duì)列

各位好,之前寫了多線程,但是在實(shí)際的生產(chǎn)中,往往情況比較復(fù)雜,要處理一批任務(wù)(比如要處理列表中所有元素),這時(shí)候不可能創(chuàng)建很多的線程,線程過(guò)多反而不好,還會(huì)造成資源開銷太大,這時(shí)候想到了隊(duì)列。

目前創(chuàng)新互聯(lián)已為成百上千的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬主機(jī)、網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、名山網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。


Queue隊(duì)列

Queue用于建立和操作隊(duì)列,常和threading類一起用來(lái)建立一個(gè)簡(jiǎn)單的線程隊(duì)列。

  • Queue.Queue(maxsize)  FIFO(先進(jìn)先出隊(duì)列)
  • Queue.LifoQueue(maxsize)  LIFO(先進(jìn)后出隊(duì)列)
  • Queue.PriorityQueue(maxsize)  為優(yōu)先級(jí)越高的越先出來(lái),對(duì)于一個(gè)隊(duì)列中的所有元素組成的entries,優(yōu)先隊(duì)列優(yōu)先返回的一個(gè)元素是sorted(list(entries))[0]。至于對(duì)于一般的數(shù)據(jù),優(yōu)先隊(duì)列取什么東西作為優(yōu)先度要素進(jìn)行判斷,官方文檔給出的建議是一個(gè)tuple如(priority, data),取priority作為優(yōu)先度。
    如果設(shè)置的maxsize小于1,則表示隊(duì)列的長(zhǎng)度無(wú)限長(zhǎng)

FIFO是常用的隊(duì)列,常用的方法有:

  • Queue.qsize()   返回隊(duì)列大小
  • Queue.empty()  判斷隊(duì)列是否為空
  • Queue.full()   判斷隊(duì)列是否滿了

  • Queue.get([block[,timeout]])  從隊(duì)列頭刪除并返回一個(gè)item,block默認(rèn)為True,表示當(dāng)隊(duì)列為空卻去get的時(shí)候會(huì)阻塞線程,等待直到有有item出現(xiàn)為止來(lái)get出這個(gè)item。如果是False的話表明當(dāng)隊(duì)列為空你卻去get的時(shí)候,會(huì)引發(fā)異常。
    在block為True的情況下可以再設(shè)置timeout參數(shù)。表示當(dāng)隊(duì)列為空,get阻塞timeout指定的秒數(shù)之后還沒(méi)有g(shù)et到的話就引發(fā)Full異常。

  • Queue.put(...[,block[,timeout]])  向隊(duì)尾插入一個(gè)item,同樣若block=True的話隊(duì)列滿時(shí)就阻塞等待有空位出來(lái)再put,block=False時(shí)引發(fā)異常。
    同get的timeout,put的timeout是在block為True的時(shí)候進(jìn)行超時(shí)設(shè)置的參數(shù)。
    Queue.task_done()  從場(chǎng)景上來(lái)說(shuō),處理完一個(gè)get出來(lái)的item之后,調(diào)用task_done將向隊(duì)列發(fā)出一個(gè)信號(hào),表示本任務(wù)已經(jīng)完成。

  • Queue.join()  監(jiān)視所有item并阻塞主線程,直到所有item都調(diào)用了task_done之后主線程才繼續(xù)向下執(zhí)行。這么做的好處在于,假如一個(gè)線程開始處理最后一個(gè)任務(wù),它從任務(wù)隊(duì)列中拿走最后一個(gè)任務(wù),此時(shí)任務(wù)隊(duì)列就空了但最后那個(gè)線程還沒(méi)處理完。當(dāng)調(diào)用了join之后,主線程就不會(huì)因?yàn)殛?duì)列空了而擅自結(jié)束,而是等待最后那個(gè)線程處理完成了。

隊(duì)列-單線程

import threading
import queue
import time

class worker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.thread_stop = False

    def run(self):
        while not self.thread_stop:
            print("thread%d %s: waiting for tast" % (self.ident, self.name))
            try:
                task = q.get(block=True, timeout=2)  # 接收消息
            except queue.Empty:
                print("Nothing to do! I will go home!")
                self.thread_stop = True
                break
            print("tasking: %s ,task No:%d" % (task[0], task[1]))
            print("I am working")
            time.sleep(3)
            print("work finished!")
            q.task_done()                           # 完成一個(gè)任務(wù)
            res = q.qsize()                         # 判斷消息隊(duì)列大小(隊(duì)列中還有幾個(gè)任務(wù))
            if res > 0:
                print("fuck! Still %d tasks to do" % (res))

    def stop(self):
        self.thread_stop = True

if __name__ == "__main__":
    q = queue.Queue(3)                                    # 創(chuàng)建隊(duì)列(大小為3)
    worker = worker(q)                                    # 將隊(duì)列加入類中
    worker.start()                                        # 啟動(dòng)類
    q.put(["produce cup!", 1], block=True, timeout=None)  # 向隊(duì)列中添加元素,產(chǎn)生任務(wù)消息
    q.put(["produce desk!", 2], block=True, timeout=None)
    q.put(["produce apple!", 3], block=True, timeout=None)
    q.put(["produce banana!", 4], block=True, timeout=None)
    q.put(["produce bag!", 5], block=True, timeout=None)
    print("***************leader:wait for finish!")
    q.join()                                             # 等待所有任務(wù)完成
    print("***************leader:all task finished!")

輸出:
thread9212 Thread-1: waiting for tast
tasking: produce cup! ,task No:1
I am working
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce desk! ,task No:2
I am working
***************leader:wait for finish!
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce apple! ,task No:3
I am working
work finished!
fuck! Still 2 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce banana! ,task No:4
I am working
work finished!
fuck! Still 1 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce bag! ,task No:5
I am working
work finished!
thread9212 Thread-1: waiting for tast
***************leader:all task finished!
Nothing to do!i will go home!

隊(duì)列-多線程

import threading
import time
from queue import Queue

img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
             'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
             'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
             'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
             'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
             'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
             'lipei_00123.mp3','lipei_00129.mp3',]

q = Queue(10)

class Music_Cols(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        global img_lists
        global q
        while True:
            try:
                music = img_lists.pop(0)
                q.put(music)
            except IndexError:
                break

class Music_Play(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        global q
        while True:
            if q.not_empty:
                music = q.get()
                print('{}正在播放{}'.format(threading.current_thread(), music))
                time.sleep(5)
                q.task_done()
                print('{}播放結(jié)束'.format(music))
            else:
                break

if __name__ == '__main__':
    mc_thread = Music_Cols('music_cols')
    mc_thread.setDaemon(True)       # 設(shè)置為守護(hù)進(jìn)程,主線程退出時(shí),子進(jìn)程也kill掉
    mc_thread.start()               # 啟動(dòng)進(jìn)程
    for i in range(5):              # 設(shè)置線程個(gè)數(shù)(批量任務(wù)時(shí),線程數(shù)不必太大,注意內(nèi)存及CPU負(fù)載)
        mp_thread = Music_Play('music_play')
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                        # 線程阻塞(等待所有子線程處理完成,再退出)

輸出:
正在播放lipei_00006.mp3
正在播放lipei_00007.mp3
正在播放lipei_00012.mp3
正在播放lipei_00014.mp3
正在播放lipei_00021.mp3
lipei_00014.mp3播放結(jié)束
... ...
正在播放lipei_00117.mp3
lipei_00066.mp3播放結(jié)束
正在播放lipei_00123.mp3
lipei_00104.mp3播放結(jié)束
正在播放lipei_00129.mp3
lipei_00123.mp3播放結(jié)束
lipei_00117.mp3播放結(jié)束
lipei_00087.mp3播放結(jié)束
lipei_00106.mp3播放結(jié)束
lipei_00129.mp3播放結(jié)束

或者(效果與上述一樣)

import threading
import time
from queue import Queue

img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
             'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
             'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
             'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
             'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
             'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
             'lipei_00123.mp3','lipei_00129.mp3',]

q = Queue(10)

class Music_Cols(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                music = img_lists.pop(0)
                q.put(music)
            except IndexError:
                break

class Music_Play(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            if q.not_empty:
                music = q.get()
                print('{}正在播放{}'.format(threading.current_thread(), music))
                time.sleep(5)
                q.task_done()
                print('{}播放結(jié)束'.format(music))
            else:
                break

if __name__ == '__main__':
    mc_thread = Music_Cols('music_cols')
    mc_thread.setDaemon(True)       # 設(shè)置為守護(hù)進(jìn)程,主線程退出時(shí),子進(jìn)程也kill掉
    mc_thread.start()               # 啟動(dòng)進(jìn)程
    for i in range(5):              # 設(shè)置線程個(gè)數(shù)(批量任務(wù)時(shí),線程數(shù)不必太大,注意內(nèi)存及CPU負(fù)載)
        mp_thread = Music_Play('music_play')
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                        # 線程阻塞(等待所有子線程處理完成,再退出)

隊(duì)列-多線程—圖像增強(qiáng)實(shí)例

"""
開啟多線程:圖像增強(qiáng)
"""
import os
import random
import queue
import numpy as np
import cv2
import time
import threading

def Affine_transformation(img_array):
    rows, cols = img_array.shape[:2]
    pointsA = np.float32([[30, 80], [180, 60], [80, 230]])  # 左偏
    pointsB = np.float32([[60, 50], [220, 70], [20, 180]])  # 右偏
    pointsC = np.float32([[70, 60], [180, 50], [50, 200]])  # 前偏
    pointsD = np.float32([[40, 50], [210, 60], [70, 180]])  # 后偏

    points1 = np.float32([[50, 50], [200, 50], [50, 200]])
    points2 = random.choice((pointsA, pointsB, pointsC, pointsD))

    matrix = cv2.getAffineTransform(points1, points2)
    Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows))
    return Affine_transfor_img

def random_rotate_img(img):
    rows, cols= img.shape[:2]
    angle = random.choice([25, 90, -25, -90, 180])
    Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
    res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT)
    return res

def random_hsv_transform(img, hue_vari, sat_vari, val_vari):
    """
    :param img:
    :param hue_vari: 色調(diào)變化比例范圍(0,360)
    :param sat_vari: 飽和度變化比例范圍(0,1)
    :param val_vari: 明度變化比例范圍(0,1)
    :return:
    """
    hue_delta = np.random.randint(-hue_vari, hue_vari)
    sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari)
    val_mult = 1 + np.random.uniform(-val_vari, val_vari)

    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float)
    img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180
    img_hsv[:, :, 1] *= sat_mult
    img_hsv[:, :, 2] *= val_mult
    img_hsv[img_hsv > 255] = 255
    return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR)

def random_gamma_transform(img, gamma_vari):
    """
    :param img:
    :param gamma_vari:
    :return:
    """
    log_gamma_vari = np.log(gamma_vari)
    alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari)
    gamma = np.exp(alpha)
    gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)]
    gamma_table = np.round(np.array(gamma_table)).astype(np.uint8)
    return cv2.LUT(img, gamma_table)

def random_flip_img(img):
    """
    0 = X axis, 1 = Y axis,  -1 = both
    :param img:
    :return:
    """
    flip_val = [0,1,-1]
    random_flip_val = random.choice(flip_val)
    res = cv2.flip(img, random_flip_val)
    return res

def clamp(pv):     #防止像素溢出
    if pv > 255:
        return 255
    if pv < 0:
        return 0
    else:
        return pv

def gaussian_noise(image):   # 加高斯噪聲
    """
    :param image:
    :return:
    """
    h, w, c = image.shape
    for row in range(h):
        for col in range(w):
            s = np.random.normal(0, 20, 3)
            b = image[row, col, 0] # blue
            g = image[row, col, 1] # green
            r = image[row, col, 2] # red
            image[row, col, 0] = clamp(b + s[0])
            image[row, col, 1] = clamp(g + s[1])
            image[row, col, 2] = clamp(r + s[2])
    return image

def get_img(input_dir):
    img_path_list = []
    for (root_path,dirname,filenames) in os.walk(input_dir):
        for filename in filenames:
            Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
            if filename.endswith(tuple(Suffix_name)):
                img_path = root_path+"/"+filename
                img_path_list.append(img_path)
    return  img_path_list

class IMG_QUEUE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                img_path = img_path_list.pop(0)
                q.put(img_path)
            except IndexError:
                break

class IMG_AUG(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
        self.q = q

    def run(self):
        while True:
            if q.not_empty:
                img_path = q.get()
                try:
                    print("doing...")
                    img_array = cv2.imread(img_path)
                    Affine_transfor_img = Affine_transformation(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_Affine_transfor.png', Affine_transfor_img)

                    res_rotate = random_rotate_img(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_rotate_img.png',res_rotate)

                    GAMMA_IMG = random_gamma_transform(img_array, 0.3)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_GAMMA_IMG.png',GAMMA_IMG)

                    res_flip = random_flip_img(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_flip_img.png',res_flip)

                    G_Noiseimg = gaussian_noise(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_G_Noise_img.png',G_Noiseimg)

                    HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_HSV_IMG.png',HSV_IMG)
                except:
                    print("圖像格式錯(cuò)誤!")
                    pass
                q.task_done()
            else:
                break

if __name__ == '__main__':
    input_dir = './cccc'
    output_dir = './eeee'
    start_time = time.time()            # 開始計(jì)時(shí)
    img_path_list = get_img(input_dir)  # 獲取圖像數(shù)據(jù)

    q = queue.Queue(10)                 # 設(shè)置隊(duì)列元素個(gè)數(shù)
    my_thread = IMG_QUEUE('IMG_QUEUE')  # 實(shí)例化
    my_thread.setDaemon(True)           # 設(shè)置為守護(hù)進(jìn)程,主線程退出時(shí),子進(jìn)程也kill掉
    my_thread.start()                   # 啟動(dòng)進(jìn)程

    for i in range(5):                  # 設(shè)置線程個(gè)數(shù)(批量任務(wù)時(shí),線程數(shù)不必太大,注意內(nèi)存及CPU負(fù)載)
        mp_thread = IMG_AUG('IMG_AUG')
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                            # 線程阻塞(等待所有子線程處理完成,再退出)
    end_time = time.time()
    print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分鐘")

多線程-創(chuàng)建圖像縮略圖(等比縮放圖像)

import os
from PIL import Image
import threading
import time
import queue

def get_img(input_dir):
    img_path_list = []
    for (root_path,dirname,filenames) in os.walk(input_dir):
        for filename in filenames:
            Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
            if filename.endswith(tuple(Suffix_name)):
                img_path = root_path+"/"+filename
                img_path_list.append(img_path)
    return  img_path_list

class IMG_QUEUE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                img_path = img_path_list.pop(0)
                q.put(img_path)
            except IndexError:
                break

class IMG_RESIZE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            if q.not_empty:
                img_path = q.get()
                try:
                    im = Image.open(img_path)
                    im.thumbnail((size, size))
                    print(im.format, im.size, im.mode)
                    im.save(img_path, 'JPEG')
                except:
                    print("圖像格式錯(cuò)誤!")
                    pass
                q.task_done()
            else:
                break

if __name__=='__main__':
    input_dir = 'D:\\20190112_20190114_all' #需要?jiǎng)?chuàng)建縮略圖,圖片的目錄
    start_time = time.time()            # 開始計(jì)時(shí)
    img_path_list = get_img(input_dir)  # 獲取圖像數(shù)據(jù)

    size = 800
    q = queue.Queue(100)                # 設(shè)置隊(duì)列元素個(gè)數(shù)
    my_thread = IMG_QUEUE('IMG_QUEUE')  # 實(shí)例化
    my_thread.setDaemon(True)           # 設(shè)置為守護(hù)進(jìn)程,主線程退出時(shí),子進(jìn)程也kill掉
    my_thread.start()                   # 啟動(dòng)進(jìn)程

    for i in range(5):                  # 設(shè)置線程個(gè)數(shù)(批量任務(wù)時(shí),線程數(shù)不必太大,注意內(nèi)存及CPU負(fù)載)
        mp_thread = IMG_RESIZE(str(i))
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                            # 線程阻塞(等待所有子線程處理完成,再退出)
    end_time = time.time()              # 計(jì)時(shí)結(jié)束
    print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分鐘")

網(wǎng)站欄目:python多線程與隊(duì)列
瀏覽路徑:http://weahome.cn/article/ipdpii.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部