<?php namespace Meibuyu\Micro\Handler; abstract class RedisQueueBatchHandler { /** * 重试次数 */ protected $retrys=0; protected $queue_name; const MAX_RETRY_TIMES = 3; const BATCH_DEAL_NUM = 100; const ERROR_RETRY_CODE = 9000; /** * RedisQueueBatchHandler constructor. */ public function __construct() { $this->specifyQueueName(); } /** * 添加至队列 */ public function addToQueue($data) { redis()->rPush($this->queue_name,json_encode($data)); } /** * 通过管道将执行失败的数据重回队列 * @param $arr * @param \Hyperf\Redis\Redis $redis */ protected function backToQueue($arr,\Hyperf\Redis\Redis $redis) { //开启管道 $pip = $redis->multi(); foreach ($arr as $i){ $pip->lPush($this->queue_name,$i); } //批量提交 $pip->exec(); } //批处理具体逻辑 abstract protected function batchDeal($data); abstract protected function specifyQueueName(); public function consume() { $redis = redis(); while (true){ //有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃 //执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程 $exist = $redis->blPop($this->queue_name,30); if(empty($exist)) continue ; $redis->lPush($this->queue_name,$exist[1]); //每次从列表取100 $arr = $redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1); //取完 从队列删掉 $redis->lTrim($this->queue_name,self::BATCH_DEAL_NUM,-1); //数据格式化 $formatArr = array_map(function ($item){ return json_decode($item,true); },$arr); try { //具体批处理逻辑 $this->batchDeal($formatArr); }catch (\Throwable $exception){ //错误码为100 通过管道从新推到队列 if($exception->getCode()==self::ERROR_RETRY_CODE){ if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次 $this->backToQueue($arr,$redis); $this->retrys++; }else{ $this->retrys = 0; //重置当前次数 $this->errorWriteToFile($formatArr,$exception); } }else{ $this->errorWriteToFile($formatArr,$exception); } } } } /** * @param $data * @param \Throwable $exception */ private function errorWriteToFile($data, \Throwable $exception) { put_log( json_encode($data)."\n". $exception->getMessage()."\n". $exception->getFile().' line:'. $exception->getLine()."\n". $exception->getTraceAsString() ,'RedisQueue/'.$this->queue_name.'/'.today() ); } }