LogTraceQueue.php 3.12 KB
<?php


namespace Meibuyu\Micro\Handler\LogTrace;

use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace;
use Hyperf\DbConnection\Db;
use Hyperf\Redis\Redis;

class LogTraceQueue extends RedisQueueBatchHandler
{

    protected function specifyQueueName()
    {
        $this->createLogTraceTable();
        return env('APP_NAME').':LogTraceQueue';
    }

    protected function specifyRedisServer():Redis
    {
        return redis();
    }

    private function createLogTraceTable()
    {
        Db::insert("
            create table IF NOT EXISTS `trace_logs` (
                  `id` bigint(20) NOT NULL AUTO_INCREMENT,
                  `request_id` varchar(32)  NOT NULL COMMENT '一次http或rpc请求调用的唯一key',
                  `source` varchar(255)  NOT NULL COMMENT '来源,包含调用类命名空间及方法',
                  `origin_params` json NOT NULL COMMENT '记录注解方法被调用开始的原始传参',
                  `is_completed` tinyint(1) NOT NULL DEFAULT '0' COMMENT '此请求是否完成,使用LogTraceHandler::markComplete()标记',
                  `process_info` mediumtext  NOT NULL COMMENT '执行过程中输出,使用LogTraceHandler::recordProcess()记录',
                  `created_at` datetime NOT NULL COMMENT '日志记录开始时间',
              PRIMARY KEY (`id`),
              KEY `created_at` (`created_at`),
              UNIQUE KEY `request_id` (`request_id`)
            ) ENGINE=InnoDB AUTO_INCREMENT=5950 DEFAULT CHARSET=utf8mb4;
        ");
    }

    /**
     * 具体批处理逻辑
     * @param $dataArr
     */
    function batchDeal($dataArr)
    {
        $updateArr = [];
        $updateMarkComplete = [];

        foreach ($dataArr as $arr){

            if(isset($arr['is_completed'])&&$arr['is_completed']==YES){
                $updateMarkComplete[]= $arr['request_id'] ;continue;
            }
            $updateArr[$arr['request_id']]['request_id'] = $arr['request_id'];

            $process_info = isset($updateArr[$arr['request_id']]['process_info'])?
                             $updateArr[$arr['request_id']]['process_info']:'';

            $updateArr[$arr['request_id']]['process_info'] = $process_info.$arr['process_info'];

        }

        //执行前先查一下 原先有的记录 避免被覆盖
        $originLogs = LogTrace::whereIn('request_id',array_column($dataArr,'request_id'))
            ->pluck('process_info','request_id')->toArray();
        //追加到原记录之后
        $updateArr =  array_map(function ($item)use($originLogs){

            if(!isset($originLogs[$item['request_id']])) return $item;

            if(empty($originLogs[$item['request_id']])) return $item;

            $item['process_info'] = $originLogs[$item['request_id']].$item['process_info'];

            return $item;
        },$updateArr);

        //批量更新
        if(!empty($updateMarkComplete)){
            LogTrace::whereIn('request_id',$updateMarkComplete)->update(['is_completed'=>YES]);
        }
        if(!empty($updateArr)){
            LogTrace::getModel()->batchUpdateByField(array_values($updateArr),'request_id');
        }
    }


}