Commit 6c5633a0 authored by Liu lu's avatar Liu lu

异步批日志记录

parent 49866022
...@@ -5,14 +5,39 @@ namespace Meibuyu\Micro\Handler\LogTrace; ...@@ -5,14 +5,39 @@ namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Handler\RedisQueueBatchHandler; use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace; use Meibuyu\Micro\Model\LogTrace;
use Hyperf\DbConnection\Db;
class LogTraceQueue extends RedisQueueBatchHandler class LogTraceQueue extends RedisQueueBatchHandler
{ {
protected function specifyQueueName() protected function specifyQueueName()
{ {
$this->queue_name = env('APP_NAME').':LogTraceQueue'; $this->createLogTraceTable();
return env('APP_NAME').':LogTraceQueue';
}
protected function specifyRedisServer()
{
$redis = redis();
$redis->select(0);
return $redis;
}
private function createLogTraceTable()
{
Db::insert("
create table if not exist `trace_logs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`request_id` varchar(32) NOT NULL COMMENT '一次http或rpc请求调用的唯一key',
`source` varchar(255) NOT NULL COMMENT '来源,包含调用类命名空间及方法',
`origin_params` json NOT NULL COMMENT '记录注解方法被调用开始的原始传参',
`is_completed` tinyint(1) NOT NULL DEFAULT '0' COMMENT '此请求是否完成,使用LogTraceHandler::markComplete()标记',
`process_info` mediumtext NOT NULL COMMENT '执行过程中输出,使用LogTraceHandler::recordProcess()记录',
`created_at` datetime NOT NULL COMMENT '日志记录开始时间',
PRIMARY KEY (`id`),
UNIQUE KEY `request_id` (`request_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5950 DEFAULT CHARSET=utf8mb4;
");
} }
/** /**
......
...@@ -13,6 +13,10 @@ abstract class RedisQueueBatchHandler ...@@ -13,6 +13,10 @@ abstract class RedisQueueBatchHandler
protected $queue_name; protected $queue_name;
/**
* @var \Hyperf\Redis\Redis
*/
protected $redis;
const MAX_RETRY_TIMES = 3; const MAX_RETRY_TIMES = 3;
...@@ -25,7 +29,8 @@ abstract class RedisQueueBatchHandler ...@@ -25,7 +29,8 @@ abstract class RedisQueueBatchHandler
*/ */
public function __construct() public function __construct()
{ {
$this->specifyQueueName(); $this->queue_name = $this->specifyQueueName();
$this->redis = $this->specifyRedisServer();
} }
...@@ -34,44 +39,40 @@ abstract class RedisQueueBatchHandler ...@@ -34,44 +39,40 @@ abstract class RedisQueueBatchHandler
*/ */
public function addToQueue($data) public function addToQueue($data)
{ {
redis()->rPush($this->queue_name,json_encode($data)); $this->redis->rPush($this->queue_name,json_encode($data));
} }
/** /**
* 通过管道将执行失败的数据重回队列 * 将执行失败的数据重回队列
* @param $arr * @param $arr
* @param \Hyperf\Redis\Redis $redis
*/ */
protected function backToQueue($arr,\Hyperf\Redis\Redis $redis) protected function backToQueue($arr)
{ {
//开启管道 array_unshift($arr,$this->queue_name);
$pip = $redis->multi(); call_user_func_array([$this->redis,'lPush'],$arr);
foreach ($arr as $i){
$pip->lPush($this->queue_name,$i);
}
//批量提交
$pip->exec();
} }
//批处理具体逻辑 //批处理具体逻辑
abstract protected function batchDeal($data); abstract protected function batchDeal($data);
abstract protected function specifyQueueName(); abstract protected function specifyQueueName();
abstract protected function specifyRedisServer();
public function consume() public function consume()
{ {
$redis = redis();
while (true){ while (true){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃 //有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程 //执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist = $redis->blPop($this->queue_name,30); $exist = $this->redis->blPop($this->queue_name,30);
if(empty($exist)) continue ; if(empty($exist)) continue ;
$redis->lPush($this->queue_name,$exist[1]); $this->redis->lPush($this->queue_name,$exist[1]);
//每次从列表取100 //每次从列表取100
$arr = $redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1); $arr = $this->redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1);
//取完 从队列删掉 //取完 从队列删掉
$redis->lTrim($this->queue_name,count($arr),-1); $this->redis->lTrim($this->queue_name,count($arr),-1);
//数据格式化 //数据格式化
$formatArr = array_map(function ($item){ $formatArr = array_map(function ($item){
return json_decode($item,true); return json_decode($item,true);
...@@ -82,10 +83,10 @@ abstract class RedisQueueBatchHandler ...@@ -82,10 +83,10 @@ abstract class RedisQueueBatchHandler
}catch (\Throwable $exception){ }catch (\Throwable $exception){
//错误码为100 通过管道从新推到队列 //错误码为100 新推到队列
if($exception->getCode()==self::ERROR_RETRY_CODE){ if($exception->getCode()==self::ERROR_RETRY_CODE){
if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次 if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次
$this->backToQueue($arr,$redis); $this->backToQueue($arr);
$this->retrys++; $this->retrys++;
}else{ }else{
$this->retrys = 0; //重置当前次数 $this->retrys = 0; //重置当前次数
...@@ -93,6 +94,7 @@ abstract class RedisQueueBatchHandler ...@@ -93,6 +94,7 @@ abstract class RedisQueueBatchHandler
} }
}else{ }else{
$this->retrys = 0;
$this->errorWriteToFile($formatArr,$exception); $this->errorWriteToFile($formatArr,$exception);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment