RedisQueueBatchHandler.php 3.23 KB
Newer Older
1 2 3 4
<?php


namespace Meibuyu\Micro\Handler;
Liu lu's avatar
Liu lu committed
5
use Hyperf\Redis\Redis;
6 7 8 9 10 11

abstract class RedisQueueBatchHandler
{
    /**
     * 重试次数
     */
Liu lu's avatar
Liu lu committed
12
    protected $retry=0;
13 14 15

    protected $queue_name;

Liu lu's avatar
Liu lu committed
16
    /**
Liu lu's avatar
Liu lu committed
17
     * @var Redis
Liu lu's avatar
Liu lu committed
18 19
     */
    protected $redis;
20 21 22

    const MAX_RETRY_TIMES = 3;

23
    const BATCH_DEAL_NUM = 36;
24 25 26 27 28 29 30 31

    const ERROR_RETRY_CODE = 9000;

    /**
     * RedisQueueBatchHandler constructor.
     */
    public function __construct()
    {
Liu lu's avatar
Liu lu committed
32 33
        $this->queue_name = $this->specifyQueueName();
        $this->redis = $this->specifyRedisServer();
34 35 36 37
    }


    /**
Liu lu's avatar
Liu lu committed
38 39
     * @param $data
     * @param $specifyQueue
40
     */
Liu lu's avatar
Liu lu committed
41
    public function addToQueue($data,$specifyQueue='')
42
    {
Liu lu's avatar
Liu lu committed
43
        $this->specifyRedisServer()->rPush($specifyQueue?:$this->queue_name,json_encode($data));
44 45 46
    }

    /**
Liu lu's avatar
Liu lu committed
47
     * 将执行失败的数据重回队列
48 49
     * @param $arr
     */
Liu lu's avatar
Liu lu committed
50
    protected function backToQueue($arr)
51
    {
Liu lu's avatar
Liu lu committed
52 53
        array_unshift($arr,$this->queue_name);
        call_user_func_array([$this->redis,'lPush'],$arr);
54 55 56 57 58
    }

    //批处理具体逻辑
    abstract protected function batchDeal($data);
    abstract protected function specifyQueueName();
Liu lu's avatar
Liu lu committed
59
    abstract protected function specifyRedisServer() : Redis;
Liu lu's avatar
Liu lu committed
60

61 62 63

    public function consume()
    {
Liu lu's avatar
Liu lu committed
64

65 66 67
        while (true){
            //有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
            //执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
Liu lu's avatar
Liu lu committed
68
            $exist = $this->redis->blPop($this->queue_name,30);
69
            if(empty($exist)) continue ;
Liu lu's avatar
Liu lu committed
70
            $this->redis->lPush($this->queue_name,$exist[1]);
71 72

            //每次从列表取100
Liu lu's avatar
Liu lu committed
73
            $arr = $this->redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1);
74 75

            //取完 从队列删掉
Liu lu's avatar
Liu lu committed
76
            $this->redis->lTrim($this->queue_name,count($arr),-1);
77 78 79 80 81 82 83 84 85 86
            //数据格式化
            $formatArr = array_map(function ($item){
                return json_decode($item,true);
            },$arr);
            try {
                //具体批处理逻辑
                $this->batchDeal($formatArr);

            }catch (\Throwable $exception){

Liu lu's avatar
Liu lu committed
87
                //错误码为100 重新推到队列
88
                if($exception->getCode()==self::ERROR_RETRY_CODE){
Liu lu's avatar
Liu lu committed
89
                    if($this->retry<self::MAX_RETRY_TIMES){ //重试次数不超过3次
Liu lu's avatar
Liu lu committed
90
                        $this->backToQueue($arr);
Liu lu's avatar
Liu lu committed
91
                        $this->retry++;
92
                    }else{
Liu lu's avatar
Liu lu committed
93
                        $this->retry = 0; //重置当前次数
94 95 96 97
                        $this->errorWriteToFile($formatArr,$exception);
                    }

                }else{
Liu lu's avatar
Liu lu committed
98
                    $this->retry = 0;
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
                    $this->errorWriteToFile($formatArr,$exception);
                }

            }

        }

    }

    /**
     * @param $data
     * @param \Throwable $exception
     */
    private function errorWriteToFile($data, \Throwable $exception)
    {
        put_log(
            json_encode($data)."\n".
            $exception->getMessage()."\n".
            $exception->getFile().' line:'.
            $exception->getLine()."\n".
            $exception->getTraceAsString()

            ,'RedisQueue/'.$this->queue_name.'/'.today()
        );
    }


}