Commit 65ba2ba9 authored by Liu lu's avatar Liu lu


parent 478e0d4c
* 异步协程处理
namespace Meibuyu\Micro\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
* @Annotation
* @Target("METHOD")
class AsyncCoroutine extends AbstractAnnotation
public function collectMethod(string $className, ?string $target): void
parent::collectMethod($className, $target); // TODO: Change the autogenerated stub
\ No newline at end of file
* 日志追踪 写队列 批量更新到数据库
namespace Meibuyu\Micro\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
* @Annotation
* @Target("METHOD")
class LogTrace extends AbstractAnnotation
public function collectMethod(string $className, ?string $target): void
parent::collectMethod($className, $target); // TODO: Change the autogenerated stub
\ No newline at end of file
namespace Meibuyu\Micro\Aspect;
use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler;
use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Meibuyu\Micro\Annotation\AsyncCoroutine;
use Hyperf\Utils\Coroutine;
* @Aspect(
* annotations={
* AsyncCoroutine::class
* }
* )
class AsyncCoroutineAspect extends AbstractAspect
* 优先级
* @var int
public $priority = 998;
public function process(ProceedingJoinPoint $proceedingJoinPoint)
// 切面切入后,执行对应的方法会由此来负责
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// 在调用前进行某些处理
return Coroutine::create(function ()use($proceedingJoinPoint){
.' ,类:'.$proceedingJoinPoint->className
.' ,方法:'.$proceedingJoinPoint->methodName
$result = $proceedingJoinPoint->process();
\ No newline at end of file
namespace Meibuyu\Micro\Aspect;
use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Meibuyu\Micro\Annotation\LogTrace;
use Meibuyu\Micro\Handler\LogTrace\LogTraceHandler;
* @Aspect(
* annotations={
* LogTrace::class
* }
* )
class LogTraceAspect extends AbstractAspect
* 优先级
* @var int
public $priority = 999;
public function process(ProceedingJoinPoint $proceedingJoinPoint)
return $proceedingJoinPoint->process();
\ No newline at end of file
* 执行日志记录
namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Model\LogTrace;
use Hyperf\Utils\Coroutine;
use Swoole\Server;
use Throwable;
* Class LogTraceHandler
* @package App\Service
class LogTraceHandler
* 1.对执行操作进行的方法入口注解 LogTrace
try {
//流程3 抛出一个异常
throw new Exception('test111');
}catch (\Throwable $exception){
* @param $params
* @param $source
* @return mixed
* @throws \Exception
public static function createLogTrace($source, $params)
if(!Coroutine::inCoroutine()) return;
'request_id' => self::getRequestId(),
'origin_params' => json_encode($params),
'source' => $source,
'created_at' => now(),
'process_info' => ''
* @param bool $isInAsyncCoroutine
* @return string
* @throws \Exception
private static function getRequestId($isInAsyncCoroutine=false)
$workId = posix_getpid();
$cid = $isInAsyncCoroutine?Coroutine::parentId(Coroutine::id()):Coroutine::id();
if(!$cid) throw new \Exception('无法使用协程标记录日志');
return container(Server::class)->stats()['start_time'] .$workId. $cid;
* 程序执行完成标记结束
* @throws \Exception
public static function markComplete()
if(!Coroutine::inCoroutine()) return;
//LogTrace::where('request_id', self::getRequestId())->update(['is_completed' => YES]);
* 事务回滚导致部分流程日志无法记录 暂写到文件里
* 待写到Es后可以避免
* 记录当前日志(包括异常捕获)
public static function recordProcess($track,$isInAsyncCoroutine=false)
if (empty($track)) return;
if(!Coroutine::inCoroutine()) return;
$logInfo = '';
if ($track instanceof Throwable) {
$logInfo = $track->getMessage() . "\n" .
$track->getFile() . " line:" .
$track->getLine() . "\n" .
if (is_array($track)) {
$logInfo = var_export($track, true);
if (is_string($track)||is_numeric($track)) {
$logInfo = $track;
$logInfo .= "\n\n";
// $log = LogTrace::where('request_id', self::getRequestId())->first();
// if(empty($log)) return ;
// $log->update([
// 'process_info' => Db::raw("CONCAT(process_info,\"{$logInfo}\")")
// ]);
// //写入文件
// put_log(
// self::getRequestId()."\n".
// $logInfo,
// str_replace('\\','_',$log->source).'/'.today()
// );
// /**
// * 记录流程日志
// * @param $funName
// * @param $arguments
// * @throws \Exception
// */
// public static function __callStatic($funName, $arguments)
// {
// if(self::$instance){
// throw new \Exception('请用LogTraceService::createLogTrace 先实例化对象');
// }
// self::$instance->$funName($arguments);
// }
\ No newline at end of file
namespace Meibuyu\Micro\Handler\LogTrace;
use Meibuyu\Micro\Handler\RedisQueueBatchHandler;
use Meibuyu\Micro\Model\LogTrace;
class LogTraceQueue extends RedisQueueBatchHandler
protected function specifyQueueName()
$this->queue_name = 'LogTraceQueue';
* 具体批处理逻辑
* @param $dataArr
function batchDeal($dataArr)
$updateArr = [];
foreach ($dataArr as $arr){
$updateArr[$arr['request_id']]['request_id'] = $arr['request_id'];
$updateArr[$arr['request_id']]['is_completed'] = $arr['is_completed'];
$process_info = isset($updateArr[$arr['request_id']]['process_info'])?
$updateArr[$arr['request_id']]['process_info'] = $process_info.$arr['process_info'];
//执行前先查一下 原先有的记录 避免被覆盖
$originLogs = LogTrace::whereIn('request_id',array_column($dataArr,'request_id'))
$updateArr = array_map(function ($item)use($originLogs){
if(!isset($originLogs[$item['request_id']])) return $item;
if(empty($originLogs[$item['request_id']])) return $item;
$item['process_info'] = $originLogs[$item['request_id']]."\n\n".$item['process_info'];
return $item;
\ No newline at end of file
namespace Meibuyu\Micro\Handler;
abstract class RedisQueueBatchHandler
* 重试次数
protected $retrys=0;
protected $queue_name;
const MAX_RETRY_TIMES = 3;
const BATCH_DEAL_NUM = 100;
const ERROR_RETRY_CODE = 9000;
* RedisQueueBatchHandler constructor.
public function __construct()
* 添加至队列
public function addToQueue($data)
* 通过管道将执行失败的数据重回队列
* @param $arr
* @param \Hyperf\Redis\Redis $redis
protected function backToQueue($arr,\Hyperf\Redis\Redis $redis)
$pip = $redis->multi();
foreach ($arr as $i){
abstract protected function batchDeal($data);
abstract protected function specifyQueueName();
public function consume()
$redis = redis();
while (true){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist = $redis->blPop($this->queue_name,30);
if(empty($exist)) continue ;
$arr = $redis->lRange($this->queue_name,0,self::BATCH_DEAL_NUM-1);
//取完 从队列删掉
$formatArr = array_map(function ($item){
return json_decode($item,true);
try {
}catch (\Throwable $exception){
//错误码为100 通过管道从新推到队列
if($this->retrys<self::MAX_RETRY_TIMES){ //重试次数不超过3次
$this->retrys = 0; //重置当前次数
* @param $data
* @param \Throwable $exception
private function errorWriteToFile($data, \Throwable $exception)
$exception->getFile().' line:'.
\ No newline at end of file
namespace Meibuyu\Micro\Model;
use Hyperf\DbConnection\Model\Model ;
abstract class BaseModel extends Model
* Function addDataToMysql
* 批量插入数据到数据库,无则插入,重复则更新
public function batchUpdateOrCreateByUniqueKey($data)
$buildInsertBatchSqlStr = $this->buildInsertBatchSqlStr($data);
$sql = $buildInsertBatchSqlStr['sql'];
return $this->getConnection()->update($sql);
* Function buildInsertBatchSqlStr
* 组装mysql
private function buildInsertBatchSqlStr($data)
//从data中 获取更新的字段
if (empty($data)) {
return false;
// 兼容一维数组
// 大于2维返回false
return false;
$keys_arr = [];
$datas_arr = [];
$where_arr = [];
foreach ($new_data as $k => $v) {
if (!$keys_arr) {
$keys_arr = array_keys($v);
foreach ($v as $k2=>&$v2){
$onedata = "( " . implode(',', $v) . " )";
$datas_arr[] = $onedata;
// 组装格式 pt_uid=VALUES(pt_uid),
foreach ($keys_arr as $k2 => $v2) {
$where_arr[] = "$v2 = VALUES($v2)";
$keys_str = implode(',', $keys_arr);
$datas_str = implode(',', $datas_arr);
$where_str = implode(',', $where_arr);
$sql = "";
if ($keys_str && $datas_str && $where_str) {
$table = $this->getTable();
$sql = " INSERT INTO $table ( $keys_str ) VALUES $datas_str ON DUPLICATE KEY UPDATE $where_str";
unset($keys_str, $where_str);
return ['sql' => $sql, 'count' => count($datas_arr)];
* Function getmaxdim
* 数组维数
private function getmaxdim($vDim){
if(!is_array($vDim)) return 0;
else {
$max1 = 0;
foreach($vDim as $item1) {
$t1 = $this->getmaxdim($item1);
if( $t1 > $max1) $max1 = $t1;
return $max1 + 1;
* 批量更新数组 有则
* @param $data array 待更新的数据,二维数组格式
* @param string $field string 值不同的条件,默认为id
* @return bool|string
public function batchUpdateByField($data, $field = null)
if(is_null($field)) $field = $this->getKeyName();
if (!is_array($data) || !$field ) {
return false;
$updates = $this->parseUpdate($data, $field);
// 获取所有键名为$field列的值,值两边加上单引号,保存在$fields数组中
// array_column()函数需要PHP5.5.0+,如果小于这个版本,可以自己实现,
// 参考地址:
$fields = array_column($data, $field);
$fields = implode(',', array_map(function($value) {
return "'".$value."'";
}, $fields));
$sql = sprintf(
"UPDATE `%s` SET %s WHERE `%s` IN (%s) ",
$this->getTable(), $updates, $field, $fields
return $this->getConnection()->update($sql);
* 将二维数组转换成CASE WHEN THEN的批量更新条件
* @param $data array 二维数组
* @param $field string 列名
* @return string sql语句
private function parseUpdate($data, $field)
$sql = '';
$keys = array_keys(current($data));
foreach ($keys as $column) {
if($column==$keys) continue;
$sql .= sprintf("`%s` = CASE `%s` \n", $column, $field);
foreach ($data as $line) {
$sql .= sprintf(
"WHEN '%s' THEN '%s' \n",
$sql .= "END,";
return rtrim($sql, ',');
* Created by PhpStorm.
* User: Auto generated.
* Date: 2022-01-20
* Time: 10:08:03
* Description:
declare (strict_types=1);
namespace Meibuyu\Micro\Model;
* 模型类 LogTrace
* @package App\Model
* @property integer $id
* @property string $source 路径
* @property string $origin_params
* @property integer $is_completed
* @property string $created_at
* @property string $process_info
class LogTrace extends BaseModel
protected $table = 'trace_logs';
* 是否使用时间戳管理
* @var bool
public $timestamps = false;
* 可写入数据的字段.
* @var array
protected $fillable = [
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment