延遲任務(wù)應(yīng)用場景

我們提供的服務(wù)有:成都做網(wǎng)站、網(wǎng)站設(shè)計、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、蘇州ssl等。為超過千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的蘇州網(wǎng)站制作公司
場景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會遇到向終端下發(fā)命令,如果命令一段時間沒有應(yīng)答,就需要設(shè)置成超時。
場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單。
實現(xiàn)方案
定時任務(wù)輪詢數(shù)據(jù)庫,看是否有產(chǎn)生新任務(wù),如果產(chǎn)生則消費任務(wù)
pcntl_alarm為進(jìn)程設(shè)置一個鬧鐘信號
swoole的異步高精度定時器:swoole_time_tick(類似javascript的setInterval)和swoole_time_after(相當(dāng)于javascript的setTimeout)
rabbitmq延遲任務(wù)
以上四種方案,如果生產(chǎn)環(huán)境有使用到swoole建議使用第三種方案。此篇文章重點講述第四種方案實現(xiàn)
Rabbitmq延遲隊列實現(xiàn)
RabbitMQ沒有直接去實現(xiàn)延遲隊列這個功能。而是需要通過消息的TTL和死信Exchange這兩者的組合來實現(xiàn)。
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設(shè)置TTL。對隊列設(shè)置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設(shè)置。超過了這個時間,我們認(rèn)為這個消息就死了,稱之為死信。如果隊列設(shè)置了,消息也設(shè)置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設(shè)置)。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務(wù)的關(guān)鍵。
可以通過設(shè)置消息的expiration字段或者隊列x-message-ttl屬性來設(shè)置時間,兩者是一樣的效果。下面例子是通過隊列的ttl實現(xiàn)死信
$queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, )); $queue->declareQueue();
當(dāng)上面的消息扔到該隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去。單靠死信還不能實現(xiàn)延遲任務(wù),還要靠Dead Letter Exchange。
Exchage的概念在這里就不在贅述,可以從這里進(jìn)行了解。一個消息在滿足如下條件下,會進(jìn)死信路由,記住這里是路由而不是隊列,一個路由可以對應(yīng)很多隊列。
1. 一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣。只是在某一個設(shè)置Dead Letter Exchange的隊列中有消息過期了,會自動觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。
示例
生產(chǎn)者:
<?php
header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'rabbitmq',
'password' => 'rabbitmq',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//持久化
$exchange->setName($params['exchangeName']?:'');
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange',
'x-dead-letter-routing-key' => 'delay_route',
'x-message-ttl' => 60000,
));
$queue->declareQueue();
//綁定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
}
//$num = mt_rand(100, 500);
$num = 1;
//生成消息
$exchange->publish("this is test message..", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));消費者:
<?php
header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delay_exchange',
'queueName' => 'delay_queue',
'routeKey' => 'delay_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'rabbitmq',
'password' => 'rabbitmq',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端
$exchange->setName($params['exchangeName']?:'');
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//綁定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo $body . PHP_EOL;
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
//$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處
//第二種消費方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo $message->getBody();
$queue->ack($message->getDeliveryTag()); //應(yīng)答,代表該消息已經(jīng)消費
$end = time();
echo '<br>' . ($end - $start);
exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}這個示例注意要跟上一篇博文示例作對比rabbitmq以及php amqp擴展使用,最關(guān)鍵的點就是在生產(chǎn)者那里
$queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, ));
詳細(xì)過程:
首先由正常隊列(test_cache_queue)和正常exchange(test_cache_exchange),兩者相綁定。
該正常隊列設(shè)置了死信路由(delay_exchange)和死信路由key以及TTL,生產(chǎn)者生產(chǎn)消息到正常隊列和正常路由上.
當(dāng)正常隊列設(shè)置TTL時間一到,那延遲消息就會自動發(fā)布到死信路由
消費者通過死信路由(delay_exchange)和死信隊列(delay_queue)來消費
參考文章:
https://www.cnblogs.com/haoxinyue/p/6613706.html
標(biāo)題名稱:rabbitmq延遲隊列之php實現(xiàn)
瀏覽路徑:http://www.chinadenli.net/article6/gjosig.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、電子商務(wù)、定制網(wǎng)站、網(wǎng)站改版、網(wǎng)站維護(hù)、域名注冊
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)