xiaoai/php_server/app/common/workerman/wechat/services/MessageService.php

307 lines
11 KiB
PHP

<?php
declare(strict_types=1);
namespace app\common\workerman\wechat\services;
use app\common\workerman\wechat\traits\{LoggerTrait, ResponseTrait, CacheTrait, AichatTrait, OperationTrait};
use app\common\workerman\wechat\validators\MessageValidator;
use app\common\workerman\wechat\exceptions\ResponseException;
use Workerman\Connection\TcpConnection;
use Jubo\JuLiao\IM\Wx\Proto\TransportMessage;
use Jubo\JuLiao\IM\Wx\Proto\EnumMsgType;
use app\common\workerman\wechat\constants\ResponseCode;
use app\common\workerman\wechat\constants\ClientRequestMsgType;
use Channel\Client;
use Google\Protobuf\Any;
use Google\Protobuf\Internal\Message;
use app\common\workerman\wechat\constants\SocketType;
use app\common\workerman\wechat\services\Service;
/**
* 消息核心服务
*
* 消息处理:
* - 进程间消息互通
* - 构建protobuf消息
* - 参数验证
* - 自动分发业务
*
* @author Qasim
* @package app\service
*/
class MessageService
{
use LoggerTrait, ResponseTrait, CacheTrait, AichatTrait, OperationTrait;
private MessageValidator $validator;
protected Service $service;
/**
* 构造函数
*/
public function __construct()
{
$this->validator = new MessageValidator();
$this->service = Service::getInstance();
}
/**
* 设备消息处理器
*
* @param TcpConnection $connection 连接实例
* @param string $data 消息数据
* @return void
*/
public function handleDeviceMessage(TcpConnection $connection, string $data): void
{
$message = new TransportMessage();
$message->mergeFromString($data);
$msgType = $message->getMsgType();
$content = $message->getContent();
try {
// 特殊消息类型处理
$content = match ($msgType) {
EnumMsgType::HeartBeatReq => '',
default => $content->getValue()
};
// 构建业务数据上下文
$context = ['Connection' => $connection];
// 获取设备、验证设备
if ($msgType !== EnumMsgType::DeviceAuthReq) {
// 验证消息
$deviceId = $this->validator->validateDeviceMessage($connection);
$context['DeviceId'] = $deviceId;
}
// 获取处理器类
$handlerClass = $this->getHandlerClass($msgType);
// 调用业务处理,获取响应内容
$response = $handlerClass::handle($content, $context);
if($msgType !== EnumMsgType::HeartBeatReq){
// 记录下日志
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Device message received')->withContext([
'msgType' => EnumMsgType::name($msgType),
'content' => $content,
])->log();
}
// 获取设备ID
if ($msgType === EnumMsgType::DeviceAuthReq) {
$deviceId = $connection->deviceId;
unset($connection->deviceId);
// 绑定设备连接
$this->service->connectionService->bindConnection($connection, $deviceId, SocketType::SOCKET);
}
if($msgType == 1025){
$statusKey = "device:{$deviceId}:voiceToText";
$isVoiceToText = $this->redis()->get($statusKey);
if($isVoiceToText == 1 && $response['Data']['MsgType'] == 'VoiceTransTextTask'){
$_content = $response['Data']['Content'];
$key = "device:{$deviceId}:voice:{$_content['WeChatId']}:taskid:{$_content['TaskId']}";
$this->redis()->setex($key, 30, $_content['ErrMsg']);
}
}
// 根据消息类型处理响应
match ($msgType) {
// Socket设备消息响应
EnumMsgType::HeartBeatReq => $this->sendChannelMessage(SocketType::SOCKET, $deviceId, $response),
EnumMsgType::DeviceAuthReq => $this->sendChannelMessage(SocketType::SOCKET, $deviceId, $response),
EnumMsgType::FriendTalkNotice => $this->sendFriendTalkNoticeMessage(SocketType::SOCKET, $deviceId, $response, $connection),
EnumMsgType::FriendAddNotice => $this->sendFriendAddNotice(SocketType::SOCKET, $deviceId, $response, $connection),
// 其他业务消息通过Channel广播
default => $this->sendChannelMessage(SocketType::WEBSOCKET, $deviceId, $response)
};
} catch (ResponseException $e) {
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Device message handle failed')->withContext([
'code' => $e->getCode(),
'message' => $e->getMessage(),
'msgType' => $msgType,
'trace' => $e->getTraceAsString()
])->log();
// 发送错误消息到Socket进程
$response = \app\common\workerman\wechat\handlers\device\ErrorHandler::handle($e->getCode(), $e->getMessage());
$meesage = $this->buildMessage($response);
// 发送消息到Socket进程
$connection->send(pack('N', strlen($meesage)) . $meesage);
}
}
/**
* 客户端消息处理器
*
* @param TcpConnection $connection 连接实例
* @param array $message 消息数据
* @return void
*/
public function handleClientMessage(TcpConnection $connection, array $message): void
{
// 验证消息
[$msgType, $content] = $this->validator->validateClientMessage($connection, $message);
try {
if($msgType !== ClientRequestMsgType::HEARTBEAT){
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Client message received')->withContext([
'msgType' => $msgType,
'content' => $content,
//'response' => json_encode($response, JSON_UNESCAPED_UNICODE),
])->log();
}
// 追加连接实例
$content['Connection'] = $connection;
// 获取对应的业务处理器
$handlerClass = $this->getHandlerClass($msgType);
$response = $handlerClass::handle($content);
//$response = json_encode($response, JSON_UNESCAPED_UNICODE);
// 绑定设备连接
if ($msgType === ClientRequestMsgType::AUTH) {
$this->service->connectionService->bindConnection($connection, $content['DeviceId'], SocketType::WEBSOCKET);
}
// 发送响应消息
match ($msgType) {
// Socket设备消息响应
ClientRequestMsgType::AUTH => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::HEARTBEAT => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::ADD_DEVICE => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::REMOVE_DEVICE => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::WX_INFO => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::DEVICE_INFO => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
ClientRequestMsgType::CLEAN_CACHE => $connection->send(json_encode($response, JSON_UNESCAPED_UNICODE)),
// 其他业务消息通过Channel广播
default => $this->sendChannelMessage(SocketType::SOCKET, $content['DeviceId'], $response)
};
} catch (ResponseException $e) {
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Client message handle failed')->withContext([
'code' => $e->getCode(),
'message' => $e->getMessage(),
'msgType' => $msgType,
'trace' => $e->getTraceAsString()
])->log();
unset($content['Connection']);
$response = \app\common\workerman\wechat\handlers\client\ErrorHandler::handle($e->getCode(), $e->getMessage(), $msgType, $content);
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Client message handle response failed')->withContext([
'response' => $response
])->log();
$connection->send(json_encode($response, JSON_UNESCAPED_UNICODE));
}
}
/**
* 发送消息到指定进程
*
* @param string $targetProcess 目标进程
* @param string $deviceId 设备ID
* @param array $data 消息内容
* @return void
*/
public function sendChannelMessage(string $targetProcess, string $deviceId, array $data): void
{
if ($targetProcess === SocketType::SOCKET) {
// 构建protobuf消息
$data = $this->buildMessage($data);
}
$channel = "{$targetProcess}.{$deviceId}.message";
Client::publish($channel, [
'data' => is_array($data) ? json_encode($data) : $data
]);
}
/**
* 构建protobuf消息
*
* @param array $data 消息内容
* @return string 消息内容
*/
public function buildMessage(array $data): string
{
$message = new TransportMessage();
$message->setMsgType($data['MsgType']);
$any = new Any();
$any->pack($data['Content']);
$message->setContent($any);
return $message->serializeToString();
}
/**
* 构建传输消息
*
* @param int $msgType 消息类型
* @param Message $content 消息内容
* @return string
*/
public function buildTransportMessage(int $msgType, Message $content): string
{
$message = new TransportMessage();
$message->setMsgType($msgType);
$any = new Any();
$any->pack($content);
$message->setContent($any);
return $message->serializeToString();
}
/**
* 获取消息处理器
*
* @param string|int $type 消息类型
* @return string 处理器类名
*/
private function getHandlerClass(string|int $type): string
{
// 处理设备消息类型
if (is_numeric($type)) {
$handlerClass = 'app\\common\\workerman\\wechat\\handlers\\device\\' . EnumMsgType::name($type) . 'Handler';
} else {
// 处理客户端消息类型
$handlerClass = 'app\\common\\workerman\\wechat\\handlers\\client\\' . ucfirst($type) . 'Handler';
}
if (!class_exists($handlerClass)) {
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Handler not found')->withContext([
'type' => $type,
'class' => $handlerClass
])->log();
throw new ResponseException(ResponseCode::HANDLER_NOT_FOUND);
}
// 验证是否实现了静态handle方法
if (!method_exists($handlerClass, 'handle')) {
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Handler method not found')->withContext([
'type' => $type,
'class' => $handlerClass
])->log();
throw new ResponseException(ResponseCode::HANDLER_METHOD_NOT_FOUND);
}
return $handlerClass;
}
}