Commit 937681f1 authored by Liu lu's avatar Liu lu

异步协程与异步日志批处理

parent bb7cfa5a
<?php
/**
* 异步协程处理
*/
namespace Meibuyu\Micro\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
/**
* @Annotation
* @Target("METHOD")
*/
class AsyncCoroutine extends AbstractAnnotation
{
public function collectMethod(string $className, ?string $target): void
{
parent::collectMethod($className, $target); // TODO: Change the autogenerated stub
}
}
\ No newline at end of file
<?php
/**
* 日志追踪 写队列 批量更新到数据库
*/
namespace Meibuyu\Micro\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
/**
* @Annotation
* @Target("METHOD")
*/
class LogTrace extends AbstractAnnotation
{
public function collectMethod(string $className, ?string $target): void
{
parent::collectMethod($className, $target); // TODO: Change the autogenerated stub
}
}
\ No newline at end of file
<?php
namespace Meibuyu\Micro\Aspect;
use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler;
use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Meibuyu\Micro\Annotation\AsyncCoroutine;
use Hyperf\Utils\Coroutine;
/**
* @Aspect(
* annotations={
* AsyncCoroutine::class
* }
* )
*/
class AsyncCoroutineAspect extends AbstractAspect
{
/**
* 优先级
* @var int
*/
public $priority = 998;
public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
// 切面切入后,执行对应的方法会由此来负责
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// 在调用前进行某些处理
return Coroutine::create(function ()use($proceedingJoinPoint){
LogTraceHandler::recordProcess(
'投递到子协程任务,id:'.Coroutine::id()
.' ,类:'.$proceedingJoinPoint->className
.' ,方法:'.$proceedingJoinPoint->methodName
.' ,参数:'.json_encode($proceedingJoinPoint->getArguments())
,
true
);
$result = $proceedingJoinPoint->process();
LogTraceHandler::recordProcess(
'子协程任务id:'.Coroutine::id().'已完成,执行结果:'.
json_encode($result),true
);
});
}
}
\ No newline at end of file
<?php
namespace Meibuyu\Micro\Aspect;
use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Meibuyu\Micro\Annotation\LogTrace;
use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler;
/**
* @Aspect(
* annotations={
* LogTrace::class
* }
* )
*/
class LogTraceAspect extends AbstractAspect
{
/**
* 优先级
* @var int
*/
public $priority = 999;
public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
LogTraceHandler::createLogTrace(
$proceedingJoinPoint->className.'@'.$proceedingJoinPoint->methodName,
$proceedingJoinPoint->getArguments()
);
$result = $proceedingJoinPoint->process();
LogTraceHandler::recordProcess('返回结果:'.json_encode($result));
return $result;
}
}
\ No newline at end of file
<?php
/**
* 执行日志记录
*/
namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Model\LogTrace;
use Hyperf\Utils\Coroutine;
use Swoole\Server;
use Throwable;
/**
* Class LogTraceHandler
* @package App\Service
*/
class LogTraceHandler
{
/**
* 1.对执行操作进行的方法入口注解 LogTrace
2.对程序主动进行输出
try {
//流程1
LogTraceHandler::recordProcess('执行到流程1');
//流程2
LogTraceHandler::recordProcess('执行到流程2');
//记录输出数组
LogTraceHandler::recordProcess(['test'=>1]);
//流程3 抛出一个异常
throw new Exception('test111');
//流程执行完成标记结束
LogTraceHandler::markComplete();
}catch (\Throwable $exception){
//记录异常日志
LogTraceHandler::recordProcess($exception);
}
* @param $params
* @param $source
* @return mixed
* @throws \Exception
*/
public static function createLogTrace($source, $params)
{
if(!Coroutine::inCoroutine()) return;
LogTrace::insertOrIgnore([
'request_id' => self::getRequestId(),
'origin_params' => json_encode($params),
'source' => $source,
'created_at' => now(),
'process_info' => ''
]);
}
/**
* @param bool $isInAsyncCoroutine
* @return string
* @throws \Exception
*/
private static function getRequestId($isInAsyncCoroutine=false)
{
$workId = posix_getpid();
$cid = $isInAsyncCoroutine?Coroutine::parentId(Coroutine::id()):Coroutine::id();
if(!$cid) throw new \Exception('无法使用协程标记录日志');
return container(Server::class)->stats()['start_time'] .$workId. $cid;
}
/**
* 程序执行完成标记结束
* @throws \Exception
*/
public static function markComplete()
{
if(!Coroutine::inCoroutine()) return;
container(LogTraceQueue::class)->addToQueue([
'request_id'=>self::getRequestId(),
'process_info'=>'',
'is_completed'=>YES
]);
//LogTrace::where('request_id', self::getRequestId())->update(['is_completed' => YES]);
}
/*
* 事务回滚导致部分流程日志无法记录 暂写到文件里
* 待写到Es后可以避免
* 记录当前日志(包括异常捕获)
*/
public static function recordProcess($track,$isInAsyncCoroutine=false)
{
if (empty($track)) return;
if(!Coroutine::inCoroutine()) return;
$logInfo = '';
if ($track instanceof Throwable) {
$logInfo = $track->getMessage() . "\n" .
$track->getFile() . " line:" .
$track->getLine() . "\n" .
$track->getTraceAsString();
}
if (is_array($track)) {
$logInfo = var_export($track, true);
}
if (is_string($track)||is_numeric($track)) {
$logInfo = $track;
}
$logInfo .= "\n\n";
container(LogTraceQueue::class)->addToQueue([
'request_id'=>self::getRequestId($isInAsyncCoroutine),
'process_info'=>$logInfo,
'is_completed'=>NO
]);
// $log = LogTrace::where('request_id', self::getRequestId())->first();
// if(empty($log)) return ;
//
// $log->update([
// 'process_info' => Db::raw("CONCAT(process_info,\"{$logInfo}\")")
// ]);
// //写入文件
// put_log(
// self::getRequestId()."\n".
// $logInfo,
// str_replace('\\','_',$log->source).'/'.today()
// );
}
// /**
// * 记录流程日志
// * @param $funName
// * @param $arguments
// * @throws \Exception
// */
// public static function __callStatic($funName, $arguments)
// {
// if(self::$instance){
// throw new \Exception('请用LogTraceService::createLogTrace 先实例化对象');
// }
// self::$instance->$funName($arguments);
// }
}
\ No newline at end of file
<?php
namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace;
class LogTraceQueue extends RedisQueueBatchHandler
{
protected function specifyQueueName()
{
$this->queue_name = env('APP_NAME').':LogTraceQueue';
}
/**
* 具体批处理逻辑
* @param $dataArr
*/
function batchDeal($dataArr)
{
$updateArr = [];
foreach ($dataArr as $arr){
$updateArr[$arr['request_id']]['request_id'] = $arr['request_id'];
$updateArr[$arr['request_id']]['is_completed'] = $arr['is_completed'];
$process_info = isset($updateArr[$arr['request_id']]['process_info'])?
$updateArr[$arr['request_id']]['process_info']:'';
$updateArr[$arr['request_id']]['process_info'] = $process_info.$arr['process_info'];
}
//执行前先查一下 原先有的记录 避免被覆盖
$originLogs = LogTrace::whereIn('request_id',array_column($dataArr,'request_id'))
->pluck('process_info','request_id')->toArray();
//追加到原记录之后
$updateArr = array_map(function ($item)use($originLogs){
if(!isset($originLogs[$item['request_id']])) return $item;
if(empty($originLogs[$item['request_id']])) return $item;
$item['process_info'] = $originLogs[$item['request_id']].$item['process_info'];
return $item;
},$updateArr);
LogTrace::getModel()->batchUpdateByField(array_values($updateArr),'request_id');
}
}
\ No newline at end of file
<?php
namespace Meibuyu\Micro\Handler;
abstract class RedisQueueBatchHandler
{
/**
* 重试次数
*/
protected $retrys=0;
protected $queue_name;
const MAX_RETRY_TIMES = 3;
const BATCH_DEAL_NUM = 100;
const ERROR_RETRY_CODE = 9000;
/**
* RedisQueueBatchHandler constructor.
*/
public function __construct()
{
$this->specifyQueueName();
}
/**
* 添加至队列
*/
public function addToQueue($data)
{
redis()->rPush($this->queue_name,json_encode($data));
}
/**
* 通过管道将执行失败的数据重回队列
* @param $arr
* @param \Hyperf\Redis\Redis $redis
*/
protected function backToQueue($arr,\Hyperf\Redis\Redis $redis)
{
//开启管道
$pip = $redis->multi();
foreach ($arr as $i){
$pip->lPush($this->queue_name,$i);
}
//批量提交
$pip->exec();
}
//批处理具体逻辑
abstract protected function batchDeal($data);
abstract protected function specifyQueueName();
public function consume()
{
$redis = redis();
while (true){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist = $redis->blPop($this->queue_name,30);
if(empty($exist)) continue ;
$redis->lPush($this->queue_name,$exist[1]);
//每次从列表取100
$arr = $redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1);
//取完 从队列删掉
$redis->lTrim($this->queue_name,count($arr),-1);
//数据格式化
$formatArr = array_map(function ($item){
return json_decode($item,true);
},$arr);
try {
//具体批处理逻辑
$this->batchDeal($formatArr);
}catch (\Throwable $exception){
//错误码为100 通过管道从新推到队列
if($exception->getCode()==self::ERROR_RETRY_CODE){
if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次
$this->backToQueue($arr,$redis);
$this->retrys++;
}else{
$this->retrys = 0; //重置当前次数
$this->errorWriteToFile($formatArr,$exception);
}
}else{
$this->errorWriteToFile($formatArr,$exception);
}
}
}
}
/**
* @param $data
* @param \Throwable $exception
*/
private function errorWriteToFile($data, \Throwable $exception)
{
put_log(
json_encode($data)."\n".
$exception->getMessage()."\n".
$exception->getFile().' line:'.
$exception->getLine()."\n".
$exception->getTraceAsString()
,'RedisQueue/'.$this->queue_name.'/'.today()
);
}
}
\ No newline at end of file
<?php
declare(strict_types=1);
namespace Meibuyu\Micro\Model;
use Hyperf\DbConnection\Model\Model ;
abstract class BaseModel extends Model
{
/**
* Function addDataToMysql
* 批量插入数据到数据库,无则插入,重复则更新
*/
public function batchUpdateOrCreateByUniqueKey($data)
{
$buildInsertBatchSqlStr = $this->buildInsertBatchSqlStr($data);
$sql = $buildInsertBatchSqlStr['sql'];
return $this->getConnection()->update($sql);
}
/**
* Function buildInsertBatchSqlStr
* 组装mysql
*/
private function buildInsertBatchSqlStr($data)
{
//从data中 获取更新的字段
if (empty($data)) {
return false;
}
$new_data=[];
$maxdim=$this->getmaxdim($data);
if($maxdim==1){
// 兼容一维数组
$new_data[]=$data;
}elseif($maxdim>2){
// 大于2维返回false
return false;
}else{
$new_data=$data;
}
$keys_arr = [];
$datas_arr = [];
$where_arr = [];
foreach ($new_data as $k => $v) {
if (!$keys_arr) {
$keys_arr = array_keys($v);
}
foreach ($v as $k2=>&$v2){
if(is_string($v2)){
$v2="'".addslashes($v2)."'";
}
}
//数据重组
$onedata = "( " . implode(',', $v) . " )";
$datas_arr[] = $onedata;
unset($onedata);
}
// 组装格式 pt_uid=VALUES(pt_uid),
foreach ($keys_arr as $k2 => $v2) {
$where_arr[] = "$v2 = VALUES($v2)";
}
$keys_str = implode(',', $keys_arr);
$datas_str = implode(',', $datas_arr);
$where_str = implode(',', $where_arr);
$sql = "";
if ($keys_str && $datas_str && $where_str) {
$table = $this->getTable();
$sql = " INSERT INTO $table ( $keys_str ) VALUES $datas_str ON DUPLICATE KEY UPDATE $where_str";
}
unset($keys_str, $where_str);
return ['sql' => $sql, 'count' => count($datas_arr)];
}
/**
* Function getmaxdim
* 数组维数
*/
private function getmaxdim($vDim){
if(!is_array($vDim)) return 0;
else {
$max1 = 0;
foreach($vDim as $item1) {
$t1 = $this->getmaxdim($item1);
if( $t1 > $max1) $max1 = $t1;
}
return $max1 + 1;
}
}
/**
* 批量更新数组 有则
* @param $data array 待更新的数据,二维数组格式
* @param string $field string 值不同的条件,默认为id
* @return bool|string
*/
public function batchUpdateByField($data, $field = null)
{
if(is_null($field)) $field = $this->getKeyName();
if (!is_array($data) || !$field ) {
return false;
}
$updates = $this->parseUpdate($data, $field);
// 获取所有键名为$field列的值,值两边加上单引号,保存在$fields数组中
// array_column()函数需要PHP5.5.0+,如果小于这个版本,可以自己实现,
// 参考地址:http://php.net/manual/zh/function.array-column.php#118831
$fields = array_column($data, $field);
$fields = implode(',', array_map(function($value) {
return "'".$value."'";
}, $fields));
$sql = sprintf(
"UPDATE `%s` SET %s WHERE `%s` IN (%s) ",
$this->getTable(), $updates, $field, $fields
);
return $this->getConnection()->update($sql);
}
/**
* 将二维数组转换成CASE WHEN THEN的批量更新条件
* @param $data array 二维数组
* @param $field string 列名
* @return string sql语句
*/
private function parseUpdate($data, $field)
{
$sql = '';
$keys = array_keys(current($data));
foreach ($keys as $column) {
if($column==$keys) continue;
$sql .= sprintf("`%s` = CASE `%s` \n", $column, $field);
foreach ($data as $line) {
$sql .= sprintf(
"WHEN '%s' THEN '%s' \n",
$line[$field],
!is_numeric($line[$column])?addslashes($line[$column]):$line[$column],
);
}
$sql .= "END,";
}
return rtrim($sql, ',');
}
}
<?php
/**
* Created by PhpStorm.
* User: Auto generated.
* Date: 2022-01-20
* Time: 10:08:03
* Description:
*/
declare (strict_types=1);
namespace Meibuyu\Micro\Model;
/**
* 模型类 LogTrace
* @package App\Model
* @property integer $id
* @property string $source 路径
* @property string $origin_params
* @property integer $is_completed
* @property string $created_at
* @property string $process_info
*/
class LogTrace extends BaseModel
{
protected $table = 'trace_logs';
/**
* 是否使用时间戳管理
* @var bool
*/
public $timestamps = false;
/**
* 可写入数据的字段.
* @var array
*/
protected $fillable = [
'source',
'origin_params',
'is_completed',
'process_info',
];
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment