Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
meibuyu-micro
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
without authentication
meibuyu-micro
Commits
af89a3c0
Commit
af89a3c0
authored
Feb 21, 2022
by
王源
🎧
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev_async_log1' into 2.0
parents
33a70faf
7574a3c7
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
922 additions
and
0 deletions
+922
-0
README.md
README.md
+237
-0
AsyncCoroutine.php
src/Annotation/AsyncCoroutine.php
+22
-0
LogTrace.php
src/Annotation/LogTrace.php
+21
-0
AsyncCoroutineAspect.php
src/Aspect/AsyncCoroutineAspect.php
+52
-0
LogTraceAspect.php
src/Aspect/LogTraceAspect.php
+42
-0
LogTraceHandler.php
src/Handler/LogTrace/LogTraceHandler.php
+152
-0
LogTraceQueue.php
src/Handler/LogTrace/LogTraceQueue.php
+64
-0
RedisQueueBatchHandler.php
src/Handler/RedisQueueBatchHandler.php
+123
-0
BaseModel.php
src/Model/BaseModel.php
+160
-0
LogTrace.php
src/Model/LogTrace.php
+49
-0
No files found.
README.md
View file @
af89a3c0
...
@@ -153,3 +153,240 @@ human_time(time()-strtotime('2020-06-06 12:12'));
...
@@ -153,3 +153,240 @@ human_time(time()-strtotime('2020-06-06 12:12'));
info('aaa',[1,2,3],new stdClass(){$a=1;},collect([1,23,4]));
info('aaa',[1,2,3],new stdClass(){$a=1;},collect([1,23,4]));
info(1);
info(1);
```
```
### 5、数据表批量操作
用法:
继承
\M
eibuyu
\M
icro
\M
odel
\B
aseModel 的模型:
```
class LogTrace extends BaseModel
{
protected $table = 'trace_logs';
/**
* 是否使用时间戳管理
* @var bool
*/
public $timestamps = false;
/**
* 可写入数据的字段.
* @var array
*/
protected $fillable = [
'source',
'origin_params',
'is_completed',
'process_info',
];
}
```
得到基于主键或唯一索引作为条件的俩个批处理方法:
```
//批量更新,、$data为二维数组,必须包含主键或唯一索引的数据
//参数二缺省为主键名,或唯一索引名
LogTrace::getModel()->batchUpdateByField($data,'request_id')
//基于ON DUPLICATE KEY UPDATE 批量更新或插入 $data 必须包含主键或唯一索引的数据
LogTrace::getModel()->batchUpdateOrCreateByUniqueKey($data);
```
### 6、基于@LogTrace()注解,实现异步日志队列服务
用法:
#### 1)、建立日志跟踪表
```
sql
CREATE
TABLE
`trace_logs`
(
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
,
`request_id`
varchar
(
32
)
COLLATE
utf8mb4_unicode_ci
NOT
NULL
COMMENT
'一次http或rpc请求调用的唯一key'
,
`source`
varchar
(
255
)
COLLATE
utf8mb4_unicode_ci
NOT
NULL
COMMENT
'来源,包含调用类命名空间及方法'
,
`origin_params`
json
NOT
NULL
COMMENT
'记录注解方法被调用开始的原始传参'
,
`is_completed`
tinyint
(
1
)
NOT
NULL
DEFAULT
'0'
COMMENT
'此请求是否完成,使用LogTraceHandler::markComplete()标记'
,
`process_info`
mediumtext
COLLATE
utf8mb4_unicode_ci
NOT
NULL
COMMENT
'执行过程中输出,使用LogTraceHandler::recordProcess()记录'
,
`created_at`
datetime
NOT
NULL
COMMENT
'日志记录开始时间'
,
PRIMARY
KEY
(
`id`
),
UNIQUE
KEY
`request_id`
(
`request_id`
)
)
ENGINE
=
InnoDB
AUTO_INCREMENT
=
5868
DEFAULT
CHARSET
=
utf8mb4
COLLATE
=
utf8mb4_unicode_ci
;
```
#### 2)、定义消费进程,日志批量更新到数据库
```
php
<?php
/**
* 异步日志队列批处理
*/
namespace
App\Process
;
use
Hyperf\Process\AbstractProcess
;
use
Hyperf\Process\Annotation\Process
;
use
Meibuyu\Micro\Handler\LogTrace\LogTraceQueue
;
/**
* @Process(name="SyncTraceLog")
*/
class
SyncTraceLog
extends
AbstractProcess
{
/**
* 进程数量
* @var int
*/
public
$nums
=
1
;
/**
* 进程名称
* @var string
*/
public
$name
=
'syn-trace-log'
;
/**
* 管道类型
* @var int
*/
public
$pipeType
=
2
;
/**
* 是否启用协程
* @var bool
*/
public
$enableCoroutine
=
true
;
/**
* @inheritDoc
*/
public
function
handle
()
:
void
{
make
(
LogTraceQueue
::
class
)
->
consume
();
}
}
```
#### 3)、对操作方法指定注解,主动记录日志信息
```
给test方法加上 @LogTrace() 注解,从此处开始记录日志,可在此请求的任何流程地方
使用Meibuyu\Micro\Handler\LogTrace\LogTraceHandler::recordProcess
手动记录输出,下面只是最简单的示例
/**
* @LogTrace()
* @return string
* @throws \Exception
*/
public function test()
{
try {
//记录数组
LogTraceHandler::recordProcess($this->request->all());
//流程1
LogTraceHandler::recordProcess('执行到流程1');
//流程2
LogTraceHandler::recordProcess('执行到流程2');
//流程3 抛出一个异常
throw new Exception('test111');
//流程执行完成标记结束
LogTraceHandler::markComplete();
}catch (\Throwable $exception){
//记录异常日志
LogTraceHandler::recordProcess($exception);
}
return 'test222';
}
##执行过程输出到 trace_logs表 process_info:
array (
'scanNo' => 'SPUS-20211202-158-3',
)
执行到流程1
执行到流程2
抛出一个异常
/var/www/runtime/container/proxy/App_Controller_IndexController.proxy.php line:80
#0 /var/www/vendor/hyperf/di/src/Aop/ProceedingJoinPoint.php(84): App\Controller\IndexController->App\Controller\{closure}()
#1 /var/www/vendor/hyperf/di/src/Aop/ProxyTrait.php(85): Hyperf\Di\Aop\ProceedingJoinPoint->processOriginalMethod()
#2 /var/www/vendor/hyperf/utils/src/Pipeline.php(104): App\Controller\IndexController::Hyperf\Di\Aop\{closure}()
#3 /var/www/vendor/hyperf/di/src/Aop/ProceedingJoinPoint.php(69): Hyperf\Utils\Pipeline::Hyperf\Utils\{closure}()
#4 /var/www/vendor/meibuyu/micro/src/Aspect/LogTraceAspect.php(32): Hyperf\Di\Aop\ProceedingJoinPoint->process()
#5 /var/www/vendor/hyperf/di/src/Aop/Pipeline.php(30): Meibuyu\Micro\Aspect\LogTraceAspect->process()
#6 /var/www/vendor/hyperf/utils/src/Pipeline.php(95): Hyperf\Di\Aop\Pipeline->Hyperf\Di\Aop\{closure}()
#7 /var/www/vendor/hyperf/di/src/Aop/ProxyTrait.php(86): Hyperf\Utils\Pipeline->then()
#8 /var/www/vendor/hyperf/di/src/Aop/ProxyTrait.php(29): App\Controller\IndexController::handleAround()
#9 /var/www/runtime/container/proxy/App_Controller_IndexController.proxy.php(88): App\Controller\IndexController::__proxyCall()
#10 /var/www/vendor/hyperf/http-server/src/CoreMiddleware.php(161): App\Controller\IndexController->test1()
#11 /var/www/vendor/hyperf/http-server/src/CoreMiddleware.php(113): Hyperf\HttpServer\CoreMiddleware->handleFound()
#12 /var/www/vendor/hyperf/dispatcher/src/AbstractRequestHandler.php(64): Hyperf\HttpServer\CoreMiddleware->process()
#13 /var/www/vendor/hyperf/dispatcher/src/HttpRequestHandler.php(26): Hyperf\Dispatcher\AbstractRequestHandler->handleRequest()
#14 /var/www/vendor/hyperf/dispatcher/src/HttpDispatcher.php(40): Hyperf\Dispatcher\HttpRequestHandler->handle()
#15 /var/www/vendor/hyperf/http-server/src/Server.php(116): Hyperf\Dispatcher\HttpDispatcher->dispatch()
#16 {main}
返回结果:"test222"
```
####使用说明
> 1. 对方法加上 @LogTrace() 注解,建议注解的地方为http请求和rpc调用的入口处,便于使用脚本拿到原始传参便捷发起重试
> 2. 使用@LogTrace()注解的方法逻辑内任意地方使用LogTraceHandler::recordProcess记录输出(如须异步协程里也跟踪,第二参数须为true)
> 3. 日志跟踪数据都存放在trace_logs表,每一次请求或rpc调用都对应一条唯一记录,process_info字段按顺序记录了流程输出
### 7、基于@AsyncCoroutine()注解,对方法实现异步协程处理
对于耗时长,加快影响效率,可以使用写队列,消费处理。
也可以将这部分逻辑放到异步协程去处理,用法:
```
http 请求test1,调用 延迟5s的continueTest(该方法已加入AsyncCoroutine注解),
但接口马上返回结果。5s后,后台将continueTest方法逻辑执行输出到控制台或者写入日志
/**
* @LogTrace()
* @return array
* @throws \Exception
*/
public function test1()
{
//此处调用异步协程去处理,立刻返回结果
$this->continueTest($this->request->all());
return Coroutine::id();
}
/**
* 使用AsyncCoroutine注解,使该方法投递到子协程里执行
* @AsyncCoroutine()
*/
private function continueTest($params)
{
sleep(5); //睡眠5s
LogTraceHandler::recordProcess(Coroutine::id(),true);
LogTraceHandler::recordProcess(Coroutine::parentId(),true);
return Coroutine::id();
}
```
##### 使用须知
```
1. 给某个方法加上异步协程AsyncCoroutine注解,该方法被投放到另一个协程执行,
该方法的传参尽量不要使用模型对象(等连接资源对象),如果使用模型对象投递到
另一个协程,进行更新操作,会造成数据错乱。如要使用应禁止更新等操作
2. 配合基于@LogTrace()注解异步日志队列服务,可以使用 LogTraceHandler::recordProcess
对异步协程执行的情况进行跟踪,但在异步协程里,第二个参数必须为true
如 LogTraceHandler::recordProcess('记录输出数据',true);
```
\ No newline at end of file
src/Annotation/AsyncCoroutine.php
0 → 100644
View file @
af89a3c0
<?php
/**
* 异步协程处理
*/
namespace
Meibuyu\Micro\Annotation
;
use
Hyperf\Di\Annotation\AbstractAnnotation
;
/**
* @Annotation
* @Target("METHOD")
*/
class
AsyncCoroutine
extends
AbstractAnnotation
{
public
function
collectMethod
(
string
$className
,
?
string
$target
)
:
void
{
parent
::
collectMethod
(
$className
,
$target
);
// TODO: Change the autogenerated stub
}
}
\ No newline at end of file
src/Annotation/LogTrace.php
0 → 100644
View file @
af89a3c0
<?php
/**
* 日志追踪 写队列 批量更新到数据库
*/
namespace
Meibuyu\Micro\Annotation
;
use
Hyperf\Di\Annotation\AbstractAnnotation
;
/**
* @Annotation
* @Target("METHOD")
*/
class
LogTrace
extends
AbstractAnnotation
{
public
function
collectMethod
(
string
$className
,
?
string
$target
)
:
void
{
parent
::
collectMethod
(
$className
,
$target
);
// TODO: Change the autogenerated stub
}
}
\ No newline at end of file
src/Aspect/AsyncCoroutineAspect.php
0 → 100644
View file @
af89a3c0
<?php
namespace
Meibuyu\Micro\Aspect
;
use
Meibuyu\Micro\Handler\LogTrace\LogTraceHandler
;
use
Hyperf\Di\Annotation\Aspect
;
use
Hyperf\Di\Aop\AbstractAspect
;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Meibuyu\Micro\Annotation\AsyncCoroutine
;
use
Hyperf\Utils\Coroutine
;
/**
* @Aspect(
* annotations={
* AsyncCoroutine::class
* }
* )
*/
class
AsyncCoroutineAspect
extends
AbstractAspect
{
/**
* 优先级
* @var int
*/
public
$priority
=
998
;
public
function
process
(
ProceedingJoinPoint
$proceedingJoinPoint
)
{
// 切面切入后,执行对应的方法会由此来负责
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// 在调用前进行某些处理
return
Coroutine
::
create
(
function
()
use
(
$proceedingJoinPoint
){
LogTraceHandler
::
recordProcess
(
'投递到子协程任务,id:'
.
Coroutine
::
id
()
.
' ,类:'
.
$proceedingJoinPoint
->
className
.
' ,方法:'
.
$proceedingJoinPoint
->
methodName
.
' ,参数:'
.
json_encode
(
$proceedingJoinPoint
->
getArguments
())
,
true
);
$result
=
$proceedingJoinPoint
->
process
();
LogTraceHandler
::
recordProcess
(
'子协程任务id:'
.
Coroutine
::
id
()
.
'已完成,执行结果:'
.
json_encode
(
$result
),
true
);
});
}
}
\ No newline at end of file
src/Aspect/LogTraceAspect.php
0 → 100644
View file @
af89a3c0
<?php
namespace
Meibuyu\Micro\Aspect
;
use
Hyperf\Di\Annotation\Aspect
;
use
Hyperf\Di\Aop\AbstractAspect
;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Hyperf\HttpServer\Contract\RequestInterface
;
use
Meibuyu\Micro\Annotation\LogTrace
;
use
Meibuyu\Micro\Handler\LogTrace\LogTraceHandler
;
/**
* @Aspect(
* annotations={
* LogTrace::class
* }
* )
*/
class
LogTraceAspect
extends
AbstractAspect
{
/**
* 优先级
* @var int
*/
public
$priority
=
999
;
public
function
process
(
ProceedingJoinPoint
$proceedingJoinPoint
)
{
$originParams
=
[
'called_params'
=>
$proceedingJoinPoint
->
getArguments
(),
'http_params'
=>
make
(
RequestInterface
::
class
)
->
all
()
];
LogTraceHandler
::
createLogTrace
(
$proceedingJoinPoint
->
className
.
'@'
.
$proceedingJoinPoint
->
methodName
,
$originParams
);
$result
=
$proceedingJoinPoint
->
process
();
LogTraceHandler
::
recordProcess
(
'返回结果:'
.
json_encode
(
$result
));
return
$result
;
}
}
\ No newline at end of file
src/Handler/LogTrace/LogTraceHandler.php
0 → 100644
View file @
af89a3c0
<?php
/**
* 执行日志记录
*/
namespace
Meibuyu\Micro\Handler\LogTrace
;
use
Meibuyu\Micro\Model\LogTrace
;
use
Hyperf\Utils\Coroutine
;
use
Swoole\Server
;
use
Throwable
;
/**
* Class LogTraceHandler
* @package App\Service
*/
class
LogTraceHandler
{
/**
* 1.对执行操作进行的方法入口注解 LogTrace
2.对程序主动进行输出
try {
//流程1
LogTraceHandler::recordProcess('执行到流程1');
//流程2
LogTraceHandler::recordProcess('执行到流程2');
//记录输出数组
LogTraceHandler::recordProcess(['test'=>1]);
//流程3 抛出一个异常
throw new Exception('test111');
//流程执行完成标记结束
LogTraceHandler::markComplete();
}catch (\Throwable $exception){
//记录异常日志
LogTraceHandler::recordProcess($exception);
}
* @param $params
* @param $source
* @return mixed
* @throws \Exception
*/
public
static
function
createLogTrace
(
$source
,
$params
)
{
if
(
!
Coroutine
::
inCoroutine
())
return
;
LogTrace
::
insertOrIgnore
([
'request_id'
=>
self
::
getRequestId
(),
'origin_params'
=>
json_encode
(
$params
),
'source'
=>
$source
,
'created_at'
=>
now
(),
'process_info'
=>
''
]);
}
/**
* @param bool $isInAsyncCoroutine
* @return string
* @throws \Exception
*/
private
static
function
getRequestId
(
$isInAsyncCoroutine
=
false
)
{
$workId
=
posix_getpid
();
$cid
=
$isInAsyncCoroutine
?
Coroutine
::
parentId
(
Coroutine
::
id
())
:
Coroutine
::
id
();
if
(
!
$cid
)
throw
new
\Exception
(
'无法使用协程标记录日志'
);
return
container
(
Server
::
class
)
->
stats
()[
'start_time'
]
.
$workId
.
$cid
;
}
/**
* 程序执行完成标记结束
* @throws \Exception
*/
public
static
function
markComplete
()
{
if
(
!
Coroutine
::
inCoroutine
())
return
;
container
(
LogTraceQueue
::
class
)
->
addToQueue
([
'request_id'
=>
self
::
getRequestId
(),
'is_completed'
=>
YES
]);
//LogTrace::where('request_id', self::getRequestId())->update(['is_completed' => YES]);
}
/*
* 事务回滚导致部分流程日志无法记录 暂写到文件里
* 待写到Es后可以避免
* 记录当前日志(包括异常捕获)
*/
public
static
function
recordProcess
(
$track
,
$isInAsyncCoroutine
=
false
)
{
if
(
empty
(
$track
))
return
;
if
(
!
Coroutine
::
inCoroutine
())
return
;
$logInfo
=
''
;
if
(
$track
instanceof
Throwable
)
{
$logInfo
=
$track
->
getMessage
()
.
"
\n
"
.
$track
->
getFile
()
.
" line:"
.
$track
->
getLine
()
.
"
\n
"
.
$track
->
getTraceAsString
();
}
if
(
is_array
(
$track
))
{
$logInfo
=
var_export
(
$track
,
true
);
}
if
(
is_string
(
$track
)
||
is_numeric
(
$track
))
{
$logInfo
=
$track
;
}
$logInfo
.=
"
\n\n
"
;
container
(
LogTraceQueue
::
class
)
->
addToQueue
([
'request_id'
=>
self
::
getRequestId
(
$isInAsyncCoroutine
),
'process_info'
=>
$logInfo
]);
// $log = LogTrace::where('request_id', self::getRequestId())->first();
// if(empty($log)) return ;
//
// $log->update([
// 'process_info' => Db::raw("CONCAT(process_info,\"{$logInfo}\")")
// ]);
// //写入文件
// put_log(
// self::getRequestId()."\n".
// $logInfo,
// str_replace('\\','_',$log->source).'/'.today()
// );
}
// /**
// * 记录流程日志
// * @param $funName
// * @param $arguments
// * @throws \Exception
// */
// public static function __callStatic($funName, $arguments)
// {
// if(self::$instance){
// throw new \Exception('请用LogTraceService::createLogTrace 先实例化对象');
// }
// self::$instance->$funName($arguments);
// }
}
\ No newline at end of file
src/Handler/LogTrace/LogTraceQueue.php
0 → 100644
View file @
af89a3c0
<?php
namespace
Meibuyu\Micro\Handler\LogTrace
;
use
Meibuyu\Micro\Handler\RedisQueueBatchHandler
;
use
Meibuyu\Micro\Model\LogTrace
;
class
LogTraceQueue
extends
RedisQueueBatchHandler
{
protected
function
specifyQueueName
()
{
$this
->
queue_name
=
env
(
'APP_NAME'
)
.
':LogTraceQueue'
;
}
/**
* 具体批处理逻辑
* @param $dataArr
*/
function
batchDeal
(
$dataArr
)
{
$updateArr
=
[];
$updateMarkComplete
=
[];
foreach
(
$dataArr
as
$arr
){
if
(
isset
(
$arr
[
'is_completed'
])
&&
$arr
[
'is_completed'
]
==
YES
){
$updateMarkComplete
[]
=
$arr
[
'request_id'
]
;
continue
;
}
$updateArr
[
$arr
[
'request_id'
]][
'request_id'
]
=
$arr
[
'request_id'
];
$process_info
=
isset
(
$updateArr
[
$arr
[
'request_id'
]][
'process_info'
])
?
$updateArr
[
$arr
[
'request_id'
]][
'process_info'
]
:
''
;
$updateArr
[
$arr
[
'request_id'
]][
'process_info'
]
=
$process_info
.
$arr
[
'process_info'
];
}
//执行前先查一下 原先有的记录 避免被覆盖
$originLogs
=
LogTrace
::
whereIn
(
'request_id'
,
array_column
(
$dataArr
,
'request_id'
))
->
pluck
(
'process_info'
,
'request_id'
)
->
toArray
();
//追加到原记录之后
$updateArr
=
array_map
(
function
(
$item
)
use
(
$originLogs
){
if
(
!
isset
(
$originLogs
[
$item
[
'request_id'
]]))
return
$item
;
if
(
empty
(
$originLogs
[
$item
[
'request_id'
]]))
return
$item
;
$item
[
'process_info'
]
=
$originLogs
[
$item
[
'request_id'
]]
.
$item
[
'process_info'
];
return
$item
;
},
$updateArr
);
//批量更新
if
(
!
empty
(
$updateMarkComplete
)){
LogTrace
::
whereIn
(
'request_id'
,
$updateMarkComplete
)
->
update
([
'is_completed'
=>
YES
]);
}
LogTrace
::
getModel
()
->
batchUpdateByField
(
array_values
(
$updateArr
),
'request_id'
);
}
}
\ No newline at end of file
src/Handler/RedisQueueBatchHandler.php
0 → 100644
View file @
af89a3c0
<?php
namespace
Meibuyu\Micro\Handler
;
abstract
class
RedisQueueBatchHandler
{
/**
* 重试次数
*/
protected
$retrys
=
0
;
protected
$queue_name
;
const
MAX_RETRY_TIMES
=
3
;
const
BATCH_DEAL_NUM
=
100
;
const
ERROR_RETRY_CODE
=
9000
;
/**
* RedisQueueBatchHandler constructor.
*/
public
function
__construct
()
{
$this
->
specifyQueueName
();
}
/**
* 添加至队列
*/
public
function
addToQueue
(
$data
)
{
redis
()
->
rPush
(
$this
->
queue_name
,
json_encode
(
$data
));
}
/**
* 通过管道将执行失败的数据重回队列
* @param $arr
* @param \Hyperf\Redis\Redis $redis
*/
protected
function
backToQueue
(
$arr
,
\Hyperf\Redis\Redis
$redis
)
{
//开启管道
$pip
=
$redis
->
multi
();
foreach
(
$arr
as
$i
){
$pip
->
lPush
(
$this
->
queue_name
,
$i
);
}
//批量提交
$pip
->
exec
();
}
//批处理具体逻辑
abstract
protected
function
batchDeal
(
$data
);
abstract
protected
function
specifyQueueName
();
public
function
consume
()
{
$redis
=
redis
();
while
(
true
){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist
=
$redis
->
blPop
(
$this
->
queue_name
,
30
);
if
(
empty
(
$exist
))
continue
;
$redis
->
lPush
(
$this
->
queue_name
,
$exist
[
1
]);
//每次从列表取100
$arr
=
$redis
->
lRange
(
$this
->
queue_name
,
0
,
self
::
BATCH_DEAL_NUM
-
1
);
//取完 从队列删掉
$redis
->
lTrim
(
$this
->
queue_name
,
count
(
$arr
),
-
1
);
//数据格式化
$formatArr
=
array_map
(
function
(
$item
){
return
json_decode
(
$item
,
true
);
},
$arr
);
try
{
//具体批处理逻辑
$this
->
batchDeal
(
$formatArr
);
}
catch
(
\Throwable
$exception
){
//错误码为100 通过管道从新推到队列
if
(
$exception
->
getCode
()
==
self
::
ERROR_RETRY_CODE
){
if
(
$this
->
retrys
<
self
::
MAX_RETRY_TIMES
){
//重试次数不超过3次
$this
->
backToQueue
(
$arr
,
$redis
);
$this
->
retrys
++
;
}
else
{
$this
->
retrys
=
0
;
//重置当前次数
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
}
}
else
{
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
}
}
}
}
/**
* @param $data
* @param \Throwable $exception
*/
private
function
errorWriteToFile
(
$data
,
\Throwable
$exception
)
{
put_log
(
json_encode
(
$data
)
.
"
\n
"
.
$exception
->
getMessage
()
.
"
\n
"
.
$exception
->
getFile
()
.
' line:'
.
$exception
->
getLine
()
.
"
\n
"
.
$exception
->
getTraceAsString
()
,
'RedisQueue/'
.
$this
->
queue_name
.
'/'
.
today
()
);
}
}
\ No newline at end of file
src/Model/BaseModel.php
0 → 100644
View file @
af89a3c0
<?php
declare
(
strict_types
=
1
);
namespace
Meibuyu\Micro\Model
;
use
Hyperf\DbConnection\Model\Model
;
abstract
class
BaseModel
extends
Model
{
/**
* Function addDataToMysql
* 批量插入数据到数据库,无则插入,重复则更新
*/
public
function
batchUpdateOrCreateByUniqueKey
(
$data
)
{
$buildInsertBatchSqlStr
=
$this
->
buildInsertBatchSqlStr
(
$data
);
$sql
=
$buildInsertBatchSqlStr
[
'sql'
];
return
$this
->
getConnection
()
->
update
(
$sql
);
}
/**
* Function buildInsertBatchSqlStr
* 组装mysql
*/
private
function
buildInsertBatchSqlStr
(
$data
)
{
//从data中 获取更新的字段
if
(
empty
(
$data
))
{
return
false
;
}
$new_data
=
[];
$maxdim
=
$this
->
getmaxdim
(
$data
);
if
(
$maxdim
==
1
){
// 兼容一维数组
$new_data
[]
=
$data
;
}
elseif
(
$maxdim
>
2
){
// 大于2维返回false
return
false
;
}
else
{
$new_data
=
$data
;
}
$keys_arr
=
[];
$datas_arr
=
[];
$where_arr
=
[];
foreach
(
$new_data
as
$k
=>
$v
)
{
if
(
!
$keys_arr
)
{
$keys_arr
=
array_keys
(
$v
);
}
foreach
(
$v
as
$k2
=>&
$v2
){
if
(
is_string
(
$v2
)){
$v2
=
"'"
.
addslashes
(
$v2
)
.
"'"
;
}
}
//数据重组
$onedata
=
"( "
.
implode
(
','
,
$v
)
.
" )"
;
$datas_arr
[]
=
$onedata
;
unset
(
$onedata
);
}
// 组装格式 pt_uid=VALUES(pt_uid),
foreach
(
$keys_arr
as
$k2
=>
$v2
)
{
$where_arr
[]
=
"
$v2
= VALUES(
$v2
)"
;
}
$keys_str
=
implode
(
','
,
$keys_arr
);
$datas_str
=
implode
(
','
,
$datas_arr
);
$where_str
=
implode
(
','
,
$where_arr
);
$sql
=
""
;
if
(
$keys_str
&&
$datas_str
&&
$where_str
)
{
$table
=
$this
->
getTable
();
$sql
=
" INSERT INTO
$table
(
$keys_str
) VALUES
$datas_str
ON DUPLICATE KEY UPDATE
$where_str
"
;
}
unset
(
$keys_str
,
$where_str
);
return
[
'sql'
=>
$sql
,
'count'
=>
count
(
$datas_arr
)];
}
/**
* Function getmaxdim
* 数组维数
*/
private
function
getmaxdim
(
$vDim
){
if
(
!
is_array
(
$vDim
))
return
0
;
else
{
$max1
=
0
;
foreach
(
$vDim
as
$item1
)
{
$t1
=
$this
->
getmaxdim
(
$item1
);
if
(
$t1
>
$max1
)
$max1
=
$t1
;
}
return
$max1
+
1
;
}
}
/**
* 批量更新数组 有则
* @param $data array 待更新的数据,二维数组格式
* @param string $field string 值不同的条件,默认为id
* @return bool|string
*/
public
function
batchUpdateByField
(
$data
,
$field
=
null
)
{
if
(
is_null
(
$field
))
$field
=
$this
->
getKeyName
();
if
(
!
is_array
(
$data
)
||
!
$field
)
{
return
false
;
}
$updates
=
$this
->
parseUpdate
(
$data
,
$field
);
// 获取所有键名为$field列的值,值两边加上单引号,保存在$fields数组中
// array_column()函数需要PHP5.5.0+,如果小于这个版本,可以自己实现,
// 参考地址:http://php.net/manual/zh/function.array-column.php#118831
$fields
=
array_column
(
$data
,
$field
);
$fields
=
implode
(
','
,
array_map
(
function
(
$value
)
{
return
"'"
.
$value
.
"'"
;
},
$fields
));
$sql
=
sprintf
(
"UPDATE `%s` SET %s WHERE `%s` IN (%s) "
,
$this
->
getTable
(),
$updates
,
$field
,
$fields
);
return
$this
->
getConnection
()
->
update
(
$sql
);
}
/**
* 将二维数组转换成CASE WHEN THEN的批量更新条件
* @param $data array 二维数组
* @param $field string 列名
* @return string sql语句
*/
private
function
parseUpdate
(
$data
,
$field
)
{
$sql
=
''
;
$keys
=
array_keys
(
current
(
$data
));
foreach
(
$keys
as
$column
)
{
if
(
$column
==
$keys
)
continue
;
$sql
.=
sprintf
(
"`%s` = CASE `%s`
\n
"
,
$column
,
$field
);
foreach
(
$data
as
$line
)
{
$sql
.=
sprintf
(
"WHEN '%s' THEN '%s'
\n
"
,
$line
[
$field
],
!
is_numeric
(
$line
[
$column
])
?
addslashes
(
$line
[
$column
])
:
$line
[
$column
]
);
}
$sql
.=
"END,"
;
}
return
rtrim
(
$sql
,
','
);
}
}
src/Model/LogTrace.php
0 → 100644
View file @
af89a3c0
<?php
/**
* Created by PhpStorm.
* User: Auto generated.
* Date: 2022-01-20
* Time: 10:08:03
* Description:
*/
declare
(
strict_types
=
1
);
namespace
Meibuyu\Micro\Model
;
/**
* 模型类 LogTrace
* @package App\Model
* @property integer $id
* @property string $source 路径
* @property string $origin_params
* @property integer $is_completed
* @property string $created_at
* @property string $process_info
*/
class
LogTrace
extends
BaseModel
{
protected
$table
=
'trace_logs'
;
/**
* 是否使用时间戳管理
* @var bool
*/
public
$timestamps
=
false
;
/**
* 可写入数据的字段.
* @var array
*/
protected
$fillable
=
[
'source'
,
'origin_params'
,
'is_completed'
,
'process_info'
,
];
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment