您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

解析think-queue(围绕redis做分析)

2024/6/16 20:06:24发布41次查看
think-queue 解析前言分析之前请大家务必了解消息队列的实现
tp5的消息队列是基于database redis 和tp官方自己实现的 topthink
本章是围绕redis来做分析
存储key:key类型描述
queues:queuename list 要执行的任务
think:queue:restart string 重启队列时间戳
queues:queuename:delayed zset 延迟任务
queues:queuename:reserved zset 执行失败,等待重新执行
执行命令work和listen的区别在下面会解释命令描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现
行为标签标签描述
worker_daemon_start 守护进程开启
worker_memory_exceeded 内存超出
worker_queue_restart 重启守护进程
worker_before_process 任务开始执行之前
worker_before_sleep 任务延迟执行
queue_failed 任务执行失败
命令参数参数默认值可以使用的模式描述
queue null work,listen 要执行的任务名称
daemon null work 以守护进程执行任务
delay 0 work,listen 失败后重新执行的时间
force null work 失败后重新执行的时间
memory 128m work,listen 限制最大内存
sleep 3 work,listen 没有任务的时候等待的时间
tries 0 work,listen 任务失败后最大尝试次数
模式区别1: 执行原理不同
work: 单进程的处理模式;
无 daemon 参数 work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出;
有 daemon 参数 work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间;
listen: 父进程 + 子进程 的处理模式;
会在所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后;
所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程;
2: 退出时机不同
work: 看上面
listen: 所在的父进程正常情况会一直运行,除非遇到下面两种情况
01: 创建的某个work子进程的执行时间超过了 listen命令行中的--timeout 参数配置;此时work子进程会被强制结束,listen所在的父进程也会抛出一个 processtimeoutexception 异常并退出;
开发者可以选择捕获该异常,让父进程继续执行;
02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 --memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。
3: 性能不同
work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;
listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本;
因此 work 模式的性能会比listen模式高。
注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。
4: 超时控制能力
work: 本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间;
listen: 可以限制其创建的work子进程的超时时间;
可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;
expire 和time的区别
expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程
timeout 在命令行参数中设置,指work子进程的超时时间,这个时间只对当前执行的listen 命令有效,timeout 针对的对象是 work 子进程;
5: 使用场景不同
work 适用场景是:
01: 任务数量较多
02: 性能要求较高
03: 任务的执行时间较短
04: 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
listen 适用场景是:
01: 任务数量较少
02: 任务的执行时间较长
03: 任务的执行时间需要有严格限制
公有操作由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php
01: 首先调用 src/queue.php中的魔术方法 __callstatic
02: 在__callstatic方法中调用了 buildconnector
03: buildconnector 中首先加载配置文件 如果无将是同步执行
04: 根据配置文件去创建连接并且传入配置
在redis.php类的构造方法中的操作:
01: 检测redis扩展是否安装
02: 合并配置
03: 检测是redis扩展还是 predis
04: 创建连接对象
发布过程发布参数参数名默认值描述可以使用的方法
$job 无 要执行任务的类 push,later
$data 空 任务数据 push,later
$queue default 任务名称 push,later
$delay null 延迟时间 later
立即执行    push($job, $data, $queue)    queue::push(test::class, ['id' => 1], 'test');
一顿骚操作后返回一个数组 并且序列化后 rpush到redis中 key为 queue:queuename
数组结构:
[    'job' => $job, // 要执行任务的类    'data' => $data, // 任务数据    'id'=>'xxxxx' //任务id]
写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写
延迟发布    later($delay, $job, $data, $queue)    queue::later(100, test::class, ['id' => 1], 'test');
跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zadd 到redis中 key为 queue:queuename:delayed score为当前的时间戳+$delay
执行过程执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用
    //守护进程开启    'worker_daemon_start' => [        \app\index\behavior\workerdaemonstart::class    ],    //内存超出    'worker_memory_exceeded' => [        \app\index\behavior\workermemoryexceeded::class    ],    //重启守护进程    'worker_queue_restart' => [        \app\index\behavior\workerqueuerestart::class    ],    //任务开始执行之前    'worker_before_process' => [        \app\index\behavior\workerbeforeprocess::class    ],    //任务延迟执行    'worker_before_sleep' => [        \app\index\behavior\workerbeforesleep::class    ],    //任务执行失败    'queue_failed' => [        \app\index\behavior\queuefailed::class    ]
public function run(output $output)    {        $output->write('<info>任务执行失败</info>', true);    }
控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出
守护进程开启任务延迟执行
失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed
在app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等
最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架
在其他项目中推任务php版本<?phpclass index{ private $redis = null; public function __construct() { $this->redis = new redis();        $this->redis->connect('127.0.0.1', 6379);        $this->redis->select(10);    }    public function push($job, $data, $queue)    {        $payload = $this->createpayload($job, $data);        $this->redis->rpush('queues:' . $queue, $payload);    }    public function later($delay, $job, $data, $queue)    {        $payload = $this->createpayload($job, $data);        $this->redis->zadd('queues:' . $queue . ':delayed', time() + $delay, $payload);    }    private function createpayload($job, $data)    {        $payload = $this->setmeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));        return $this->setmeta($payload, 'attempts', 1);    }    private function setmeta($payload, $key, $value)    {        $payload = json_decode($payload, true);        $payload[$key] = $value;        $payload = json_encode($payload);        if (json_error_none !== json_last_error()) {            throw new invalidargumentexception('unable to create payload: ' . json_last_error_msg());        }        return $payload;    }    private function random(int $length = 16): string    {        $str = '0123456789abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz';        $randomstring = '';        for ($i = 0; $i < $length; $i++) { $randomstring .= $str[rand(0, strlen($str) - 1)]; } return $randomstring; }}(new index())->later(10, 'app\index\jobs\test', ['id' => 1], 'test');
go版本package mainimport (    encoding/json    github.com/garyburd/redigo/redis    math/rand    time)type payload struct {    id       string      `json:id`    job      string      `json:job`    data     interface{} `json:data`    attempts int         `json:attempts`}var redisclient *redis.poolfunc init() {    redisclient = &redis.pool{        maxidle:     20,        maxactive:   500,        idletimeout: time.second * 100,        dial: func() (conn redis.conn, e error) {            c, err := redis.dial(tcp, 127.0.0.1:6379)            if err != nil {                return nil, err            }            _, _ = c.do(select, 10)            return c, nil        },    }}func main() {    var data = make(map[string]interface{})    data[id] = 1    later(10, app\\index\\jobs\\test, data, test)}func push(job string, data interface{}, queue string) {    payload := createpayload(job, data)    queuename := queues: + queue    _, _ = redisclient.get().do(rpush, queuename, payload)}func later(delay int, job string, data interface{}, queue string) {    m, _ := time.parseduration(+1s)    currenttime := time.now()    op := currenttime.add(time.duration(time.duration(delay) * m)).unix()    createpayload(job, data)    payload := createpayload(job, data)    queuename := queues: + queue + :delayed    _, _ = redisclient.get().do(zadd, queuename, op, payload)}// 创建指定格式的数据func createpayload(job string, data interface{}) (payload string) {    payload1 := &payload{job: job, data: data, id: random(32), attempts: 1}    jsonstr, _ := json.marshal(payload1)    return string(jsonstr)}// 创建随机字符串func random(n int) string {    var str = []rune(0123456789abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz)    b := make([]rune, n)    for i := range b {        b[i] = str[rand.intn(len(str))]    }    return string(b)}
更多thinkphp技术知识,请访问thinkphp教程栏目!
以上就是解析think-queue(围绕redis做分析)的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product