欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

本篇內(nèi)容主要講解“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”吧!

在廊坊等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)制作、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需開發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),網(wǎng)絡(luò)營(yíng)銷推廣,外貿(mào)網(wǎng)站建設(shè),廊坊網(wǎng)站建設(shè)費(fèi)用合理。

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消息隊(duì)列介紹以及消息隊(duì)列應(yīng)用場(chǎng)景

RabbitMQ

說明
MQ(Message Queue) 即消息隊(duì)列,是應(yīng)用間的通信方式,消息發(fā)送后可立即返回,由消息系統(tǒng)來確保消息的可靠傳遞。”消息隊(duì)列“是在消息的傳輸過程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦。

為什么使用消息中間件?
消息隊(duì)列是分布式系統(tǒng)中重要的組件,解決應(yīng)用解耦,異步消息,流量削峰等問題,實(shí)現(xiàn)高并發(fā),高可用,可伸縮和最終一致性架構(gòu)

異步處理
用戶注冊(cè)信息后需要發(fā)送郵件和注冊(cè)短信
1、用戶注冊(cè)信息寫入數(shù)據(jù)庫(kù)后即使返回注冊(cè)成功的信息
2、發(fā)送郵件和注冊(cè)短信通過消息隊(duì)列異步執(zhí)行,用戶不需要等待這兩個(gè)操作
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

應(yīng)用解耦
用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng)。傳統(tǒng)的做法是,訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口,進(jìn)行增減庫(kù)存
1、用戶下單入列生產(chǎn),返回成功提示
2、隊(duì)列消費(fèi)庫(kù)存系統(tǒng),進(jìn)行庫(kù)存增減
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

流量削峰
流量削峰也是消息隊(duì)列中的常見場(chǎng)景,一般在秒殺或團(tuán)搶活動(dòng)中使用廣泛
1、當(dāng)一批用戶請(qǐng)求過來進(jìn)入列隊(duì),控制入列數(shù)量,超出一定數(shù)量返回秒殺結(jié)束
2、然后隊(duì)列一個(gè)個(gè)按照先進(jìn)先出進(jìn)行隊(duì)列消費(fèi)
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

Rabbitmq特性

可靠性(Reliability)RabbitMQ 使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)、發(fā)布確認(rèn)。
靈活的路由(Flexible Routing)在消息進(jìn)入隊(duì)列之前,通過 Exchange 來路由消息的。對(duì)于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起,也通過插件機(jī)制實(shí)現(xiàn)自己的 Exchange 。
消息集群(Clustering)多個(gè) RabbitMQ 服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯 Broker 。
高可用(Highly Available Queues)隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用。
多種協(xié)議(Multi-protocol)RabbitMQ 支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT 等等。
多語言客戶端(Many Clients)RabbitMQ 幾乎支持所有常用語言,比如PHP Java、.NET、Ruby 等等。
管理界面(Management UI)RabbitMQ 提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面。
跟蹤機(jī)制(Tracing)如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制,使用者可以找出發(fā)生了什么。
插件機(jī)制(Plugin System)RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件。

RabbitMQ的工作原理
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

Broker: 接收和分發(fā)消息的應(yīng)用,RabbitMQ Server就是Message Broker。

Virtual host: 類似于MySQL的數(shù)據(jù)庫(kù),當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ server提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的vhost創(chuàng)建exchange/queue等。

Connection: publisher/consumer和broker之間的TCP連接。

Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。

Exchange: message到達(dá)broker的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的routing key,分發(fā)消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。

Queue: 消息最終被送到這里等待consumer取走。一個(gè)message可以被同時(shí)拷貝到多個(gè)queue中。

rabbitmq安裝啟動(dòng)

RabbitMQ官方地址:http://www.rabbitmq.com
安裝rabbitmq需要先安裝erlang

第一步:erlang 安裝
安裝rabbitmq需要先安裝erlang,centos7不支持erlang 24版本的安裝
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
下載:
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

# 系統(tǒng)  centos 7# 下載erlang包,手動(dòng)下載后上傳至服務(wù)器,我在使用wget下載后無法安裝,這里沒明白


# 安裝
yum install erlang-23.3.4.4-1.el7.x86_64.rpm

# 驗(yàn)證安裝是否成功
erl

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

第二步:安裝rabbitmq
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

# 系統(tǒng)  centos 7# 下載rabbitmq包,手動(dòng)下載后上傳至服務(wù)器,我在使用wget下載后無法安裝,這里沒明白


# 安裝
yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm 

# 啟動(dòng)
systemctl start rabbitmq-server

# 關(guān)閉
systemctl stop rabbitmq-server

# 查看默認(rèn)端口服務(wù)是否啟動(dòng)
netstat -tunlp

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

php消息隊(duì)列rabbitmq各種模式使用

rabbitmq管理界面及命令行使用

4369:epmd(Erlang Port Mapper Daemon),erlang服務(wù)端口

5672 :client端通信口

15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)不一定會(huì)啟動(dòng)

25672:用于節(jié)點(diǎn)間通信(Erlang分發(fā)服務(wù)器端口)

rabbitmq 管理命令
啟動(dòng)15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)

# 啟動(dòng)rabbitmq_management插件
rabbitmq-plugins  enable  rabbitmq_management

# 查看所有插件
rabbitmq-plugins  list

測(cè)試訪問UI界面:(此時(shí)非localhost地址是無法登錄)
http://192.168.10.105:15672/

rabbitmq 配置管理界面

# 新增一個(gè)用戶  
rabbitmqctl add_user 【用戶名Username】 【密碼Password】
rabbitmqctl add_user root root
# 刪除一個(gè)用戶  
rabbitmqctl delete_user Username

# 修改用戶的密碼 
rabbitmqctl change_password Username Newpassword 

# 查看當(dāng)前用戶列表    
rabbitmqctl list_users

# 設(shè)置用戶角色的命令為: 
rabbitmqctl set_user_tags User Tag  
rabbitmqctl set_user_tags root administrator
# User為用戶名, Tag為角色名(對(duì)應(yīng)于上面的administrator,monitoring,policymaker,management,或其他自定義名稱)。

命令行創(chuàng)建vhost以及php擴(kuò)展安裝
類似于mysql的數(shù)據(jù)庫(kù),當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ server提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的vhost創(chuàng)建exchange/queue等。

1)查看不同用戶的vhost
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

創(chuàng)建vhost,以及分配權(quán)限

# 新增vhost
rabbitmqctl add_vhost   vhostname
rabbitmqctl add_vhost order

# 查看vhost列表
rabbitmqctl  list_vhosts

#為vhost添加用戶
rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*"
 ".*" ".*" ".*"后邊三個(gè).*分別代表:配置權(quán)限、寫權(quán)限、讀權(quán)限

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

2)為php安裝rabbitmq擴(kuò)展安裝
https://github.com/php-amqplib/php-amqplib 擴(kuò)展安裝

修改阿里云鏡像

composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/

開始下載–這里有時(shí)候會(huì)下載成2.8低版本的,需要指定版本
,下載不成功則升級(jí)composer、php.ini 打開 sockets 擴(kuò)展和切換國(guó)內(nèi)鏡像

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

# 升級(jí)composer
composer self-update

#php.ini 打開 sockets 擴(kuò)展

#下載指定版本
composer require php-amqplib/php-amqplib=^3.0

simple模式生產(chǎn)者消息推送到消息隊(duì)列
文檔:
https://www.rabbitmq.com/tutorials/tutorial-one-php.html

簡(jiǎn)單的生產(chǎn)者與消息者
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
生產(chǎn)者代碼
http://localhost/rabbitmq/simple/pro.php

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//生產(chǎn)者
//Connection: publisher/consumer和broker之間的TCP連接
//Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);

//生產(chǎn)數(shù)據(jù)
$data = 'this is messge';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
//關(guān)閉連接
$channel->close();
$connection->close();

運(yùn)行生產(chǎn)者腳本:
http://localhost/rabbitmq/simple/pro.php
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
點(diǎn)擊goods隊(duì)列可以進(jìn)入到消息詳情
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

simple模式消費(fèi)者接受消息

http://localhost/rabbitmq/simple/con.php

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明隊(duì)列名為:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
};
//開啟消費(fèi)
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();
worker模式生產(chǎn)消費(fèi)消息

rabbitmq Work Queues
一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,消費(fèi)特別慢時(shí)增加幾個(gè)消費(fèi)分發(fā)
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
生產(chǎn)者,和上文生產(chǎn)者不變

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//生產(chǎn)者
//Connection: publisher/consumer和broker之間的TCP連接
//Channel: 如果每一次訪問RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

for ($i = 0; $i < 10; $i++) {
    //生產(chǎn)數(shù)據(jù)
    $data = 'this is messge' . $i;
//創(chuàng)建消息
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//發(fā)布消息
    $channel->basic_publish($msg, $exchange = '', $queue_name);
}

//關(guān)閉連接
$channel->close();
$connection->close();

消費(fèi)者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
};
//開啟消費(fèi)
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();

消費(fèi)者worker2,代碼和worker1一樣,同時(shí)運(yùn)行開啟后會(huì)一起消費(fèi)
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php

消費(fèi)者消費(fèi)消息ack確認(rèn)

用以確認(rèn)不會(huì)丟失消息

消費(fèi)消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
no_ack=false,設(shè)置為手動(dòng)應(yīng)答
開啟后需要進(jìn)行消息的消費(fèi)確認(rèn)后才會(huì)進(jìn)行移除,否者該消息會(huì)一直存在消息隊(duì)列中
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

消費(fèi)端代碼
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;


//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明隊(duì)列名為:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
    $msg->ack();
};

//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);

//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);


//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();
fanout模式生產(chǎn)者推送到交換器

發(fā)布/訂閱模式
是要是公用一個(gè)交換機(jī)的消費(fèi)端都能收到同樣的消息,類似廣播的功能

文檔:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

rabbitmq Exchange類型

交換器、路由鍵、綁定

    Exchange:交換器。發(fā)送消息的AMQP實(shí)體。交換器拿到一個(gè)消息之后將它路由給一個(gè)或幾個(gè)隊(duì)列。它使用哪種路由算法是由交換機(jī)類型和被稱作綁定(Binding)的規(guī)則所決定的。RabbitMQ有四種類型。

    RoutingKey:路由鍵。生產(chǎn)者將消息發(fā)送給交換器。一般會(huì)指定一個(gè)RoutingKey,用來指定這個(gè)消息的路由規(guī)則,而這個(gè)RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯(lián)合使用才能最終失效。

    Binding:綁定。綁定(Binding)是交換機(jī)(Exchange)將消息(Message)路由給隊(duì)列(Queue)所需遵循的規(guī)則。

# 四種模式
Direct  定向 消息與一個(gè)特定的路由鍵完全匹配

Topic  通配符 路由鍵和某模式進(jìn)行匹配

Fanout  廣播 發(fā)送到該類型交換機(jī)的消息都會(huì)被廣播到與該交換機(jī)綁定的所有隊(duì)列
Headers 不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配

exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。試探性申請(qǐng)一個(gè)交換器,若該交換器不存在,則創(chuàng)建;若存在,則跳過。

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
生產(chǎn)者代碼
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);

//聲明數(shù)據(jù)
$data = 'this is fanout message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//發(fā)布消息
$channel->basic_publish($msg, $exc_name);
//關(guān)閉連接
$channel->close();
$connection->close();

消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

fanout模式消費(fèi)者消費(fèi)消息
是要是公用一個(gè)交換機(jī)的消費(fèi)端都能收到同樣的消息,類似廣播的功能

當(dāng)消費(fèi)端運(yùn)行時(shí)才會(huì)顯示該隊(duì)列
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
消費(fèi)端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明交換器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);

//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//將隊(duì)列名與交換器名進(jìn)行綁定
$channel->queue_bind($queue_name,$exc_name);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
    $msg->ack();
};

//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);

//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();
direct模式消息隊(duì)列使用

文檔:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html

用來指定不同的交換機(jī)和指定routing_key,在消費(fèi)端進(jìn)行消費(fèi)
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
生產(chǎn)者代碼:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';

//指定交換機(jī)類型為direct
$channel->exchange_declare($exc_name, 'direct', false, false, false);

//聲明數(shù)據(jù)
$data = 'this is ' . $routing_key . ' message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//發(fā)布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//關(guān)閉連接
$channel->close();
$connection->close();

消費(fèi)者代碼
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';

$channel->exchange_declare($exc_name, 'direct', false, false, false);

//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//將隊(duì)列名與交換器名進(jìn)行綁定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
    $msg->ack();
};

//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);

//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();
topic模式消息隊(duì)列使用

通配符的匹配模式

如消費(fèi)端中routing_key = ‘user.*’;
消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析

生產(chǎn)者:
指定routing_key= ‘user.top’

<?php

require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//聲明交換器
$exc_name = 'topic_log';
//指定routing_key
$routing_key = 'user.top';

//指定交換機(jī)類型為direct
$channel->exchange_declare($exc_name, 'topic', false, false, false);

//聲明數(shù)據(jù)
$data = 'this is ' . $routing_key . ' message';
//創(chuàng)建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//發(fā)布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//關(guān)閉連接
$channel->close();
$connection->close();

消費(fèi)者
消費(fèi)端中routing_key = ‘user.*’;

<?php
require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();

//聲明交換器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'user.*';

$channel->exchange_declare($exc_name, 'topic', false, false, false);

//獲取系統(tǒng)生成的消息隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//將隊(duì)列名與交換器名進(jìn)行綁定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //確認(rèn)消息已被消費(fèi),從生產(chǎn)隊(duì)列中移除
    $msg->ack();
};

//設(shè)置消費(fèi)成功后才能繼續(xù)進(jìn)行下一個(gè)消費(fèi)
$channel->basic_qos(null, 1, null);

//開啟消費(fèi)no_ack=false,設(shè)置為手動(dòng)應(yīng)答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不斷的循環(huán)進(jìn)行消費(fèi)
while ($channel->is_open()) {
    $channel->wait();
}

//關(guān)閉連接
$channel->close();
$connection->close();

到此,相信大家對(duì)“消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

當(dāng)前題目:消息隊(duì)列RabbitMQ入門與PHP實(shí)例分析
文章路徑:http://www.chinadenli.net/article6/peseog.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動(dòng)態(tài)網(wǎng)站Google品牌網(wǎng)站建設(shè)關(guān)鍵詞優(yōu)化ChatGPT網(wǎng)站排名

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司