1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<?php
namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace;
use Hyperf\DbConnection\Db;
use Hyperf\Redis\Redis;
/**
 * Redis-backed batch queue consumer that persists trace-log entries.
 *
 * Each queued item is an array carrying a `request_id` plus either a
 * `process_info` fragment to append to that request's log row, or an
 * `is_completed` flag marking the request as finished.
 */
class LogTraceQueue extends RedisQueueBatchHandler
{
    /**
     * Name of the Redis queue consumed by this handler.
     *
     * Side effect: makes sure the `trace_logs` table exists before the
     * name is returned.
     *
     * @return string "<APP_NAME>:LogTraceQueue"
     */
    protected function specifyQueueName()
    {
        $this->createLogTraceTable();

        return env('APP_NAME') . ':LogTraceQueue';
    }

    /**
     * Redis connection the queue lives on.
     *
     * @return Redis
     */
    protected function specifyRedisServer(): Redis
    {
        return redis();
    }

    /**
     * Create the `trace_logs` table when it does not exist yet.
     *
     * DDL is issued through Db::statement(), the general-statement API,
     * rather than Db::insert().
     *
     * NOTE(review): `AUTO_INCREMENT=5950` looks like a leftover from a schema
     * dump; it is kept unchanged so newly created tables behave as before.
     */
    private function createLogTraceTable()
    {
        Db::statement("
create table IF NOT EXISTS `trace_logs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`request_id` varchar(32) NOT NULL COMMENT '一次http或rpc请求调用的唯一key',
`source` varchar(255) NOT NULL COMMENT '来源,包含调用类命名空间及方法',
`origin_params` json NOT NULL COMMENT '记录注解方法被调用开始的原始传参',
`is_completed` tinyint(1) NOT NULL DEFAULT '0' COMMENT '此请求是否完成,使用LogTraceHandler::markComplete()标记',
`process_info` mediumtext NOT NULL COMMENT '执行过程中输出,使用LogTraceHandler::recordProcess()记录',
`created_at` datetime NOT NULL COMMENT '日志记录开始时间',
PRIMARY KEY (`id`),
KEY `created_at` (`created_at`),
UNIQUE KEY `request_id` (`request_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5950 DEFAULT CHARSET=utf8mb4;
");
    }

    /**
     * Batch-processing logic for one chunk of queued items.
     *
     * Items flagged `is_completed` mark their request as completed. All other
     * items contribute their `process_info` text, which is concatenated per
     * `request_id` (in queue order) and appended to whatever is already
     * stored for that request.
     *
     * NOTE(review): the append is a read-then-write sequence, so a concurrent
     * writer to the same row could be overwritten — confirm that only one
     * consumer handles a given request_id at a time.
     *
     * @param array $dataArr list of queued items (associative arrays)
     */
    public function batchDeal($dataArr)
    {
        $updateArr = [];
        $updateMarkComplete = [];

        foreach ($dataArr as $arr) {
            // An item without a request_id cannot be matched to a row; skip it
            // instead of letting an undefined-index error abort the batch.
            if (!isset($arr['request_id'])) {
                continue;
            }
            $requestId = $arr['request_id'];

            // Loose comparison on purpose: the flag may arrive as int or string.
            if (isset($arr['is_completed']) && $arr['is_completed'] == YES) {
                $updateMarkComplete[] = $requestId;
                continue;
            }

            if (!isset($updateArr[$requestId])) {
                $updateArr[$requestId] = [
                    'request_id'   => $requestId,
                    'process_info' => '',
                ];
            }
            $updateArr[$requestId]['process_info'] .= $arr['process_info'] ?? '';
        }

        if (!empty($updateArr)) {
            // Fetch what is already stored so the new text is appended rather
            // than overwriting it. Only the ids that are about to be rewritten
            // are queried; array_column keeps them as the original strings.
            $originLogs = LogTrace::whereIn('request_id', array_column($updateArr, 'request_id'))
                ->pluck('process_info', 'request_id')
                ->toArray();

            foreach ($updateArr as $key => $item) {
                $existing = (string) ($originLogs[$item['request_id']] ?? '');
                // Strict check: a stored value such as "0" is real content and
                // must be preserved (empty() would have discarded it).
                if ($existing !== '') {
                    $updateArr[$key]['process_info'] = $existing . $item['process_info'];
                }
            }
        }

        // Bulk updates.
        if (!empty($updateMarkComplete)) {
            LogTrace::whereIn('request_id', array_values(array_unique($updateMarkComplete)))
                ->update(['is_completed' => YES]);
        }
        if (!empty($updateArr)) {
            LogTrace::getModel()->batchUpdateByField(array_values($updateArr), 'request_id');
        }
    }
}