本篇內(nèi)容主要講解“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”吧!
在廊坊等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)制作、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需開發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),網(wǎng)絡(luò)營(yíng)銷推廣,外貿(mào)網(wǎng)站建設(shè),廊坊網(wǎng)站建設(shè)費(fèi)用合理。

說明
MQ(Message Queue) 即消息隊(duì)列,是應(yīng)用間的通信方式,消息發(fā)送后可立即返回,由消息系統(tǒng)來確保消息的可靠傳遞。”消息隊(duì)列“是在消息的傳輸過程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦。
為什么使用消息中間件?
消息隊(duì)列是分布式系統(tǒng)中重要的組件,解決應(yīng)用解耦,異步消息,流量削峰等問題,實(shí)現(xiàn)高并發(fā),高可用,可伸縮和最終一致性架構(gòu)
異步處理
用戶注冊(cè)信息后需要發(fā)送郵件和注冊(cè)短信
1、用戶注冊(cè)信息寫入數(shù)據(jù)庫(kù)后即使返回注冊(cè)成功的信息
2、發(fā)送郵件和注冊(cè)短信通過消息隊(duì)列異步執(zhí)行,用戶不需要等待這兩個(gè)操作
應(yīng)用解耦
用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng)。傳統(tǒng)的做法是,訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口,進(jìn)行增減庫(kù)存
1、用戶下單入列生產(chǎn),返回成功提示
2、隊(duì)列消費(fèi)庫(kù)存系統(tǒng),進(jìn)行庫(kù)存增減
流量削峰
流量削峰也是消息隊(duì)列中的常見場(chǎng)景,一般在秒殺或團(tuán)搶活動(dòng)中使用廣泛
1、當(dāng)一批用戶請(qǐng)求過來進(jìn)入列隊(duì),控制入列數(shù)量,超出一定數(shù)量返回秒殺結(jié)束
2、然后隊(duì)列一個(gè)個(gè)按照先進(jìn)先出進(jìn)行隊(duì)列消費(fèi).jpg)
Rabbitmq特性
可靠性(Reliability)RabbitMQ 使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)、發(fā)布確認(rèn)。
靈活的路由(Flexible Routing)在消息進(jìn)入隊(duì)列之前,通過 Exchange 來路由消息的。對(duì)于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起,也通過插件機(jī)制實(shí)現(xiàn)自己的 Exchange 。
消息集群(Clustering)多個(gè) RabbitMQ 服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯 Broker 。
高可用(Highly Available Queues)隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用。
多種協(xié)議(Multi-protocol)RabbitMQ 支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT 等等。
多語言客戶端(Many Clients)RabbitMQ 幾乎支持所有常用語言,比如PHP Java、.NET、Ruby 等等。
管理界面(Management UI)RabbitMQ 提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面。
跟蹤機(jī)制(Tracing)如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制,使用者可以找出發(fā)生了什么。
插件機(jī)制(Plugin System)RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件。
RabbitMQ的工作原理.jpg)
Broker: 接收和分發(fā)消息的應(yīng)用,RabbitMQ Server就是Message Broker。
Virtual host: 類似于MySQL的數(shù)據(jù)庫(kù),當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ server提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的vhost創(chuàng)建exchange/queue等。
Connection: publisher/consumer和broker之間的TCP連接。
Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
Exchange: message到達(dá)broker的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的routing key,分發(fā)消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最終被送到這里等待consumer取走。一個(gè)message可以被同時(shí)拷貝到多個(gè)queue中。
RabbitMQ官方地址:http://www.rabbitmq.com
安裝rabbitmq需要先安裝erlang
第一步:erlang 安裝
安裝rabbitmq需要先安裝erlang,centos7不支持erlang 24版本的安裝.jpg)
下載:.jpg)
.jpg)




# 系統(tǒng) centos 7# 下載erlang包,手動(dòng)下載后上傳至服務(wù)器,我在使用wget下載后無法安裝,這里沒明白 # 安裝 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 驗(yàn)證安裝是否成功 erl

第二步:安裝rabbitmq


# 系統(tǒng) centos 7# 下載rabbitmq包,手動(dòng)下載后上傳至服務(wù)器,我在使用wget下載后無法安裝,這里沒明白 # 安裝 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 啟動(dòng) systemctl start rabbitmq-server # 關(guān)閉 systemctl stop rabbitmq-server # 查看默認(rèn)端口服務(wù)是否啟動(dòng) netstat -tunlp

4369:epmd(Erlang Port Mapper Daemon),erlang服務(wù)端口
5672 :client端通信口
15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)不一定會(huì)啟動(dòng)
25672:用于節(jié)點(diǎn)間通信(Erlang分發(fā)服務(wù)器端口)
rabbitmq 管理命令
啟動(dòng)15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)
# 啟動(dòng)rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management # 查看所有插件 rabbitmq-plugins list
測(cè)試訪問UI界面:(此時(shí)非localhost地址是無法登錄)
http://192.168.10.105:15672/
rabbitmq 配置管理界面
# 新增一個(gè)用戶 rabbitmqctl add_user 【用戶名Username】 【密碼Password】 rabbitmqctl add_user root root # 刪除一個(gè)用戶 rabbitmqctl delete_user Username # 修改用戶的密碼 rabbitmqctl change_password Username Newpassword # 查看當(dāng)前用戶列表 rabbitmqctl list_users # 設(shè)置用戶角色的命令為: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User為用戶名, Tag為角色名(對(duì)應(yīng)于上面的administrator,monitoring,policymaker,management,或其他自定義名稱)。
命令行創(chuàng)建vhost以及php擴(kuò)展安裝
類似于mysql的數(shù)據(jù)庫(kù),當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ server提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的vhost創(chuàng)建exchange/queue等。
1)查看不同用戶的vhost

創(chuàng)建vhost,以及分配權(quán)限
# 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #為vhost添加用戶 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后邊三個(gè).*分別代表:配置權(quán)限、寫權(quán)限、讀權(quán)限

2)為php安裝rabbitmq擴(kuò)展安裝
https://github.com/php-amqplib/php-amqplib 擴(kuò)展安裝
修改阿里云鏡像
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
開始下載–這里有時(shí)候會(huì)下載成2.8低版本的,需要指定版本
,下載不成功則升級(jí)composer、php.ini 打開 sockets 擴(kuò)展和切換國(guó)內(nèi)鏡像

# 升級(jí)composer composer self-update #php.ini 打開 sockets 擴(kuò)展 #下載指定版本 composer require php-amqplib/php-amqplib=^3.0
simple模式生產(chǎn)者消息推送到消息隊(duì)列
文檔:
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
簡(jiǎn)單的生產(chǎn)者與消息者
生產(chǎn)者代碼
http://localhost/rabbitmq/simple/pro.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//生產(chǎn)者
//Connection: publisher/consumer和broker之間的TCP連接
//Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);
//生產(chǎn)數(shù)據(jù)
$data = 'this is messge';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
//關(guān)閉連接
$channel->close();
$connection->close();運(yùn)行生產(chǎn)者腳本:
http://localhost/rabbitmq/simple/pro.php
點(diǎn)擊goods隊(duì)列可以進(jìn)入到消息詳情
http://localhost/rabbitmq/simple/con.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
};
//開啟消費(fèi)
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();rabbitmq Work Queues
一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,消費(fèi)特別慢時(shí)增加幾個(gè)消費(fèi)分發(fā)
生產(chǎn)者,和上文生產(chǎn)者不變
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//生產(chǎn)者
//Connection: publisher/consumer和broker之間的TCP連接
//Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
for ($i = 0; $i < 10; $i++) {
//生產(chǎn)數(shù)據(jù)
$data = 'this is messge' . $i;
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
}
//關(guān)閉連接
$channel->close();
$connection->close();消費(fèi)者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
};
//開啟消費(fèi)
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();消費(fèi)者worker2,代碼和worker1一樣,同時(shí)運(yùn)行開啟后會(huì)一起消費(fèi)
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php
消費(fèi)者消費(fèi)消息ack確認(rèn)
用以確認(rèn)不會(huì)丟失消息
消費(fèi)消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
no_ack=false,設(shè)置為手動(dòng)應(yīng)答
開啟后需要進(jìn)行消息的消費(fèi)確認(rèn)后才會(huì)進(jìn)行移除,否者該消息會(huì)一直存在消息隊(duì)列中
消費(fèi)端代碼
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
$msg->ack();
};
//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);
//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();發(fā)布/訂閱模式
是要是公用一個(gè)交換機(jī)的消費(fèi)端都能收到同樣的消息,類似廣播的功能
文檔:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
rabbitmq Exchange類型
交換器、路由鍵、綁定 Exchange:交換器。發(fā)送消息的AMQP實(shí)體。交換器拿到一個(gè)消息之后將它路由給一個(gè)或幾個(gè)隊(duì)列。它使用哪種路由算法是由交換機(jī)類型和被稱作綁定(Binding)的規(guī)則所決定的。RabbitMQ有四種類型。 RoutingKey:路由鍵。生產(chǎn)者將消息發(fā)送給交換器。一般會(huì)指定一個(gè)RoutingKey,用來指定這個(gè)消息的路由規(guī)則,而這個(gè)RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯(lián)合使用才能最終失效。 Binding:綁定。綁定(Binding)是交換機(jī)(Exchange)將消息(Message)路由給隊(duì)列(Queue)所需遵循的規(guī)則。 # 四種模式 Direct 定向 消息與一個(gè)特定的路由鍵完全匹配 Topic 通配符 路由鍵和某模式進(jìn)行匹配 Fanout 廣播 發(fā)送到該類型交換機(jī)的消息都會(huì)被廣播到與該交換機(jī)綁定的所有隊(duì)列 Headers 不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配
exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。試探性申請(qǐng)一個(gè)交換器,若該交換器不存在,則創(chuàng)建;若存在,則跳過。

生產(chǎn)者代碼
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);
//聲明數(shù)據(jù)
$data = 'this is fanout message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
$channel->basic_publish($msg, $exc_name);
//關(guān)閉連接
$channel->close();
$connection->close();
fanout模式消費(fèi)者消費(fèi)消息
是要是公用一個(gè)交換機(jī)的消費(fèi)端都能收到同樣的消息,類似廣播的功能
當(dāng)消費(fèi)端運(yùn)行時(shí)才會(huì)顯示該隊(duì)列
消費(fèi)端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);
//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//將隊(duì)列名與交換器名進(jìn)行綁定
$channel->queue_bind($queue_name,$exc_name);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
$msg->ack();
};
//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);
//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();文檔:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html
用來指定不同的交換機(jī)和指定routing_key,在消費(fèi)端進(jìn)行消費(fèi)
生產(chǎn)者代碼:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';
//指定交換機(jī)類型為direct
$channel->exchange_declare($exc_name, 'direct', false, false, false);
//聲明數(shù)據(jù)
$data = 'this is ' . $routing_key . ' message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//關(guān)閉連接
$channel->close();
$connection->close();消費(fèi)者代碼
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';
$channel->exchange_declare($exc_name, 'direct', false, false, false);
//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//將隊(duì)列名與交換器名進(jìn)行綁定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
$msg->ack();
};
//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);
//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();通配符的匹配模式
如消費(fèi)端中routing_key = ‘user.*’;
生產(chǎn)者:
指定routing_key= ‘user.top’
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'topic_log';
//指定routing_key
$routing_key = 'user.top';
//指定交換機(jī)類型為direct
$channel->exchange_declare($exc_name, 'topic', false, false, false);
//聲明數(shù)據(jù)
$data = 'this is ' . $routing_key . ' message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//關(guān)閉連接
$channel->close();
$connection->close();消費(fèi)者
消費(fèi)端中routing_key = ‘user.*’;
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'user.*';
$channel->exchange_declare($exc_name, 'topic', false, false, false);
//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//將隊(duì)列名與交換器名進(jìn)行綁定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
$msg->ack();
};
//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);
//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
$channel->wait();
}
//關(guān)閉連接
$channel->close();
$connection->close();到此,相信大家對(duì)“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
當(dāng)前題目:消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
文章路徑:http://www.chinadenli.net/article6/peseog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動(dòng)態(tài)網(wǎng)站、Google、品牌網(wǎng)站建設(shè)、關(guān)鍵詞優(yōu)化、ChatGPT、網(wǎng)站排名
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)