RabbitMQ在Windows环境下安装及搭配PHP的基础用法
IT小马 -前言相关概念消息(Message)
是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
RabbitMQRabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQPAdvanced Message Queue,高级消息队列协议。
Erlang面向并发的编程语言。
RabbitMQ特点1.可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3.消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
4.高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6.多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8.跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
概念模型Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection
网络连接,比如一个TCP连接。Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker
表示消息队列服务器实体。安装Windows环境安装Erlang
下载地址: https://erlang.org/download/o...
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
添加到PATH %ERLANG_HOME%\bin;
安装RabbitMq下载地址:https://www.rabbitmq.com/ 或 百度网盘
配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.9.11
添加到PATH %RABBITMQ_SERVER%\sbin;
运行:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.11\sbin>rabbitmq-server.bat
访问:
打开浏览器。访问 http://127.0.0.1:15672
注:
默认账号:guest 密码:guest,仅用于浏览器访问,API需要新建用户默认浏览器访问端口15672,API访问端口5672安装php的amqp扩展1.下载地址:http://pecl.php.net/package/amqp
2.将php_amqp.dll复制到php/ext,同时在php.ini中添加如下代码:
[amqp]
extension=php_amqp.dll
3.然后将rabbitmq.4.dll复制到php根目录
4.查看是否安装成功:php -m
docker pull rabbitmq # 镜像未配有控制台
docker pull rabbitmq:management # 镜像配有控制台
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
启动容器后,可以浏览器中访问http://localhost:15672来查看控制台信息。RabbitMQ
默认的用户名:guest
,密码:guest
docker pull php:7.4-fpm
docker run -d -p 9000:9000 -v /Users/ma/docker/php:/www --name phpfpm php:7.4-fpm
docker exec -it phpfpm /bin/bash
#安装扩展
apt-get update && apt-get install -y libfreetype6-dev librabbitmq-dev libjpeg62-turbo-dev libmcrypt-dev libpng-dev
pecl install amqp
docker-php-ext-enable amqp
使用普通队列生产者$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 声明交换机
$exchange->declareExchange();
// 创建消息队列
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
// 设置持久性
$queue->setFlags(AMQP_DURABLE);
// 声明消息队列
$queue->declareQueue();
// 开启事务,确保数据真正不丢失
$channel->startTransaction();
// 将消息和标识绑定到交换器中
$exchange->publish($message, $routeKey);
$channel->commitTransaction();
$connection->disconnect();
var_dump("[x] Sent $message");
消费者$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchange->getName(), $routeKey);
// 接收消息并处理回调
$queue->consume(function ($envelop, $queue) {
$message = $envelop->getBody();
echo $message . PHP_EOL;
// ACK 通知生产者任务完成
$queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
});
//设置每次只能处理一条,避免消息堆积,从而导致队列挂掉
$channel->qos(0, 1);
//关闭连接
$connection->disconnect();
延迟队列生产者<?php
//来源公众号:【码农编程进阶笔记】
//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
//exit();
for($i=5;$i>0;$i--){
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//持久化
$exchange->setName($params['exchangeName']);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ不容许声明2个相同名称、配置不同的Queue队列
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName'].$i);
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange', 死信交换机
'x-dead-letter-routing-key' => 'delay_route', // 死信路由
'x-message-ttl' => (10000*$i), // 当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了
// 在RMQ中想要使用优先级特性需要的版本为3.5+。
//'x-max-priority'=>0,//将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
));
$queue->declareQueue();
//绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey'].$i);
//$channel->commitTransaction();
} catch(Exception $e) {
}
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
//delivery_mode=2指明message为持久的
//生成消息
echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',延迟'.($i*10).'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'].$i, AMQP_MANDATORY, array('delivery_mode'=>2));
$conn->disconnect();
sleep(2);
}
消费者<?php
//来源公众号:【码农编程进阶笔记】
//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delay_exchange',
'queueName' => 'delay_queue',
'routeKey' => 'delay_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
$exchange->setName($params['exchangeName']?:'');
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$body . PHP_EOL;
//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
//$queue->consume('callback'); 第一种消费方式,但是会阻塞,程序一直会卡在此处
//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
$action = '2';
if($action == '1'){
$queue->consume('callback'); //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
//第二种消费方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo '运行时间:'.($end - $start).'秒'.PHP_EOL;
//exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}
}
守护进程Windows端:supervisor-win下载地址:https://pypi.org/project/supe...
安装#重装setuptools
sudo pip3 uninstall setuptools
pip3 install setuptools --upgrade
#安装supervisor-win
pip install supervisor-win
配置app/public/supervisord/supervisord.conf
[program:cancelUnpayUniOrder]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think cancelUnpayUniOrder
[program:syncWechatPayResult]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think syncWechatPayResult
[supervisord]
nodaemon=true
logfile = E:\dev\tp51\app\runtime\log\supervisord.log
pidfile = E:\dev\tp51\app\runtime\log\supervisord.pid
[supervisorctl]
启动start.bat
::守护进程应设置任务计划,开机时启动
supervisord -c supervisord.conf
参考RabbitMQ 中文文档
Windows安装RabbitMQ详细教程](https://zhuanlan.zhihu.com/p/...)
基于 Docker 安装 RabbitMQ
docker安装PHP扩展
PHP 使用rabbitmq 入门教程
php使用rabbitMQ
基于RabbitMQ实现延迟队列--PHP版
教你如何在 Windows 下让崩溃的 Python 程序自重启
基于 Docker 安装 RabbitMQ
docker安装PHP扩展
php介绍
PHP即“超文本预处理器”,是一种通用开源脚本语言。PHP是在服务器端执行的脚本语言,与C语言类似,是常用的网站编程语言。PHP独特的语法混合了C、Java、Perl以及 PHP 自创的语法。利于学习,使用广泛,主要适用于Web开发领域。