基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
不多说,贴代码,不回排版,见谅
1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)
namespace app\command; use app\common\lib\delayqueue\delayqueue; use think\console\command; use think\console\input; use think\console\output; use think\db; class delayqueueworker extends command { const command_argv_1 = 'queue'; protected function configure() { $this->setname('delay-queue')->setdescription('延迟队列任务进程'); $this->addargument(self::command_argv_1); } protected function execute(input $input, output $output) { $queue = $input->getargument(self::command_argv_1); //参数1 延迟队列表名,对应与redis的有序集key名 while (true) { delayqueue::getinstance($queue)->perform(); usleep(300000); } } }
库类目录结构
config.php 里是redis连接参数配置
redishandler.php只实现有序集的操作,重连机制还没有实现
namespace app\common\lib\delayqueue; class redishandler { public $provider; private static $_instance = null; private function __construct() { $this->provider = new \redis(); //host port $config = require_once 'config.php'; $this->provider->connect($config['redis_host'], $config['redis_port']); } final private function __clone() {} public static function getinstance() { if(!self::$_instance) { self::$_instance = new redishandler(); } return self::$_instance; } /** * @param string $key 有序集key * @param number $score 排序值 * @param string $value 格式化的数据 * @return int */ public function zadd($key, $score, $value) { return $this->provider->zadd($key, $score, $value); } /** * 获取有序集数据 * @param $key * @param $start * @param $end * @param null $withscores * @return array */ public function zrange($key, $start, $end, $withscores = null) { return $this->provider->zrange($key, $start, $end, $withscores); } /** * 删除有序集数据 * @param $key * @param $member * @return int */ public function zrem($key,$member) { return $this->provider->zrem($key,$member); } }
延迟队列类
namespace app\common\lib\delayqueue; class delayqueue { private $prefix = 'delay_queue:'; private $queue; private static $_instance = null; private function __construct($queue) { $this->queue = $queue; } final private function __clone() {} public static function getinstance($queue = '') { if(!self::$_instance) { self::$_instance = new delayqueue($queue); } return self::$_instance; } /** * 添加任务信息到队列 * * demo delayqueue::getinstance('test')->addtask( * 'app\common\lib\delayqueue\job\test', * strtotime('2018-05-02 20:55:20'), * ['abc'=>111] * ); * * @param $jobclass * @param int $runtime 执行时间 * @param array $args */ public function addtask($jobclass, $runtime, $args = null) { $key = $this->prefix.$this->queue; $params = [ 'class' => $jobclass, 'args' => $args, 'runtime' => $runtime, ]; redishandler::getinstance()->zadd( $key, $runtime, serialize($params) ); } /** * 执行job * @return bool */ public function perform() { $key = $this->prefix.$this->queue; //取出有序集第一个元素 $result = redishandler::getinstance()->zrange($key, 0 ,0); if (!$result) { return false; } $jobinfo = unserialize($result[0]); print_r('job: '.$jobinfo['class'].' will run at: '. date('y-m-d h:i:s',$jobinfo['runtime']).php_eol); $jobclass = $jobinfo['class']; if(!@class_exists($jobclass)) { print_r($jobclass.' undefined'. php_eol); redishandler::getinstance()->zrem($key, $result[0]); return false; } // 到时间执行 if (time() >= $jobinfo['runtime']) { $job = new $jobclass; $job->setpayload($jobinfo['args']); $jobresult = $job->preform(); if ($jobresult) { // 将任务移除 redishandler::getinstance()->zrem($key, $result[0]); return true; } } return false; } }
异步任务基类:
namespace app\common\lib\delayqueue; class delayjob { protected $payload; public function preform () { // todo return true; } public function setpayload($args = null) { $this->payload = $args; } }
所有异步执行的任务都卸载job目录下,且要继承delayjob,你可以实现任何你想延迟执行的任务
如:
namespace app\common\lib\delayqueue\job; use app\common\lib\delayqueue\delayjob; class test extends delayjob { public function preform() { // payload 里应该有处理任务所需的参数,通过delayqueue的addtask传入 print_r('test job'.php_eol); return true; } }
使用方法:
假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:
delayqueue::getinstance('close_order')->addtask( 'app\common\lib\delayqueue\job\closeorder', // 自己实现的job strtotime('2018-05-02 20:55:20'), // 订单失效时间 ['order_id'=>123456] // 传递给job的参数 );
close_order 是有序集的key
命令行启动进程
php think delay-queue close_order
相关推荐:
php+redis实现session共享
以上就是php+redis实现延迟队列的详细内容。