摘要:為了避免重復(fù)執(zhí)行的問(wèn)題,我們需要引入一個(gè)有序集合存放正在執(zhí)行的任務(wù),命名為。最后以上講述了一個(gè)任務(wù)調(diào)度程序的逐步演變,設(shè)計(jì)方案很大程度上參考了。
原文鏈接:https://blog.breezelin.cn/scheme-redis-task-queue.html
一個(gè)網(wǎng)關(guān)服務(wù)器就跟快餐店一樣,總是希望客人來(lái)得快、去得也快,這樣在相同時(shí)間內(nèi)才可以服務(wù)更多的客人。如果快餐店的服務(wù)員在一個(gè)顧客點(diǎn)餐、等餐和結(jié)賬時(shí)都全程跟陪的話,那么這個(gè)服務(wù)員大部分時(shí)間都是在空閑的等待。應(yīng)該有專門的服務(wù)員負(fù)責(zé)點(diǎn)餐,專門的服務(wù)員負(fù)責(zé)送餐,專門的服務(wù)員負(fù)責(zé)結(jié)賬,這樣才能提高效率。同樣道理,網(wǎng)關(guān)服務(wù)器中也需要分工明確。舉個(gè)例子:
假設(shè)有一個(gè)申請(qǐng)發(fā)送重置密碼郵件的網(wǎng)關(guān)接口,須知道發(fā)送一封郵件可能會(huì)花費(fèi)上好幾秒鐘,如果網(wǎng)關(guān)服務(wù)器直接在線上給用戶發(fā)送重置密碼郵件,高并發(fā)的情況下就很容易造成網(wǎng)絡(luò)擁擠。但實(shí)際上,網(wǎng)關(guān)服務(wù)器并非一定要等待郵件發(fā)送成功后才能響應(yīng)用戶,完全可以先告知用戶郵件會(huì)發(fā)送的,而后再在線下把郵件發(fā)送出去(就像快餐店里點(diǎn)餐的服務(wù)員跟顧客說(shuō)先去找位置坐,飯菜做好后會(huì)有人給他送過(guò)去)。
那么是誰(shuí)來(lái)把郵件發(fā)送出去呢?
任務(wù)隊(duì)列為了網(wǎng)關(guān)接口能夠盡快響應(yīng)用戶請(qǐng)求,無(wú)需即時(shí)知道結(jié)果的耗時(shí)操作可以交由任務(wù)隊(duì)列機(jī)制來(lái)處理。
任務(wù)隊(duì)列機(jī)制中包含兩種角色,一個(gè)是任務(wù)生產(chǎn)者,一個(gè)是任務(wù)消費(fèi)者,而任務(wù)隊(duì)列是兩者之間的紐帶:
生產(chǎn)者往隊(duì)列里放入任務(wù);
消費(fèi)者從隊(duì)列里取出任務(wù)。
任務(wù)隊(duì)列的整體運(yùn)行流程是:任務(wù)生產(chǎn)者把當(dāng)前操作的關(guān)鍵信息(后續(xù)可以根據(jù)這些信息還原出當(dāng)前操作)抽象出來(lái),比如發(fā)送重置密碼的郵件,我們只需要當(dāng)前用戶郵箱和用戶名就可以了;任務(wù)生產(chǎn)者把任務(wù)放進(jìn)隊(duì)列,實(shí)際就是把任務(wù)的關(guān)鍵信息存儲(chǔ)起來(lái),這里會(huì)用到MySQL、Redis之類數(shù)據(jù)存儲(chǔ)工具,常用的是Redis;而任務(wù)消費(fèi)者就不斷地從數(shù)據(jù)庫(kù)中取出任務(wù)信息,逐一執(zhí)行。
任務(wù)生產(chǎn)者的工作是任務(wù)分發(fā),一般由線上的網(wǎng)關(guān)服務(wù)程序執(zhí)行;任務(wù)消費(fèi)者的工作是任務(wù)調(diào)度,一般由線下的程序執(zhí)行,這樣即使任務(wù)耗時(shí)再多,也不阻塞網(wǎng)關(guān)服務(wù)。
這里主要討論的是任務(wù)調(diào)度(任務(wù)消費(fèi)者)的程序設(shè)計(jì)。
簡(jiǎn)單直接假設(shè)我們用Redis列表List存儲(chǔ)任務(wù)信息,列表鍵名是queues:default,任務(wù)發(fā)布就是往列表queues:default后追加數(shù)據(jù):
那么任務(wù)調(diào)度可以這樣簡(jiǎn)單直接的實(shí)現(xiàn):
handle($task); continue; } sleep(1); } } public function handle($task) { // do something time-consuming } } $worker = new Worker; $worker->schedule();意外保險(xiǎn)上面代碼是直接從queues:default列表中移出第一個(gè)任務(wù)(lpop),因?yàn)?b>handle($task)函數(shù)是一個(gè)耗時(shí)的操作,過(guò)程中若是遇到什么意外導(dǎo)致了整個(gè)程序退出,這個(gè)任務(wù)可能還沒(méi)執(zhí)行完成,可是任務(wù)信息已經(jīng)完全丟失了。保險(xiǎn)起見(jiàn),對(duì)schedule()函數(shù)進(jìn)行以下修改:
handle($task); Redis::lpop("queues:default"); continue; } sleep(1); } } ...即在任務(wù)完成后才將任務(wù)信息從列表中移除。
延時(shí)執(zhí)行queues:default列表中的任務(wù)都是需要即時(shí)執(zhí)行的,但是有些任務(wù)是需要間隔一段時(shí)間后或者在某個(gè)時(shí)間點(diǎn)上執(zhí)行,那么可以引入一個(gè)有序集合,命名為queues:default:delayed,來(lái)存放這些任務(wù)。任務(wù)發(fā)布時(shí)需要指明執(zhí)行的時(shí)間點(diǎn)$time:
任務(wù)調(diào)度時(shí),如果queues:default列表已經(jīng)空了,就從queues:default:delayed集合中取出到達(dá)執(zhí)行時(shí)間的任務(wù)放入queues:default列表中:
handle($task); Redis::lpop("queues:default"); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } sleep(1); } } ...任務(wù)超時(shí)預(yù)估任務(wù)正常執(zhí)行所需的最大時(shí)間值,若是任務(wù)執(zhí)行超過(guò)了這個(gè)時(shí)間,可能是過(guò)程中遇到一些意外,如果任由它繼續(xù)卡著,那么后面的任務(wù)就會(huì)無(wú)法被執(zhí)行了。
首先我們給任務(wù)設(shè)定一個(gè)時(shí)限屬性timeout,然后在執(zhí)行任務(wù)前先給進(jìn)程本身設(shè)置一個(gè)鬧鐘信號(hào),timeout后收到信號(hào)說(shuō)明任務(wù)執(zhí)行超時(shí),需要退出當(dāng)前進(jìn)程(用supervisor守護(hù)進(jìn)程時(shí),進(jìn)程自身退出,supervisor會(huì)自動(dòng)再拉起)。
注意:pcntl_alarm($timeout)會(huì)覆蓋之前鬧鐘信號(hào),而pcntl_alarm(0)會(huì)取消鬧鐘信號(hào);任務(wù)超時(shí)后,當(dāng)前任務(wù)放入queues:default:delayed集合中延時(shí)執(zhí)行,以免再次阻塞隊(duì)列。timeoutHanle($task); $this->handle($task); Redis::lpop("queues:default"); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } pcntl_alarm(0); sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () { $seri = Redis::lpop("queues:default"); Redis::zadd("queues:default:delayed", time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...并發(fā)執(zhí)行上面代碼,直觀上沒(méi)什么問(wèn)題,但是在多進(jìn)程并發(fā)執(zhí)行的時(shí)候,有些任務(wù)可能會(huì)被重復(fù)執(zhí)行,是因?yàn)闆](méi)能及時(shí)將當(dāng)前執(zhí)行的任務(wù)從queues:default列表中移出,其他進(jìn)程也可以讀取到。為了避免重復(fù)執(zhí)行的問(wèn)題,我們需要引入一個(gè)有序集合SortedSet存放正在執(zhí)行的任務(wù),命名為queues:default:reserved。
首先任務(wù)是從queues:default列表中直接移出,然后開(kāi)始執(zhí)行任務(wù)前先把任務(wù)放進(jìn)queues:default:reserved集合中,任務(wù)完成了再?gòu)?b>queues:default:reserved集合中移出。
再結(jié)合任務(wù)超時(shí),假設(shè)一個(gè)任務(wù)執(zhí)行時(shí)間不可能超過(guò)60*60秒(可以按需調(diào)整),在queues:default列表為空的時(shí)候,queues:default:reserved集合中有任務(wù)已經(jīng)存放超過(guò)了60*60秒,那么有可能是某些進(jìn)程在執(zhí)行任務(wù)是意外退出了,所以把這些任務(wù)放到queues:default:delayed集合中稍后執(zhí)行。timeoutHanle($task); $this->handle($task); Redis::zrem("queues:default:reserved", $seri); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:reserved", 0, time()-60*60); if($seri_arr) { foreach($seri_arr as $seri) { Redis::zadd("queues:default:delayed", time()+10, $seri); } } sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () use ($task) { $seri = serialize($task); Redis::zrem("queues:default:reserved", $seri); Redis::zadd("queues:default:delayed", time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...其他 失敗重試以上代碼沒(méi)有檢驗(yàn)任務(wù)是否執(zhí)行成功,應(yīng)該有任務(wù)失敗的處理機(jī)制:比如給任務(wù)設(shè)定一個(gè)最多重試次數(shù)屬性retry_times,任務(wù)每執(zhí)行一次retry_times,任務(wù)執(zhí)行失敗時(shí),若是retry_times等于0,則將任務(wù)放入queues:default:failed列表中不在執(zhí)行;否則放入放到queues:default:delayed集合中稍后執(zhí)行。
休眠時(shí)間以上代碼是進(jìn)程忙時(shí)連續(xù)執(zhí)行,閑時(shí)休眠一秒,可以按需調(diào)整優(yōu)化。
事件監(jiān)聽(tīng)若是需要在任務(wù)執(zhí)行成功或失敗時(shí)進(jìn)行某些操作,可以給任務(wù)設(shè)定成功操作方法afterSucceeded()或失敗操作方法afterFailed(),在相應(yīng)的時(shí)候回調(diào)。
最后以上講述了一個(gè)任務(wù)調(diào)度程序的逐步演變,設(shè)計(jì)方案很大程度上參考了Laravel Queue。
用工具,知其然,知其所以然。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://hztianpu.com/yun/28333.html
摘要:架構(gòu)消息代理,作為臨時(shí)儲(chǔ)存任務(wù)的中間媒介,為提供了隊(duì)列服務(wù)。生產(chǎn)者將任務(wù)發(fā)送到,消費(fèi)者再?gòu)墨@取任務(wù)。如果使用,則有可能發(fā)生突然斷電之類的問(wèn)題造成突然終止后的數(shù)據(jù)丟失等后果。任務(wù)調(diào)度器,負(fù)責(zé)調(diào)度并觸發(fā)定時(shí)周期任務(wù)。 架構(gòu) showImg(https://segmentfault.com/img/bVbmDXa?w=831&h=413); Broker 消息代理,作為臨時(shí)儲(chǔ)存任務(wù)的中間媒...
閱讀 3997·2021-11-24 09:39
閱讀 1898·2021-11-02 14:41
閱讀 940·2019-08-30 15:53
閱讀 3556·2019-08-29 12:43
閱讀 1296·2019-08-29 12:31
閱讀 3164·2019-08-26 13:50
閱讀 888·2019-08-26 13:45
閱讀 1088·2019-08-26 10:56