在项目中rabbitmq得到了广泛的时候,这里对rabbitmq的常规功能做了一个简单的总结,并封装成了composer包,composer包地址(https://packagist.org/packages/maweibinguo/easyrabbitmq)、github地址(https://github.com/maweibinguo/easyrabbitmq),欢迎fork,由于水平有限,难免存在bug,欢迎提出宝贵意见
【推荐学习:php视频教程】
easy-rabbitmq 包简介对php-amqplib/php-amqplib包的二次封装,为常见功能提供一套开箱即用的生产解决方案
。目前支持的功能列表如下:
推送消息到直连交换机(含延迟消息)推送消息到扇形交换机(含延迟消息)推送消息到主题交换机(含延迟消息)订阅模式下的可靠消费, 消费者消费失败后将会尝试继续消费,最多尝试5次。拉取模式下的可靠消费, 消费者消费失败后将会尝试继续消费,最多尝试5次。如果还有其它场景,欢迎继续补充,随后进行迭代!!
要求安装包对php版本对要求主要取决于php-amqplib/php-amqplib包本身对要求,这里为了兼顾php5.0的使用者,我们使用了php-amqplib/php-amqplib包v2.9.0的版本。
具体的要求参照这里(https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0)。
不过笔者推荐使用php7.0及其以上版本, 这个开发包也是在7.0这个版本上面开发完成的!安装 composer require maweibinguo/easyrabbitmq
使用在这里我们推荐php脚本+supervisor结合使用,用以保证消费进程的可靠性、增强worker的消费能力! 如果你还没有听说过supervisor,可以点击这里(http://www.supervisord.org/introduction.html)了解.
1、推送消息1-1、推送消息到直连交换机 $config = [ host => 127.0.0.1, port => 5672, user => guest, password => guest, vhost => /, channel_max_num => 10, ]; $instance = rabbitmq::getinstance($config); //延迟消息,30 秒中后才会到达指定的交换机 $instance->pushtodirect( $msg = time(), //消息体内容 $exchange = easy_direct_exchange, //交换机名称 $routingkey = direct_test_queue, //消息的routingkey,consume(get) 方法到bingdingkey 要和routingkey保持一致 $delaysec = 30 //延迟秒数 ); //无延迟,推入到指定到直链交换机 $instance->pushtodirect( $msg = time(), //消息体内容 $exchange = easy_direct_exchange, //交换机名称 $routingkey = direct_test_queue, //消息的routingkey,consume(get) 方法到bingdingkey 要和routingkey保持一致 );
1-2、推送消息到扇形交换机 $config = [ host => 127.0.0.1, port => 5672, user => guest, password => guest, vhost => /, channel_max_num => 10, ]; $instance = rabbitmq::getinstance($config); //延迟消息,30 秒中后才会到达指定的交换机 $instance->pushtofanout( $msg = time(), //消息体内容 $exchange = easy_fanout_exchange, //交换机名称 $delaysec = 30 //延迟秒数 ); //无延迟,推入到指定到直链交换机 $instance->pushtofanout( $msg = time(), //消息体内容 $exchange = easy_fanout_exchange //交换机名称 );
1-3、推送消息到主题交换机 $config = [ host => 127.0.0.1, port => 5672, user => guest, password => guest, vhost => /, channel_max_num => 10, ]; $instance = rabbitmq::getinstance($config); //延迟消息,30 秒中后才会到达指定的交换机 $instance->pushtotopic( $msg = time(), //消息体内容 $exchange = easy_topic_exchange, //交换机名称 /** * routingkey 要同consum(get)方法的bindingkey相匹配 * bindingkey支持两种特殊的字符*、“#”,用作模糊匹配, 其中*用于匹配一个单词、“#”用于匹配多个单词(也可以是0个) * 无论是bindingkey还是routingkey, 被.分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue */ $routingkey = easy.topic.queue, $delaysec = 30 //延迟秒数 ); //无延迟,推入到指定到直链交换机 $instance->pushtotopic( $msg = time(), //消息体内容 $exchange = easy_topic_exchange, //交换机名称 $routingkey = easy.topic.queue );
2、消费消息消费支持自动重试,最多尝试重试5次,每次消费失败后该消息将会被重新投入到消费队列中。重新的时间将会随着失败的次数增多逐渐推移,本客户端支持的推移策略如下:
失败1次(1秒钟后会再被投递), 失败2次(2秒钟后会再被投递), 失败3次(4秒钟后会再被投递), 失败4次(8秒钟后会再被投递), 失败5次(16秒钟后会再被投递)
2-1、订阅模式订阅模式下的可靠消费 $config = [ host => 127.0.0.1, port => 5672, user => guest, password => guest, vhost => /, channel_max_num => 10, ]; $instance = rabbitmq::getinstance($config); $instance->consume( $queuename = direct_test_queue,//订阅的队列名称 $consumertag = c1,//消费标记 $exchange = easy_direct_exchange,//交换机名称 $bindingkey = direct_test_queue,//bindingkey,如果是直链交换机需要同routingkey保持一致 $callback = function($msg){ $body = $msg->body; file_put_contents(./test.log, time => . time() . \t . body => . $body . php_eol , file_append); //如果返回结果不绝对等于(===)true,那么将触发消息重试机制 return false; }, //5次消费消费失败后,失败消息将会投递到的队列名称 $failedqueue = easymq@failed );
2-2、拉取模式拉取模式下的可靠消费 $config = [ host => 127.0.0.1, port => 5672, user => guest, password => guest, vhost => /, channel_max_num => 10, ]; $instance = rabbitmq::getinstance($config); $instance->get( $queue = get_queue, $exchange = easy_fanout_exchange, $bindingkey = , $callback = function($msg){ $body = $msg->body; file_put_contents(./test.log, time => . time() . \t . body => . $body . php_eol , file_append); //如果返回结果不绝对等于(===)true,那么将触发消息重试机制 return false; }, //5次消费消费失败后,失败消息将会投递到的队列名称 $failedqueue = 'easymq@failed' );
以上就是rabbitmq常见功能封装(php版本)的详细内容。
