這篇文章主要介紹swoole實(shí)現(xiàn)實(shí)時(shí)推送的方法,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

創(chuàng)新互聯(lián)建站服務(wù)項(xiàng)目包括豐順網(wǎng)站建設(shè)、豐順網(wǎng)站制作、豐順網(wǎng)頁(yè)制作以及豐順網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,豐順網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到豐順省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
swoole+redis實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)推送
<?php
/**
* ***************************************
* 單進(jìn)程保護(hù) *
* ***************************************
*/
$phpSelf = realpath($_SERVER['PHP_SELF']);
$lockFile = $phpSelf.'.lock';
$lockFileHandle = fopen($lockFile, "w");
if ($lockFileHandle == false) {
exit("Can not create lock file $lockFile\n");
}
if (!flock($lockFileHandle, LOCK_EX + LOCK_NB)) {
exit(date("Y-m-d H:i:s")."Process already exist.\n");
}
/**
* ***************************************
* 進(jìn)入程序,定義相關(guān)配置 *
* ***************************************
*/
set_time_limit(0);
//socket會(huì)話的超時(shí)時(shí)間,根據(jù)業(yè)務(wù)場(chǎng)景設(shè)置,這里設(shè)置為永不超時(shí)
//如果設(shè)置了時(shí)間,則從socket建立=>傳輸=>關(guān)閉整個(gè)過(guò)程必須在定義的時(shí)間內(nèi)完成,否則自動(dòng)close該socket并拋出warning
ini_set('default_socket_timeout', -1);
$conf = array(
'listen' => array('host' => '0.0.0.0','port' => '8008'),
'setting' => array(
//程序允許的最大連接數(shù),用以設(shè)置server最大允許維持多少個(gè)TCP連接,超過(guò)該數(shù)量后,新連接將被拒絕,默認(rèn)為ulimit -n的值,如果設(shè)置大于ulimit -n則強(qiáng)制重置為ulimit- n,如果確實(shí)需要設(shè)置超過(guò)ulimit -n的值,請(qǐng)修改系統(tǒng)值 vim /etc/security/limits.conf 修改nofile的值
"max_conn" => 1024,
//啟用CPU親和設(shè)置(在全異步非阻塞是可啟用),在多核的服務(wù)器中,啟用此特性會(huì)將swoole的reactor線程/worker進(jìn)程綁定到固定的一個(gè)核上。可以避免進(jìn)程/線程的運(yùn)行時(shí)在多個(gè)核之間互相切換,提高CPU Cache的命中率,如何確定綁定在了哪個(gè)核上,請(qǐng)參考文檔, 查看命令: taskset -p 進(jìn)程id
'open_cpu_affinity' => 0,
//配置task進(jìn)程數(shù)量,配置此參數(shù)后將會(huì)啟用task功能。所以Server務(wù)必要注冊(cè)onTask、onFinish3個(gè)事件回調(diào)函數(shù)。如果沒(méi)有注冊(cè),服務(wù)器程序?qū)o(wú)法啟動(dòng).Task進(jìn)程是同步阻塞的,配置方式與Worker同步模式一致。
'task_worker_num' => 20,
//設(shè)置task進(jìn)程的最大任務(wù)數(shù)。一個(gè)task進(jìn)程在處理完超過(guò)此數(shù)值的任務(wù)后將自動(dòng)退出。這個(gè)參數(shù)是為了防止PHP進(jìn)程內(nèi)存溢出。如果不希望進(jìn)程自動(dòng)退出可以設(shè)置為0, 默認(rèn)是0
'task_max_request' => 1024,
//設(shè)置task的數(shù)據(jù)臨時(shí)目錄,在swoole_server中,如果投遞的數(shù)據(jù)超過(guò)8192字節(jié),將啟用臨時(shí)文件來(lái)保存數(shù)據(jù)。這里的task_tmpdir就是用來(lái)設(shè)置臨時(shí)文件保存的位置。
'task_tmpdir' => '/tmp/',
//worker進(jìn)程數(shù)量,根據(jù)業(yè)務(wù)代碼的模式作調(diào)整,全異步非阻塞可設(shè)置為CPU核數(shù)的1-4倍;同步阻塞,請(qǐng)參考文檔調(diào)整
'worker_num' => 8,
//指定swoole錯(cuò)誤日志文件
'log_file' => '/tmp/log/log.txt',
//SSL公鑰和私鑰的位置,啟用wss必須在編譯swoole時(shí)加入--enable-openssl選項(xiàng)
'ssl_cert_file' => '/usr/local/nginx/conf/server.cer',
'ssl_key_file' => '/usr/local/nginx/conf/server.key',
),
);
/**
* ***************************************
* 初始化Redis連接 *
* ***************************************
*/
$redis = null;
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$GLOBALS['redis']=$redis;
/**
* ***************************************
* 腳本重啟時(shí),清除歷史的數(shù)據(jù) *
* ***************************************
*/
$sArr = $redis->sMembers(REDIS_S_KEY);
if (!empty($sArr)) {
foreach ((array)$sArr as $key => $sc) {
$fdArr = $redis->sMembers(REDIS_S_FD.$sc);
foreach ((array)$fdArr as $k => $fd) {
$res1 = $redis->del(REDIS_FD_S.$fd);
}
$res2 = $redis->del(REDIS_S_FD.$sc);
}
$redis->del(REDIS_S_KEY);
}
$redis->del(REDIS_ZS_KEY);
/**
* ***************************************
* 綁定回調(diào)事件 *
* ***************************************
*/
$ws = null;
//wss服務(wù)
$ws = new swoole_websocket_server($conf['listen']['host'], $conf['listen']['port'], SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$ws->set($conf['setting']);
/**
* Server啟動(dòng)在主進(jìn)程的主線程回調(diào)此函數(shù)
* 在此事件之前Swoole Server已進(jìn)行了如下操作
* 已創(chuàng)建了manager進(jìn)程
* 已創(chuàng)建了worker子進(jìn)程
* 已監(jiān)聽(tīng)所有TCP/UDP端口
* 已監(jiān)聽(tīng)了定時(shí)器
* 在onStart中創(chuàng)建的全局資源對(duì)象不能在worker進(jìn)程中被使用,因?yàn)榘l(fā)生onStart調(diào)用時(shí),worker進(jìn)程已經(jīng)創(chuàng)建好了。新創(chuàng)建的對(duì)象在主進(jìn)程內(nèi),worker進(jìn)程無(wú)法訪問(wèn)到此內(nèi)存區(qū)域。因此全局對(duì)象創(chuàng)建的代碼需要放置在swoole_server_start之前
*/
$ws->on('start', function ($ws) {
swoole_set_process_name(PROCESS_NAME.'_master');
});
/**
* 與onStart回調(diào)在不同進(jìn)程中并行執(zhí)行的回調(diào)函數(shù)(不存在先后順序)
* @param: $ws swoole_websocket_server object
* @param: $wid 創(chuàng)建該進(jìn)程時(shí)swoole分配的id(不是進(jìn)程id)
* 注意點(diǎn):
* 1. 此事件在worker進(jìn)程/task進(jìn)程啟動(dòng)時(shí)發(fā)生。onWorkerStart/onStart是并發(fā)執(zhí)行的,沒(méi)有先后順序,這里創(chuàng)建的對(duì)象可以在進(jìn)程生命周期內(nèi)使用
* 2. swoole1.6.11之后task_worker中也會(huì)觸發(fā)onWorkerStart,故而在下面的處理中,加入了判斷業(yè)務(wù)類型$jobType是task還是work,如果是task則命名為****_Tasker_$id,如果是worker則命名為****_Worker_$id
* 3. 發(fā)生PHP致命錯(cuò)誤或者代碼中主動(dòng)調(diào)用exit時(shí),Worker/Task進(jìn)程會(huì)退出,管理進(jìn)程會(huì)重新創(chuàng)建新的進(jìn)程
* 5. 如果想使用swoole_server_reload實(shí)現(xiàn)代碼重載入,必須在workerStart中require你的業(yè)務(wù)文件,而不是在文件頭部。在onWorkerStart調(diào)用之前已包含的文件,不會(huì)重新載入代碼。
* 6. 可以將公用的,不易變的php文件放置到onWorkerStart之前(例如上面的redis配置)。這樣雖然不能重載入代碼,但所有worker是共享的,不需要額外的內(nèi)存來(lái)保存這些數(shù)據(jù)。
* 7. onWorkerStart之后的代碼每個(gè)worker都需要在內(nèi)存中保存一份
*/
$ws->on('workerstart', function ($ws, $wid) {
$jobType = $ws->taskworker ? 'Tasker' : 'Worker';
swoole_set_process_name(PROCESS_NAME.'_'.$jobType.'_'.$wid);
$GLOBALS['ws'] = $ws; //保存server對(duì)象到全局中以待使用
if ($jobType == 'Worker') { //在某個(gè)worker進(jìn)程上綁定redis訂閱進(jìn)程
if ($wid === 0) {
$dataRedis = null;
$dataRedis = new Redis();
$dataRedis->connect(REDIS_HOST_DATA, REDIS_PORT_DATA);
$dataRedis->auth(REDIS_PWD_DATA);
//使用psubscribe訂閱指定模式的頻道,這里*表示所有頻道
//請(qǐng)注意,redis訂閱不提供區(qū)分庫(kù)(db)的功能,所以多個(gè)庫(kù)都同時(shí)在發(fā)布同一個(gè)名字的頻道時(shí),都將被訂閱到
$dataRedis->psubscribe(array("*"), "sendTask");
}
}
});
/**
* 管理進(jìn)程啟用時(shí),調(diào)用該回調(diào)函數(shù)
* 注意manager進(jìn)程中不能添加定時(shí)器
* manager進(jìn)程中可以調(diào)用sendMessage接口向其他工作進(jìn)程發(fā)送消息
*/
$ws->on('managerstart', function ($ws) {
swoole_set_process_name(PROCESS_NAME.'_manage');
});
/**
* swoole websocket服務(wù)特有的回調(diào)函數(shù),此函數(shù)在websocket服務(wù)器中必須定義實(shí)現(xiàn),否則websocket服務(wù)將無(wú)法啟動(dòng)
* 當(dāng)服務(wù)器收到來(lái)自客戶端的數(shù)據(jù)幀時(shí)會(huì)回調(diào)此函數(shù)
* @param: $ws為swoole_websocket_server對(duì)象,其結(jié)構(gòu)在調(diào)試時(shí)可var_dump查看
* @param: $frame為swoole_websocket_frame對(duì)象,包含了客戶端發(fā)來(lái)的數(shù)據(jù)幀信息,包含以下四個(gè)屬性:
* @param: $frame->fd: 客戶端的socket id,每個(gè)id對(duì)應(yīng)一個(gè)客戶端,推送消息的時(shí)候需要指定
* @param: $frame->data: 數(shù)據(jù)內(nèi)容,可以是文本內(nèi)容或者是二進(jìn)制數(shù)據(jù)(圖片等),可以通過(guò)opcode的值來(lái)判斷。$data 如果是文本類型,編碼格式必然是UTF-8,這是WebSocket協(xié)議規(guī)定的
* @param: $frame->opcode: WebSocket的OpCode類型,可以參考WebSocket協(xié)議標(biāo)準(zhǔn)文檔, WEBSOCKET_OPCODE_TEXT = 0x1 ,文本數(shù)據(jù); WEBSOCKET_OPCODE_BINARY = 0x2 ,二進(jìn)制數(shù)據(jù)
* @param: $frame->finish: 表示數(shù)據(jù)幀是否完整,一個(gè)WebSocket請(qǐng)求可能會(huì)分成多個(gè)數(shù)據(jù)幀進(jìn)行發(fā)送
* 注意點(diǎn): 客戶端發(fā)送的ping幀不會(huì)觸發(fā)onMessage,底層會(huì)自動(dòng)回復(fù)pong包
*/
$ws->on('message', function ($ws, $frame) {
echo "Server has receive message\n";
//接收到客戶端請(qǐng)求,并建立連接之后,進(jìn)行相應(yīng)業(yè)務(wù)的處理
handleClientData($ws, $frame);
});
/**
* 在task_worker進(jìn)程內(nèi)被調(diào)用。worker進(jìn)程可以使用swoole_server_task函數(shù)向task_worker進(jìn)程投遞新的任務(wù)(此處使用的是taskwait)
* 當(dāng)前的Task進(jìn)程在調(diào)用onTask回調(diào)函數(shù)時(shí)會(huì)將進(jìn)程狀態(tài)切換為忙碌,這時(shí)將不再接收新的Task,當(dāng)onTask函數(shù)返回時(shí)會(huì)將進(jìn)程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。
* @param: $ws swoole_websocket_server object
* @param: $tid task process id
* @param: $wid from id 表示來(lái)自哪個(gè)Worker進(jìn)程。$task_id和$wid組合起來(lái)才是全局唯一的,不同的worker進(jìn)程投遞的任務(wù)ID可能會(huì)有相同
* @param: $data 需要執(zhí)行的任務(wù)內(nèi)容
* 注意點(diǎn): onTask函數(shù)執(zhí)行時(shí)遇到致命錯(cuò)誤退出,或者被外部進(jìn)程強(qiáng)制kill,當(dāng)前的任務(wù)會(huì)被丟棄,但不會(huì)影響其他正在排隊(duì)的Task
*/
$ws->on('task', function ($ws, $tid, $wid, $data) {
switch ($data['cmd']) {
case 'pushToClient': $ret = pushToClientTask($ws, $data['key'], $data['val']); break;
}
//1.7.2以上的版本,在onTask函數(shù)中 return字符串,表示將此內(nèi)容返回給worker進(jìn)程。worker進(jìn)程中會(huì)觸發(fā)onFinish函數(shù),表示投遞的task已完成。return的變量可以是任意非null的PHP變量
return $returnContent;
//1.7.2以前的版本,需要調(diào)用swoole_server->finish()函數(shù)將結(jié)果返回給worker進(jìn)程
// $ws->finish($data);
});
/**
* 當(dāng)worker進(jìn)程投遞的任務(wù)在task_worker中完成時(shí),task進(jìn)程會(huì)通過(guò)$ws->finish()方法將任務(wù)處理的結(jié)果發(fā)送給worker進(jìn)程。
* @param: $ws swoole_websocket_server object
* @param: $tid task_id
* @param: $data 任務(wù)處理后的結(jié)果內(nèi)容
* 注意點(diǎn): task進(jìn)程的onTask事件中沒(méi)有調(diào)用finish方法或者return結(jié)果,worker進(jìn)程不會(huì)觸發(fā)onFinish
* 執(zhí)行onFinish邏輯的worker進(jìn)程與下發(fā)task任務(wù)的worker進(jìn)程是同一個(gè)進(jìn)程
*/
$ws->on('finish', function($ws, $tid, $data) {
});
/**
* TCP客戶端連接關(guān)閉后,在worker進(jìn)程中回調(diào)此函數(shù)
* 在函數(shù)中可以做一些類似于刪除業(yè)務(wù)中與每個(gè)客戶端交互時(shí)存放的數(shù)據(jù)的操作
* @param: $ws swoole_websocket_server object
* @param: $fd 已關(guān)閉的fd interger
* @param: $rid(可選),來(lái)自哪個(gè)reactor線程
* 注意點(diǎn):
* 1. onClose回調(diào)函數(shù)如果發(fā)生了致命錯(cuò)誤,會(huì)導(dǎo)致連接泄漏。通過(guò)netstat命令會(huì)看到大量CLOSE_WAIT狀態(tài)的TCP連接
* 2. 查看命令netstat -anopc | grep 端口號(hào),可以查看到TCP接收和發(fā)送隊(duì)列是否有堆積以及TCP連接的狀態(tài)
* 3. 無(wú)論由客戶端發(fā)起close還是服務(wù)器端主動(dòng)調(diào)用$serv->close()關(guān)閉連接,都會(huì)觸發(fā)此事件。因此只要連接關(guān)閉,就一定會(huì)回調(diào)此函數(shù)
* 4. 1.7.7+版本以后onClose中依然可以調(diào)用connection_info方法獲取到連接信息,在onClose回調(diào)函數(shù)執(zhí)行完畢后才會(huì)調(diào)用close關(guān)閉TCP連接
* 5. 這里回調(diào)onClose時(shí)表示客戶端連接已經(jīng)關(guān)閉,所以無(wú)需執(zhí)行$server->close($fd)。代碼中執(zhí)行$serv->close($fd)會(huì)拋出PHP錯(cuò)誤告警。也就是在onclose中不能再$ws->close()了.
* 6. swoole-1.9.7版本修改了$reactorId參數(shù),當(dāng)服務(wù)器主動(dòng)關(guān)閉連接時(shí),底層會(huì)設(shè)置此參數(shù)為-1,可以通過(guò)判斷$reactorId < 0來(lái)分辨關(guān)閉是由服務(wù)器端還是客戶端發(fā)起的(debug時(shí)可以使用)
*/
$ws->on('close', function ($ws, $fd) {
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$sArr = $redis->sMembers(REDIS_FD_S.$fd);
if (!empty($sArr)) {
foreach ((array)$sArr as $key => $sc) {
$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
$num = $redis->sCard(REDIS_S_FD.$sc);
if ($num == '0') {
$redis->sRem(REDIS_S_KEY, $sc);
$redis->hDel(REDIS_ZS_KEY, $sc);
}
}
}
$redis->del(REDIS_FD_S.$fd);
$redis->close();
echo "FD $fd has closed.\n";
});
/**
* 開(kāi)啟swoole_websocket_server服務(wù)
*/
$ws->start();
/**
* 接受到消息以后進(jìn)行響應(yīng)異步任務(wù)的執(zhí)行
* @param: $ws swoole_websocket_sever object
* @param: $frame swoole_websocket_frame obejct
*/
function handleClientData($ws, $frame) {
$data = $frame->data;
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$isMembers = $redis->sIsmember(REDIS_S_FD.$sc, $frame->fd);
if (!$isMembers) {
$res = $redis->sAdd(REDIS_S_FD.$sc, $frame->fd);
}
$redis->sAdd(REDIS_FD_S.$frame->fd, $sc);
$isMembers = $redis->sIsmember(REDIS_S_KEY, $sc);
if (!$isMembers) {
$redis->sAdd(REDIS_S_KEY, $sc);
}
}
/**
* redis訂閱后的回調(diào)函數(shù)
* @param: $ins instance實(shí)例
* @param: $pattern 匹配模式
* @param: $channel 頻道名
* @param: $data 數(shù)據(jù)
* 注意點(diǎn): subscribe和psubscribe兩種不同的訂閱方式的回調(diào)函數(shù)的參數(shù)個(gè)數(shù)不一樣,后者多了$pattern參數(shù)
*/
function sendTask($ins, $pattern, $channel, $data) {
//滿足一些條件后,投遞到task進(jìn)程中進(jìn)行推送
$taskData = array(
'cmd' => 'pushToClient',
'key' => $sc,
'val' => $data,
);
//請(qǐng)注意,taskwait是同步阻塞的,所以改腳本并不是全異步非阻塞的
$GLOBALS['ws']->taskwait($taskData);
}
/**
* 推送消息到指定的客戶端
* @param: $ws swoole_websocket_server object
* @param: $sc 股票代碼
* @param: $data 要推送的數(shù)據(jù)
*/
function pushToClientTask($ws, $sc, $data) {
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$fdList = $redis->sMembers(REDIS_S_FD.$sArr[4]);
if (!empty($fdList)) {
foreach ((array)$fdList as $fd) {
$res = $GLOBALS['ws']->push($fd, $data);
echo "FD: $fd push $res.\n";
if (!$res) { //推送失敗,即客戶端已經(jīng)斷開(kāi)連接
//從該fd訂閱的所有股票中刪除該fd
$sArrOfFd = $redis->sMembers(REDIS_FD_S.$fd);
if (!empty($sArrOfFd)) {
foreach ((array)$sArrOfFd as $key => $sc) {
$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
$num = $redis->sCard(REDIS_S_FD.$sc);
if ($num == '0') {
$redis->sRem(REDIS_S_KEY, $sc);
$redis->hDel(REDIS_ZS_KEY, $sc);
}
}
}
$redis->del(REDIS_FD_S.$fd);
}
}
}
$redis->close();
}以上是“swoole實(shí)現(xiàn)實(shí)時(shí)推送的方法”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
文章標(biāo)題:swoole實(shí)現(xiàn)實(shí)時(shí)推送的方法
標(biāo)題URL:http://www.chinadenli.net/article2/gpgoic.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷推廣、軟件開(kāi)發(fā)、企業(yè)網(wǎng)站制作、商城網(wǎng)站、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)