Commit 6deedbdb authored by Liu lu's avatar Liu lu

异步日志兼容

parent 2ba37acf
...@@ -7,6 +7,7 @@ use Hyperf\Di\Aop\AbstractAspect; ...@@ -7,6 +7,7 @@ use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint; use Hyperf\Di\Aop\ProceedingJoinPoint;
use Meibuyu\Micro\Annotation\AsyncCoroutine; use Meibuyu\Micro\Annotation\AsyncCoroutine;
use Hyperf\Utils\Coroutine; use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Context;
/** /**
* @Aspect( * @Aspect(
...@@ -29,8 +30,11 @@ class AsyncCoroutineAspect extends AbstractAspect ...@@ -29,8 +30,11 @@ class AsyncCoroutineAspect extends AbstractAspect
// 切面切入后,执行对应的方法会由此来负责 // 切面切入后,执行对应的方法会由此来负责
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果 // $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// 在调用前进行某些处理 // 在调用前进行某些处理
return Coroutine::create(function ()use($proceedingJoinPoint){ $fd = Context::get('log_trace_request_id');
return Coroutine::create(function ()use($proceedingJoinPoint,$fd){
try {
Context::set('log_trace_request_id',$fd);
LogTraceHandler::recordProcess( LogTraceHandler::recordProcess(
'投递到子协程任务,id:'.Coroutine::id() '投递到子协程任务,id:'.Coroutine::id()
.' ,类:'.$proceedingJoinPoint->className .' ,类:'.$proceedingJoinPoint->className
...@@ -44,6 +48,9 @@ class AsyncCoroutineAspect extends AbstractAspect ...@@ -44,6 +48,9 @@ class AsyncCoroutineAspect extends AbstractAspect
'子协程任务id:'.Coroutine::id().'已完成,执行结果:'. '子协程任务id:'.Coroutine::id().'已完成,执行结果:'.
json_encode($result),true json_encode($result),true
); );
}catch (\Throwable $exception){
LogTraceHandler::recordProcess($exception,true);
}
}); });
......
...@@ -5,8 +5,11 @@ use Hyperf\Di\Annotation\Aspect; ...@@ -5,8 +5,11 @@ use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect; use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint; use Hyperf\Di\Aop\ProceedingJoinPoint;
use Hyperf\HttpServer\Contract\RequestInterface; use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\Utils\Context;
use Meibuyu\Micro\Annotation\LogTrace; use Meibuyu\Micro\Annotation\LogTrace;
use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler; use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler;
use Psr\Http\Message\ServerRequestInterface;
use Hyperf\Utils\Coroutine;
/** /**
* @Aspect( * @Aspect(
...@@ -26,17 +29,39 @@ class LogTraceAspect extends AbstractAspect ...@@ -26,17 +29,39 @@ class LogTraceAspect extends AbstractAspect
public function process(ProceedingJoinPoint $proceedingJoinPoint) public function process(ProceedingJoinPoint $proceedingJoinPoint)
{ {
try {
$object = Context::get(ServerRequestInterface::class);
$fd = $object?('fd'.$object->getSwooleRequest()->fd):('co'.Coroutine::id());
Context::set('log_trace_request_id',$fd);
$originParams = [ $originParams = [
'called_params'=>$proceedingJoinPoint->getArguments(), 'called_params'=>$proceedingJoinPoint->getArguments(),
'http_params'=>make(RequestInterface::class)->all() 'http_params'=>$object?make(RequestInterface::class)->all():[]
]; ];
LogTraceHandler::createLogTrace( LogTraceHandler::createLogTrace(
$proceedingJoinPoint->className.'@'.$proceedingJoinPoint->methodName, $proceedingJoinPoint->className.'@'.$proceedingJoinPoint->methodName,
$originParams $originParams
); );
$requestId = LogTraceHandler::getRequestId();
register_shutdown_function(function ()use($requestId,$originParams,$proceedingJoinPoint) {
put_log(json_encode([
'request_id'=>$requestId,'error_msg'=>error_get_last(),'params'=>$originParams
]),
str_replace('\\','_',$proceedingJoinPoint->className).
'/'.$proceedingJoinPoint->methodName.'/'.date('Ymd').'.txt'
);
});
$result = $proceedingJoinPoint->process(); $result = $proceedingJoinPoint->process();
LogTraceHandler::recordProcess('返回结果:'.json_encode($result)); LogTraceHandler::recordProcess('返回结果:'.json_encode($result));
LogTraceHandler::markComplete();
return $result; return $result;
}catch (\Throwable $exception){
LogTraceHandler::recordProcess($exception);
throw $exception;
} }
}
} }
\ No newline at end of file
...@@ -9,6 +9,7 @@ namespace Meibuyu\Micro\Handler\LogTrace; ...@@ -9,6 +9,7 @@ namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Model\LogTrace; use Meibuyu\Micro\Model\LogTrace;
use Hyperf\Utils\Coroutine; use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Context;
use Swoole\Server; use Swoole\Server;
use Throwable; use Throwable;
...@@ -66,17 +67,16 @@ class LogTraceHandler ...@@ -66,17 +67,16 @@ class LogTraceHandler
* @return string * @return string
* @throws \Exception * @throws \Exception
*/ */
private static function getRequestId($isInAsyncCoroutine=false) public static function getRequestId()
{ {
$workId = posix_getpid(); $workId = posix_getpid();
$cid = $isInAsyncCoroutine?Coroutine::parentId(Coroutine::id()):Coroutine::id(); $cid = Context::get('log_trace_request_id');
if(!$cid) throw new \Exception('无法使用协程标记录日志'); if(!$cid) return false;
return container(Server::class)->stats()['start_time'] .$workId. $cid; return container(Server::class)->stats()['start_time'] .$workId. $cid;
} }
/** /**
* 程序执行完成标记结束 * 程序执行完成标记结束
* @throws \Exception
*/ */
public static function markComplete() public static function markComplete()
{ {
...@@ -93,9 +93,8 @@ class LogTraceHandler ...@@ -93,9 +93,8 @@ class LogTraceHandler
* 待写到Es后可以避免 * 待写到Es后可以避免
* 记录当前日志(包括异常捕获) * 记录当前日志(包括异常捕获)
*/ */
public static function recordProcess($track,$isInAsyncCoroutine=false) public static function recordProcess($track,$isInAsyncCoroutine=false,$requestId=0)
{ {
if (empty($track)) return;
if(!Coroutine::inCoroutine()) return; if(!Coroutine::inCoroutine()) return;
$logInfo = ''; $logInfo = '';
...@@ -104,18 +103,17 @@ class LogTraceHandler ...@@ -104,18 +103,17 @@ class LogTraceHandler
$track->getFile() . " line:" . $track->getFile() . " line:" .
$track->getLine() . "\n" . $track->getLine() . "\n" .
$track->getTraceAsString(); $track->getTraceAsString();
} }else if (is_string($track)||is_numeric($track)) {
if (is_array($track)) {
$logInfo = var_export($track, true);
}
if (is_string($track)||is_numeric($track)) {
$logInfo = $track; $logInfo = $track;
}else{
$logInfo = print_r($track, true);
} }
$logInfo .= "\n\n"; $logInfo .= "\n\n";
if(!self::getRequestId()) return ;
container(LogTraceQueue::class)->addToQueue([ container(LogTraceQueue::class)->addToQueue([
'request_id'=>self::getRequestId($isInAsyncCoroutine), 'request_id'=>self::getRequestId(),
'process_info'=>$logInfo 'process_info'=>$logInfo
]); ]);
// $log = LogTrace::where('request_id', self::getRequestId())->first(); // $log = LogTrace::where('request_id', self::getRequestId())->first();
......
...@@ -5,14 +5,38 @@ namespace Meibuyu\Micro\Handler\LogTrace; ...@@ -5,14 +5,38 @@ namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Handler\RedisQueueBatchHandler; use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace; use Meibuyu\Micro\Model\LogTrace;
use Hyperf\DbConnection\Db;
use Hyperf\Redis\Redis;
class LogTraceQueue extends RedisQueueBatchHandler class LogTraceQueue extends RedisQueueBatchHandler
{ {
protected function specifyQueueName() protected function specifyQueueName()
{ {
$this->queue_name = env('APP_NAME').':LogTraceQueue'; $this->createLogTraceTable();
return env('APP_NAME').':LogTraceQueue';
}
protected function specifyRedisServer():Redis
{
return redis();
}
private function createLogTraceTable()
{
Db::insert("
create table IF NOT EXISTS `trace_logs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`request_id` varchar(32) NOT NULL COMMENT '一次http或rpc请求调用的唯一key',
`source` varchar(255) NOT NULL COMMENT '来源,包含调用类命名空间及方法',
`origin_params` json NOT NULL COMMENT '记录注解方法被调用开始的原始传参',
`is_completed` tinyint(1) NOT NULL DEFAULT '0' COMMENT '此请求是否完成,使用LogTraceHandler::markComplete()标记',
`process_info` mediumtext NOT NULL COMMENT '执行过程中输出,使用LogTraceHandler::recordProcess()记录',
`created_at` datetime NOT NULL COMMENT '日志记录开始时间',
PRIMARY KEY (`id`),
UNIQUE KEY `request_id` (`request_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5950 DEFAULT CHARSET=utf8mb4;
");
} }
/** /**
...@@ -57,8 +81,10 @@ class LogTraceQueue extends RedisQueueBatchHandler ...@@ -57,8 +81,10 @@ class LogTraceQueue extends RedisQueueBatchHandler
if(!empty($updateMarkComplete)){ if(!empty($updateMarkComplete)){
LogTrace::whereIn('request_id',$updateMarkComplete)->update(['is_completed'=>YES]); LogTrace::whereIn('request_id',$updateMarkComplete)->update(['is_completed'=>YES]);
} }
if(!empty($updateArr)){
LogTrace::getModel()->batchUpdateByField(array_values($updateArr),'request_id'); LogTrace::getModel()->batchUpdateByField(array_values($updateArr),'request_id');
} }
}
} }
\ No newline at end of file
...@@ -2,21 +2,25 @@ ...@@ -2,21 +2,25 @@
namespace Meibuyu\Micro\Handler; namespace Meibuyu\Micro\Handler;
use Hyperf\Redis\Redis;
abstract class RedisQueueBatchHandler abstract class RedisQueueBatchHandler
{ {
/** /**
* 重试次数 * 重试次数
*/ */
protected $retrys=0; protected $retry=0;
protected $queue_name; protected $queue_name;
/**
* @var Redis
*/
protected $redis;
const MAX_RETRY_TIMES = 3; const MAX_RETRY_TIMES = 3;
const BATCH_DEAL_NUM = 100; const BATCH_DEAL_NUM = 36;
const ERROR_RETRY_CODE = 9000; const ERROR_RETRY_CODE = 9000;
...@@ -25,53 +29,51 @@ abstract class RedisQueueBatchHandler ...@@ -25,53 +29,51 @@ abstract class RedisQueueBatchHandler
*/ */
public function __construct() public function __construct()
{ {
$this->specifyQueueName(); $this->queue_name = $this->specifyQueueName();
$this->redis = $this->specifyRedisServer();
} }
/** /**
* 添加至队列 * @param $data
* @param $specifyQueue
*/ */
public function addToQueue($data) public function addToQueue($data,$specifyQueue='')
{ {
redis()->rPush($this->queue_name,json_encode($data)); $this->redis->rPush($specifyQueue?:$this->queue_name,json_encode($data));
} }
/** /**
* 通过管道将执行失败的数据重回队列 * 将执行失败的数据重回队列
* @param $arr * @param $arr
* @param \Hyperf\Redis\Redis $redis
*/ */
protected function backToQueue($arr,\Hyperf\Redis\Redis $redis) protected function backToQueue($arr)
{ {
//开启管道 array_unshift($arr,$this->queue_name);
$pip = $redis->multi(); call_user_func_array([$this->redis,'lPush'],$arr);
foreach ($arr as $i){
$pip->lPush($this->queue_name,$i);
}
//批量提交
$pip->exec();
} }
//批处理具体逻辑 //批处理具体逻辑
abstract protected function batchDeal($data); abstract protected function batchDeal($data);
abstract protected function specifyQueueName(); abstract protected function specifyQueueName();
abstract protected function specifyRedisServer() : Redis;
public function consume() public function consume()
{ {
$redis = redis();
while (true){ while (true){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃 //有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程 //执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist = $redis->blPop($this->queue_name,30); $exist = $this->redis->blPop($this->queue_name,30);
if(empty($exist)) continue ; if(empty($exist)) continue ;
$redis->lPush($this->queue_name,$exist[1]); $this->redis->lPush($this->queue_name,$exist[1]);
//每次从列表取100 //每次从列表取100
$arr = $redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1); $arr = $this->redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1);
//取完 从队列删掉 //取完 从队列删掉
$redis->lTrim($this->queue_name,count($arr),-1); $this->redis->lTrim($this->queue_name,count($arr),-1);
//数据格式化 //数据格式化
$formatArr = array_map(function ($item){ $formatArr = array_map(function ($item){
return json_decode($item,true); return json_decode($item,true);
...@@ -82,17 +84,18 @@ abstract class RedisQueueBatchHandler ...@@ -82,17 +84,18 @@ abstract class RedisQueueBatchHandler
}catch (\Throwable $exception){ }catch (\Throwable $exception){
//错误码为100 通过管道从新推到队列 //错误码为100 新推到队列
if($exception->getCode()==self::ERROR_RETRY_CODE){ if($exception->getCode()==self::ERROR_RETRY_CODE){
if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次 if($this->retry<self::MAX_RETRY_TIMES){ //重试次数不超过3次
$this->backToQueue($arr,$redis); $this->backToQueue($arr);
$this->retrys++; $this->retry++;
}else{ }else{
$this->retrys = 0; //重置当前次数 $this->retry = 0; //重置当前次数
$this->errorWriteToFile($formatArr,$exception); $this->errorWriteToFile($formatArr,$exception);
} }
}else{ }else{
$this->retry = 0;
$this->errorWriteToFile($formatArr,$exception); $this->errorWriteToFile($formatArr,$exception);
} }
......
...@@ -74,7 +74,7 @@ abstract class BaseModel extends Model ...@@ -74,7 +74,7 @@ abstract class BaseModel extends Model
$where_str = implode(',', $where_arr); $where_str = implode(',', $where_arr);
$sql = ""; $sql = "";
if ($keys_str && $datas_str && $where_str) { if ($keys_str && $datas_str && $where_str) {
$table = $this->getTable(); $table = env('DB_PREFIX','').$this->getTable();
$sql = " INSERT INTO $table ( $keys_str ) VALUES $datas_str ON DUPLICATE KEY UPDATE $where_str"; $sql = " INSERT INTO $table ( $keys_str ) VALUES $datas_str ON DUPLICATE KEY UPDATE $where_str";
} }
unset($keys_str, $where_str); unset($keys_str, $where_str);
...@@ -125,7 +125,7 @@ abstract class BaseModel extends Model ...@@ -125,7 +125,7 @@ abstract class BaseModel extends Model
$sql = sprintf( $sql = sprintf(
"UPDATE `%s` SET %s WHERE `%s` IN (%s) ", "UPDATE `%s` SET %s WHERE `%s` IN (%s) ",
$this->getTable(), $updates, $field, $fields env('DB_PREFIX','').$this->getTable(), $updates, $field, $fields
); );
return $this->getConnection()->update($sql); return $this->getConnection()->update($sql);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment