真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

think-queue的示例分析

這篇文章主要介紹了think-queue的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

成都創(chuàng)新互聯(lián)是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來公司不斷探索創(chuàng)新,始終堅持為客戶提供滿意周到的服務,在本地打下了良好的口碑,在過去的十余年時間我們累計服務了上千家以及全國政企客戶,如玻璃鋼雕塑等企業(yè)單位,完善的項目管理流程,嚴格把控項目進度與質(zhì)量監(jiān)控加上過硬的技術(shù)實力獲得客戶的一致表揚。

前言

分析之前請大家務必了解消息隊列的實現(xiàn)

tp5的消息隊列是基于database redis 和tp官方自己實現(xiàn)的 Topthink
本章是圍繞redis來做分析

存儲key:

key類型描述
queues:queueNamelist要執(zhí)行的任務
think:queue:restartstring重啟隊列時間戳
queues:queueName:delayedzSet延遲任務
queues:queueName:reservedzSet執(zhí)行失敗,等待重新執(zhí)行

執(zhí)行命令

work和listen的區(qū)別在下面會解釋
命令描述
php think queue:work監(jiān)聽隊列
php think queue:listen監(jiān)聽隊列
php think queue:restart重啟隊列
php think queue:subscribe暫無,可能是保留的 官方有什么其他想法但是還沒實現(xiàn)

行為標簽

標簽描述
worker_daemon_start守護進程開啟
worker_memory_exceeded內(nèi)存超出
worker_queue_restart重啟守護進程
worker_before_process任務開始執(zhí)行之前
worker_before_sleep任務延遲執(zhí)行
queue_failed任務執(zhí)行失敗

命令參數(shù)

參數(shù)默認值可以使用的模式描述
queuenullwork,listen要執(zhí)行的任務名稱
daemonnullwork以守護進程執(zhí)行任務
delay0work,listen失敗后重新執(zhí)行的時間
forcenullwork失敗后重新執(zhí)行的時間
memory128Mwork,listen限制最大內(nèi)存
sleep3work,listen沒有任務的時候等待的時間
tries0work,listen任務失敗后最大嘗試次數(shù)

模式區(qū)別

1: 執(zhí)行原理不同
work: 單進程的處理模式;
無 daemon 參數(shù) work進程在處理完下一個消息后直接結(jié)束當前進程。當不存在新消息時,會sleep一段時間然后退出;
有 daemon 參數(shù) work進程會循環(huán)地處理隊列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進程。當不存在新消息時,會在每次循環(huán)中sleep一段時間;

listen: 父進程 + 子進程 的處理模式;
會在所在的父進程會創(chuàng)建一個單次執(zhí)行模式的work子進程,并通過該work子進程來處理隊列中的下一個消息,當這個work子進程退出之后;
所在的父進程會監(jiān)聽到該子進程的退出信號,并重新創(chuàng)建一個新的單次執(zhí)行的work子進程;

2: 退出時機不同
work: 看上面
listen: 所在的父進程正常情況會一直運行,除非遇到下面兩種情況
01: 創(chuàng)建的某個work子進程的執(zhí)行時間超過了 listen命令行中的--timeout 參數(shù)配置;此時work子進程會被強制結(jié)束,listen所在的父進程也會拋出一個 ProcessTimeoutException 異常并退出;

開發(fā)者可以選擇捕獲該異常,讓父進程繼續(xù)執(zhí)行;
02: 所在的父進程因某種原因存在內(nèi)存泄露,則當父進程本身占用的內(nèi)存超過了命令行中的 --memory 參數(shù)配置時,父子進程均會退出。正常情況下,listen進程本身占用的內(nèi)存是穩(wěn)定不變的。

3: 性能不同
work: 是在腳本內(nèi)部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;

listen: 是處理完一個任務之后新開一個work進程,此時會重新加載框架腳本;

因此 work 模式的性能會比listen模式高。
注意: 當代碼有更新時,work 模式下需要手動去執(zhí)行 php think queue:restart 命令重啟隊列來使改動生效;而listen 模式會自動生效,無需其他操作。

4: 超時控制能力
work: 本質(zhì)上既不能控制進程自身的運行時間,也無法限制執(zhí)行中的任務的執(zhí)行時間;
listen: 可以限制其創(chuàng)建的work子進程的超時時間;

可通過 timeout 參數(shù)限制work子進程允許運行的最長時間,超過該時間限制仍未結(jié)束的子進程會被強制結(jié)束;
expire 和time的區(qū)別

expire 在配置文件中設(shè)置,指任務的過期時間 這個時間是全局的,影響到所有的work進程
timeout 在命令行參數(shù)中設(shè)置,指work子進程的超時時間,這個時間只對當前執(zhí)行的listen 命令有效,timeout 針對的對象是 work 子進程;

5: 使用場景不同

work 適用場景是:
01: 任務數(shù)量較多
02: 性能要求較高
03: 任務的執(zhí)行時間較短
04: 消費者類中不存在死循環(huán),sleep() ,exit() ,die() 等容易導致bug的邏輯

listen 適用場景是:

01: 任務數(shù)量較少
02: 任務的執(zhí)行時間較長
03: 任務的執(zhí)行時間需要有嚴格限制

公有操作

由于我們是根據(jù)redis來做分析 所以只需要分析src/queue/connector/redis.php
01: 首先調(diào)用 src/Queue.php中的魔術(shù)方法 __callStatic
02: 在__callStatic方法中調(diào)用了 buildConnector
03: buildConnector 中首先加載配置文件 如果無將是同步執(zhí)行
04: 根據(jù)配置文件去創(chuàng)建連接并且傳入配置

在redis.php類的構(gòu)造方法中的操作:
01: 檢測redis擴展是否安裝
02: 合并配置
03: 檢測是redis擴展還是 pRedis
04: 創(chuàng)建連接對象

發(fā)布過程

發(fā)布參數(shù)

參數(shù)名默認值描述可以使用的方法
$job要執(zhí)行任務的類push,later
$data任務數(shù)據(jù)push,later
$queuedefault任務名稱push,later
$delaynull延遲時間later

立即執(zhí)行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一頓騷操作后返回一個數(shù)組 并且序列化后 rPush到redis中 key為 queue:queueName
數(shù)組結(jié)構(gòu):

[
    'job' => $job, // 要執(zhí)行任務的類
    'data' => $data, // 任務數(shù)據(jù)
    'id'=>'xxxxx' //任務id
]

寫入 redis并且返回隊列id
至于中間的那頓騷操作太長了就沒寫

延遲發(fā)布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一頓騷操作后返回一個數(shù)組 并且序列化后 zAdd 到redis中 key為 queue:queueName:delayed score為當前的時間戳+$delay

執(zhí)行過程

執(zhí)行過程有work模式和listen模式 兩種 區(qū)別上面已經(jīng)說了 代碼邏輯由于太多等下回分解;
最后講一下標簽的使用

    //守護進程開啟
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //內(nèi)存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重啟守護進程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任務開始執(zhí)行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任務延遲執(zhí)行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任務執(zhí)行失敗
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

think-queue的示例分析

public function run(Output $output)
    {
        $output->write('任務執(zhí)行失敗', true);
    }

控制臺執(zhí)行 php think queue:work --queue test --daemon
會在控制臺一次輸出

守護進程開啟
任務延遲執(zhí)行

失敗的處理 如果有任務執(zhí)行失敗或者執(zhí)行次數(shù)達到最大值
會觸發(fā) queue_failed

app\index\behavior@run方法里面寫失敗的邏輯 比如郵件通知 寫入日志等

最后我們來說一下如何在其他框架或者項目中給tp的項目推送消息隊列,例如兩個項目是分開的 另一個使用的卻不是tp5的框架

在其他項目中推任務

php版本

redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[rand(0, strlen($str) - 1)];
        }
        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 創(chuàng)建指定格式的數(shù)據(jù)
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 創(chuàng)建隨機字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

感謝你能夠認真閱讀完這篇文章,希望小編分享的“think-queue的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學習!


名稱欄目:think-queue的示例分析
路徑分享:http://weahome.cn/article/gjosci.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部