Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
meibuyu-micro
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
without authentication
meibuyu-micro
Commits
893f68a9
Commit
893f68a9
authored
Jan 13, 2023
by
Liu lu
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev_async_log1' of
http://git.huaperfect.com/without_auth/meibuyu-micro
into v2.2.2-t
parents
c2bdfe37
efd84316
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
134 additions
and
74 deletions
+134
-74
AsyncCoroutineAspect.php
src/Aspect/AsyncCoroutineAspect.php
+22
-15
LogTraceAspect.php
src/Aspect/LogTraceAspect.php
+37
-12
LogTraceHandler.php
src/Handler/LogTrace/LogTraceHandler.php
+11
-13
LogTraceQueue.php
src/Handler/LogTrace/LogTraceQueue.php
+30
-3
RedisQueueBatchHandler.php
src/Handler/RedisQueueBatchHandler.php
+32
-29
BaseModel.php
src/Model/BaseModel.php
+2
-2
No files found.
src/Aspect/AsyncCoroutineAspect.php
View file @
893f68a9
...
@@ -7,6 +7,7 @@ use Hyperf\Di\Aop\AbstractAspect;
...
@@ -7,6 +7,7 @@ use Hyperf\Di\Aop\AbstractAspect;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Meibuyu\Micro\Annotation\AsyncCoroutine
;
use
Meibuyu\Micro\Annotation\AsyncCoroutine
;
use
Hyperf\Utils\Coroutine
;
use
Hyperf\Utils\Coroutine
;
use
Hyperf\Utils\Context
;
/**
/**
* @Aspect(
* @Aspect(
...
@@ -29,21 +30,27 @@ class AsyncCoroutineAspect extends AbstractAspect
...
@@ -29,21 +30,27 @@ class AsyncCoroutineAspect extends AbstractAspect
// 切面切入后,执行对应的方法会由此来负责
// 切面切入后,执行对应的方法会由此来负责
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// $proceedingJoinPoint 为连接点,通过该类的 process() 方法调用原方法并获得结果
// 在调用前进行某些处理
// 在调用前进行某些处理
return
Coroutine
::
create
(
function
()
use
(
$proceedingJoinPoint
){
$fd
=
Context
::
get
(
'log_trace_request_id'
);
return
Coroutine
::
create
(
function
()
use
(
$proceedingJoinPoint
,
$fd
){
LogTraceHandler
::
recordProcess
(
'投递到子协程任务,id:'
.
Coroutine
::
id
()
try
{
.
' ,类:'
.
$proceedingJoinPoint
->
className
Context
::
set
(
'log_trace_request_id'
,
$fd
);
.
' ,方法:'
.
$proceedingJoinPoint
->
methodName
LogTraceHandler
::
recordProcess
(
.
' ,参数:'
.
json_encode
(
$proceedingJoinPoint
->
getArguments
())
'投递到子协程任务,id:'
.
Coroutine
::
id
()
,
.
' ,类:'
.
$proceedingJoinPoint
->
className
true
.
' ,方法:'
.
$proceedingJoinPoint
->
methodName
);
.
' ,参数:'
.
json_encode
(
$proceedingJoinPoint
->
getArguments
())
$result
=
$proceedingJoinPoint
->
process
();
,
LogTraceHandler
::
recordProcess
(
true
'子协程任务id:'
.
Coroutine
::
id
()
.
'已完成,执行结果:'
.
);
json_encode
(
$result
),
true
$result
=
$proceedingJoinPoint
->
process
();
);
LogTraceHandler
::
recordProcess
(
'子协程任务id:'
.
Coroutine
::
id
()
.
'已完成,执行结果:'
.
json_encode
(
$result
),
true
);
}
catch
(
\Throwable
$exception
){
LogTraceHandler
::
recordProcess
(
$exception
,
true
);
}
});
});
...
...
src/Aspect/LogTraceAspect.php
View file @
893f68a9
...
@@ -5,8 +5,11 @@ use Hyperf\Di\Annotation\Aspect;
...
@@ -5,8 +5,11 @@ use Hyperf\Di\Annotation\Aspect;
use
Hyperf\Di\Aop\AbstractAspect
;
use
Hyperf\Di\Aop\AbstractAspect
;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Hyperf\Di\Aop\ProceedingJoinPoint
;
use
Hyperf\HttpServer\Contract\RequestInterface
;
use
Hyperf\HttpServer\Contract\RequestInterface
;
use
Hyperf\Utils\Context
;
use
Meibuyu\Micro\Annotation\LogTrace
;
use
Meibuyu\Micro\Annotation\LogTrace
;
use
Meibuyu\Micro\Handler\LogTrace\LogTraceHandler
;
use
Meibuyu\Micro\Handler\LogTrace\LogTraceHandler
;
use
Psr\Http\Message\ServerRequestInterface
;
use
Hyperf\Utils\Coroutine
;
/**
/**
* @Aspect(
* @Aspect(
...
@@ -26,17 +29,39 @@ class LogTraceAspect extends AbstractAspect
...
@@ -26,17 +29,39 @@ class LogTraceAspect extends AbstractAspect
public
function
process
(
ProceedingJoinPoint
$proceedingJoinPoint
)
public
function
process
(
ProceedingJoinPoint
$proceedingJoinPoint
)
{
{
$originParams
=
[
try
{
'called_params'
=>
$proceedingJoinPoint
->
getArguments
(),
$object
=
Context
::
get
(
ServerRequestInterface
::
class
);
'http_params'
=>
make
(
RequestInterface
::
class
)
->
all
()
$fd
=
$object
?
(
'fd'
.
$object
->
getSwooleRequest
()
->
fd
)
:
(
'co'
.
Coroutine
::
id
());
];
Context
::
set
(
'log_trace_request_id'
,
$fd
);
LogTraceHandler
::
createLogTrace
(
$proceedingJoinPoint
->
className
.
'@'
.
$proceedingJoinPoint
->
methodName
,
$originParams
=
[
$originParams
'called_params'
=>
$proceedingJoinPoint
->
getArguments
(),
);
'http_params'
=>
$object
?
make
(
RequestInterface
::
class
)
->
all
()
:
[]
$result
=
$proceedingJoinPoint
->
process
();
];
LogTraceHandler
::
recordProcess
(
'返回结果:'
.
json_encode
(
$result
));
LogTraceHandler
::
createLogTrace
(
$proceedingJoinPoint
->
className
.
'@'
.
$proceedingJoinPoint
->
methodName
,
return
$result
;
$originParams
);
$requestId
=
LogTraceHandler
::
getRequestId
();
register_shutdown_function
(
function
()
use
(
$requestId
,
$originParams
,
$proceedingJoinPoint
)
{
put_log
(
json_encode
([
'request_id'
=>
$requestId
,
'error_msg'
=>
error_get_last
(),
'params'
=>
$originParams
]),
str_replace
(
'\\'
,
'_'
,
$proceedingJoinPoint
->
className
)
.
'/'
.
$proceedingJoinPoint
->
methodName
.
'/'
.
date
(
'Ymd'
)
.
'.txt'
);
});
$result
=
$proceedingJoinPoint
->
process
();
LogTraceHandler
::
recordProcess
(
'返回结果:'
.
json_encode
(
$result
));
LogTraceHandler
::
markComplete
();
return
$result
;
}
catch
(
\Throwable
$exception
){
LogTraceHandler
::
recordProcess
(
$exception
);
throw
$exception
;
}
}
}
}
}
\ No newline at end of file
src/Handler/LogTrace/LogTraceHandler.php
View file @
893f68a9
...
@@ -9,6 +9,7 @@ namespace Meibuyu\Micro\Handler\LogTrace;
...
@@ -9,6 +9,7 @@ namespace Meibuyu\Micro\Handler\LogTrace;
use
Meibuyu\Micro\Model\LogTrace
;
use
Meibuyu\Micro\Model\LogTrace
;
use
Hyperf\Utils\Coroutine
;
use
Hyperf\Utils\Coroutine
;
use
Hyperf\Utils\Context
;
use
Swoole\Server
;
use
Swoole\Server
;
use
Throwable
;
use
Throwable
;
...
@@ -66,17 +67,16 @@ class LogTraceHandler
...
@@ -66,17 +67,16 @@ class LogTraceHandler
* @return string
* @return string
* @throws \Exception
* @throws \Exception
*/
*/
p
rivate
static
function
getRequestId
(
$isInAsyncCoroutine
=
false
)
p
ublic
static
function
getRequestId
(
)
{
{
$workId
=
posix_getpid
();
$workId
=
posix_getpid
();
$cid
=
$isInAsyncCoroutine
?
Coroutine
::
parentId
(
Coroutine
::
id
())
:
Coroutine
::
id
(
);
$cid
=
Context
::
get
(
'log_trace_request_id'
);
if
(
!
$cid
)
throw
new
\Exception
(
'无法使用协程标记录日志'
)
;
if
(
!
$cid
)
return
false
;
return
container
(
Server
::
class
)
->
stats
()[
'start_time'
]
.
$workId
.
$cid
;
return
container
(
Server
::
class
)
->
stats
()[
'start_time'
]
.
$workId
.
$cid
;
}
}
/**
/**
* 程序执行完成标记结束
* 程序执行完成标记结束
* @throws \Exception
*/
*/
public
static
function
markComplete
()
public
static
function
markComplete
()
{
{
...
@@ -93,9 +93,8 @@ class LogTraceHandler
...
@@ -93,9 +93,8 @@ class LogTraceHandler
* 待写到Es后可以避免
* 待写到Es后可以避免
* 记录当前日志(包括异常捕获)
* 记录当前日志(包括异常捕获)
*/
*/
public
static
function
recordProcess
(
$track
,
$isInAsyncCoroutine
=
false
)
public
static
function
recordProcess
(
$track
,
$isInAsyncCoroutine
=
false
,
$requestId
=
0
)
{
{
if
(
empty
(
$track
))
return
;
if
(
!
Coroutine
::
inCoroutine
())
return
;
if
(
!
Coroutine
::
inCoroutine
())
return
;
$logInfo
=
''
;
$logInfo
=
''
;
...
@@ -104,18 +103,17 @@ class LogTraceHandler
...
@@ -104,18 +103,17 @@ class LogTraceHandler
$track
->
getFile
()
.
" line:"
.
$track
->
getFile
()
.
" line:"
.
$track
->
getLine
()
.
"
\n
"
.
$track
->
getLine
()
.
"
\n
"
.
$track
->
getTraceAsString
();
$track
->
getTraceAsString
();
}
}
else
if
(
is_string
(
$track
)
||
is_numeric
(
$track
))
{
if
(
is_array
(
$track
))
{
$logInfo
=
var_export
(
$track
,
true
);
}
if
(
is_string
(
$track
)
||
is_numeric
(
$track
))
{
$logInfo
=
$track
;
$logInfo
=
$track
;
}
else
{
$logInfo
=
json_encode
(
$track
,
JSON_PRETTY_PRINT
);
;
//print_r($track, true);
}
}
$logInfo
.=
"
\n\n
"
;
$logInfo
.=
"
\n\n
"
;
if
(
!
self
::
getRequestId
())
return
;
container
(
LogTraceQueue
::
class
)
->
addToQueue
([
container
(
LogTraceQueue
::
class
)
->
addToQueue
([
'request_id'
=>
self
::
getRequestId
(
$isInAsyncCoroutine
),
'request_id'
=>
self
::
getRequestId
(),
'process_info'
=>
$logInfo
'process_info'
=>
$logInfo
]);
]);
// $log = LogTrace::where('request_id', self::getRequestId())->first();
// $log = LogTrace::where('request_id', self::getRequestId())->first();
...
...
src/Handler/LogTrace/LogTraceQueue.php
View file @
893f68a9
...
@@ -5,14 +5,39 @@ namespace Meibuyu\Micro\Handler\LogTrace;
...
@@ -5,14 +5,39 @@ namespace Meibuyu\Micro\Handler\LogTrace;
use
Meibuyu\Micro\Handler\RedisQueueBatchHandler
;
use
Meibuyu\Micro\Handler\RedisQueueBatchHandler
;
use
Meibuyu\Micro\Model\LogTrace
;
use
Meibuyu\Micro\Model\LogTrace
;
use
Hyperf\DbConnection\Db
;
use
Hyperf\Redis\Redis
;
class
LogTraceQueue
extends
RedisQueueBatchHandler
class
LogTraceQueue
extends
RedisQueueBatchHandler
{
{
protected
function
specifyQueueName
()
protected
function
specifyQueueName
()
{
{
$this
->
queue_name
=
env
(
'APP_NAME'
)
.
':LogTraceQueue'
;
$this
->
createLogTraceTable
();
return
env
(
'APP_NAME'
)
.
':LogTraceQueue'
;
}
protected
function
specifyRedisServer
()
:
Redis
{
return
redis
();
}
private
function
createLogTraceTable
()
{
Db
::
insert
(
"
create table IF NOT EXISTS `trace_logs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`request_id` varchar(32) NOT NULL COMMENT '一次http或rpc请求调用的唯一key',
`source` varchar(255) NOT NULL COMMENT '来源,包含调用类命名空间及方法',
`origin_params` json NOT NULL COMMENT '记录注解方法被调用开始的原始传参',
`is_completed` tinyint(1) NOT NULL DEFAULT '0' COMMENT '此请求是否完成,使用LogTraceHandler::markComplete()标记',
`process_info` mediumtext NOT NULL COMMENT '执行过程中输出,使用LogTraceHandler::recordProcess()记录',
`created_at` datetime NOT NULL COMMENT '日志记录开始时间',
PRIMARY KEY (`id`),
KEY `created_at` (`created_at`),
UNIQUE KEY `request_id` (`request_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5950 DEFAULT CHARSET=utf8mb4;
"
);
}
}
/**
/**
...
@@ -57,7 +82,9 @@ class LogTraceQueue extends RedisQueueBatchHandler
...
@@ -57,7 +82,9 @@ class LogTraceQueue extends RedisQueueBatchHandler
if
(
!
empty
(
$updateMarkComplete
)){
if
(
!
empty
(
$updateMarkComplete
)){
LogTrace
::
whereIn
(
'request_id'
,
$updateMarkComplete
)
->
update
([
'is_completed'
=>
YES
]);
LogTrace
::
whereIn
(
'request_id'
,
$updateMarkComplete
)
->
update
([
'is_completed'
=>
YES
]);
}
}
LogTrace
::
getModel
()
->
batchUpdateByField
(
array_values
(
$updateArr
),
'request_id'
);
if
(
!
empty
(
$updateArr
)){
LogTrace
::
getModel
()
->
batchUpdateByField
(
array_values
(
$updateArr
),
'request_id'
);
}
}
}
...
...
src/Handler/RedisQueueBatchHandler.php
View file @
893f68a9
...
@@ -2,21 +2,25 @@
...
@@ -2,21 +2,25 @@
namespace
Meibuyu\Micro\Handler
;
namespace
Meibuyu\Micro\Handler
;
use
Hyperf\Redis\Redis
;
abstract
class
RedisQueueBatchHandler
abstract
class
RedisQueueBatchHandler
{
{
/**
/**
* 重试次数
* 重试次数
*/
*/
protected
$retry
s
=
0
;
protected
$retry
=
0
;
protected
$queue_name
;
protected
$queue_name
;
/**
* @var Redis
*/
protected
$redis
;
const
MAX_RETRY_TIMES
=
3
;
const
MAX_RETRY_TIMES
=
3
;
const
BATCH_DEAL_NUM
=
100
;
const
BATCH_DEAL_NUM
=
36
;
const
ERROR_RETRY_CODE
=
9000
;
const
ERROR_RETRY_CODE
=
9000
;
...
@@ -25,53 +29,49 @@ abstract class RedisQueueBatchHandler
...
@@ -25,53 +29,49 @@ abstract class RedisQueueBatchHandler
*/
*/
public
function
__construct
()
public
function
__construct
()
{
{
$this
->
specifyQueueName
();
$this
->
queue_name
=
$this
->
specifyQueueName
();
$this
->
redis
=
$this
->
specifyRedisServer
();
}
}
/**
/**
* 添加至队列
* @param $data
* @param $specifyQueue
*/
*/
public
function
addToQueue
(
$data
)
public
function
addToQueue
(
$data
,
$specifyQueue
=
''
)
{
{
redis
()
->
rPush
(
$this
->
queue_name
,
json_encode
(
$data
));
$this
->
redis
->
rPush
(
$specifyQueue
?:
$this
->
queue_name
,
json_encode
(
$data
));
}
}
/**
/**
*
通过管道
将执行失败的数据重回队列
* 将执行失败的数据重回队列
* @param $arr
* @param $arr
* @param \Hyperf\Redis\Redis $redis
*/
*/
protected
function
backToQueue
(
$arr
,
\Hyperf\Redis\Redis
$redis
)
protected
function
backToQueue
(
$arr
)
{
{
//开启管道
array_unshift
(
$arr
,
$this
->
queue_name
);
$pip
=
$redis
->
multi
();
call_user_func_array
([
$this
->
redis
,
'lPush'
],
$arr
);
foreach
(
$arr
as
$i
){
$pip
->
lPush
(
$this
->
queue_name
,
$i
);
}
//批量提交
$pip
->
exec
();
}
}
//批处理具体逻辑
//批处理具体逻辑
abstract
protected
function
batchDeal
(
$data
);
abstract
protected
function
batchDeal
(
$data
);
abstract
protected
function
specifyQueueName
();
abstract
protected
function
specifyQueueName
();
abstract
protected
function
specifyRedisServer
()
:
Redis
;
public
function
consume
()
public
function
consume
()
{
{
$redis
=
redis
();
while
(
true
){
while
(
true
){
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//有数据则放回 否则延迟20s后返回空数组,同时可保持连接活跃
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
//执行过程 如果redis服务异常,调用io操作时异常退出,框架重新拉起该进程
$exist
=
$redis
->
blPop
(
$this
->
queue_name
,
30
);
$exist
=
$
this
->
redis
->
blPop
(
$this
->
queue_name
,
30
);
if
(
empty
(
$exist
))
continue
;
if
(
empty
(
$exist
))
continue
;
$redis
->
lPush
(
$this
->
queue_name
,
$exist
[
1
]);
$
this
->
redis
->
lPush
(
$this
->
queue_name
,
$exist
[
1
]);
//每次从列表取100
//每次从列表取100
$arr
=
$redis
->
lRange
(
$this
->
queue_name
,
0
,
self
::
BATCH_DEAL_NUM
-
1
);
$arr
=
$
this
->
redis
->
lRange
(
$this
->
queue_name
,
0
,
self
::
BATCH_DEAL_NUM
-
1
);
//取完 从队列删掉
$redis
->
lTrim
(
$this
->
queue_name
,
count
(
$arr
),
-
1
);
//数据格式化
//数据格式化
$formatArr
=
array_map
(
function
(
$item
){
$formatArr
=
array_map
(
function
(
$item
){
return
json_decode
(
$item
,
true
);
return
json_decode
(
$item
,
true
);
...
@@ -79,20 +79,23 @@ abstract class RedisQueueBatchHandler
...
@@ -79,20 +79,23 @@ abstract class RedisQueueBatchHandler
try
{
try
{
//具体批处理逻辑
//具体批处理逻辑
$this
->
batchDeal
(
$formatArr
);
$this
->
batchDeal
(
$formatArr
);
//取完 从队列删掉
$this
->
redis
->
lTrim
(
$this
->
queue_name
,
count
(
$arr
),
-
1
);
}
catch
(
\Throwable
$exception
){
}
catch
(
\Throwable
$exception
){
//错误码为100
通过管道从
新推到队列
//错误码为100
重
新推到队列
if
(
$exception
->
getCode
()
==
self
::
ERROR_RETRY_CODE
){
if
(
$exception
->
getCode
()
==
self
::
ERROR_RETRY_CODE
){
if
(
$this
->
retry
s
<
self
::
MAX_RETRY_TIMES
){
//重试次数不超过3次
if
(
$this
->
retry
<
self
::
MAX_RETRY_TIMES
){
//重试次数不超过3次
$this
->
backToQueue
(
$arr
,
$redis
);
//$this->backToQueue($arr
);
$this
->
retry
s
++
;
$this
->
retry
++
;
}
else
{
}
else
{
$this
->
retrys
=
0
;
//重置当前次数
$this
->
redis
->
lTrim
(
$this
->
queue_name
,
count
(
$arr
),
-
1
);
$this
->
retry
=
0
;
//重置当前次数
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
}
}
}
else
{
}
else
{
$this
->
retry
=
0
;
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
$this
->
errorWriteToFile
(
$formatArr
,
$exception
);
}
}
...
...
src/Model/BaseModel.php
View file @
893f68a9
...
@@ -74,7 +74,7 @@ abstract class BaseModel extends Model
...
@@ -74,7 +74,7 @@ abstract class BaseModel extends Model
$where_str
=
implode
(
','
,
$where_arr
);
$where_str
=
implode
(
','
,
$where_arr
);
$sql
=
""
;
$sql
=
""
;
if
(
$keys_str
&&
$datas_str
&&
$where_str
)
{
if
(
$keys_str
&&
$datas_str
&&
$where_str
)
{
$table
=
$this
->
getTable
();
$table
=
env
(
'DB_PREFIX'
,
''
)
.
$this
->
getTable
();
$sql
=
" INSERT INTO
$table
(
$keys_str
) VALUES
$datas_str
ON DUPLICATE KEY UPDATE
$where_str
"
;
$sql
=
" INSERT INTO
$table
(
$keys_str
) VALUES
$datas_str
ON DUPLICATE KEY UPDATE
$where_str
"
;
}
}
unset
(
$keys_str
,
$where_str
);
unset
(
$keys_str
,
$where_str
);
...
@@ -125,7 +125,7 @@ abstract class BaseModel extends Model
...
@@ -125,7 +125,7 @@ abstract class BaseModel extends Model
$sql
=
sprintf
(
$sql
=
sprintf
(
"UPDATE `%s` SET %s WHERE `%s` IN (%s) "
,
"UPDATE `%s` SET %s WHERE `%s` IN (%s) "
,
$this
->
getTable
(),
$updates
,
$field
,
$fields
env
(
'DB_PREFIX'
,
''
)
.
$this
->
getTable
(),
$updates
,
$field
,
$fields
);
);
return
$this
->
getConnection
()
->
update
(
$sql
);
return
$this
->
getConnection
()
->
update
(
$sql
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment