xiaoai/php_server/app/common/workerman/wechat/services/ConnectionService.php

248 lines
7.1 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
declare(strict_types=1);
namespace app\common\workerman\wechat\services;
use Workerman\Connection\TcpConnection;
use Channel\Client;
use app\common\workerman\wechat\traits\{LoggerTrait, CacheTrait, DeviceTrait};
use app\common\workerman\wechat\constants\SocketType;
use Workerman\Timer;
use Workerman\Worker;
/**
* 连接管理服务
*
* 管理连接和消息转发:
* - 绑定设备连接
* - 移除设备连接
* - 进程间通信
*
* @author Qasim
* @package app\service
*/
class ConnectionService
{
use LoggerTrait, CacheTrait, DeviceTrait;
/** @var string 进程类型 */
private string $processType = '';
/**
* 连接到设备的映射
* @var array ['connectionId' => ['deviceId1', 'deviceId2']]
*/
private array $connectionDevices = [];
/**
* 进程类型
*/
const TYPE_SOCKET = 'socket';
const TYPE_WEBSOCKET = 'websocket';
public function __construct() {}
/**
* 添加连接
*
* @param TcpConnection $connection 连接实例
* @param string $deviceId 设备ID
* @return void
*/
public function bindConnection(TcpConnection $connection, string $deviceId, string $type = SocketType::SOCKET): void
{
if (!isset($this->connectionDevices[$connection->id])) {
$this->connectionDevices[$connection->id] = [];
}
// 添加设备ID到连接的映射列表
if (!$this->hasDevice($connection, $deviceId)) {
$this->connectionDevices[$connection->id][] = $deviceId;
}
// 注册Channel监听
$this->registerChannelListener($connection, $deviceId, $type);
// 如果是Socket进程则更新设备状态
if ($type === SocketType::SOCKET) {
$this->updateDeviceStatus($deviceId, true);
}
// 添加到连接缓存
$this->redis()->sAdd($this->getConnectionKey($type), $deviceId);
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Connection bound')->withContext([
'device_id' => $deviceId,
'connection_id' => $connection->id,
])->log();
}
/**
* 获取连接绑定的所有设备ID
*
* @param TcpConnection $connection 连接实例
* @return array
*/
public function getDeviceIds(TcpConnection $connection): array
{
return $this->connectionDevices[$connection->id] ?? [];
}
/**
* 获取连接绑定的所有设备ID
*
* @param TcpConnection $connection 连接实例
* @return string
*/
public function getDeviceId(TcpConnection $connection): string
{
return $this->connectionDevices[$connection->id][0] ?? '';
}
/**
* 检查连接是否绑定了指定设备
*
* @param TcpConnection $connection 连接实例
* @param string $deviceId 设备ID
* @return bool
*/
public function hasDevice(TcpConnection $connection, string $deviceId): bool
{
return isset($this->connectionDevices[$connection->id]) &&
in_array($deviceId, $this->connectionDevices[$connection->id]);
}
/**
* 解绑连接
*
* @param TcpConnection $connection 连接实例
* @return void
*/
public function unbindConnection(TcpConnection $connection): void
{
if (isset($this->connectionDevices[$connection->id])) {
unset($this->connectionDevices[$connection->id]);
}
}
/**
* 移除连接
*
* @param TcpConnection $connection 连接实例
* @param string $type 连接类型
* @return void
*/
public function removeConnection(TcpConnection $connection, string $type = SocketType::SOCKET): void
{
// 获取当前连接所有设备
foreach ($this->getDeviceIds($connection) as $deviceId) {
// 清理Token缓存
if ($type === SocketType::SOCKET) {
$this->redis()->del($this->getDeviceKey($deviceId, 'token'));
// 更新设备状态 - 离线
$this->updateDeviceStatus($deviceId, false);
} else {
$this->redis()->del($this->getClientKey($deviceId, 'token'));
}
}
// 解绑连接
$this->unbindConnection($connection);
// 删除连接缓存
$this->redis()->del($this->getConnectionKey($type));
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Device connection removed')->withContext([
'connection_id' => $connection->id
])->log();
}
/**
* 检查设备是否在线
*
* @param string $deviceId 设备ID
* @return bool
*/
public function isDeviceOnline(string $deviceId): bool
{
// 先检查本地连接
if (isset($this->deviceConns[$deviceId])) {
return true;
}
// 再检查Redis中的状态
$statusKey = $this->redis()->getDeviceKey($deviceId, 'status');
$status = $this->redis()->get($statusKey);
return $status === 'online';
}
/**
* 注册Channel监听
*
* @param TcpConnection $connection 连接实例
* @return void
*/
private function registerChannelListener(TcpConnection $connection, string $deviceId, string $type = SocketType::SOCKET): void
{
Client::connect('127.0.0.1', 2206);
// 注册进程消息监听
Client::on("{$type}.{$deviceId}.message", function ($data) use ($connection, $type) {
$message = $data['data'];
if ($type === SocketType::SOCKET) {
$message = pack('N', strlen($message)) . $message;
}
$connection->send($message);
});
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Channel listener registered')->withContext([
'device_id' => $deviceId,
])->log();
}
/**
* 通过Channel发送跨进程消息
*
* @param string $deviceId 设备ID
* @param mixed $data 消息数据
* @return void
*/
public function sendChannelMessage(string $deviceId, mixed $data): void
{
if (empty($deviceId) || empty($data)) {
return;
}
// 确定目标进程类型
$targetProcess = $this->processType === self::TYPE_SOCKET ? self::TYPE_WEBSOCKET : self::TYPE_SOCKET;
// 通过Channel转发
try {
$channel = "{$targetProcess}.{$deviceId}.message";
Client::publish($channel, [
'data' => is_array($data) ? json_encode($data) : $data
]);
$this->withChannel('wechat_socket')->withLevel('info')->withTitle('Message published')->withContext([
'deviceId' => $deviceId,
'channel' => $channel,
'type' => $targetProcess
])->log();
} catch (\Throwable $e) {
$this->withChannel('wechat_socket')->withLevel('error')->withTitle('Publish error')->withContext([
'error' => $e->getMessage(),
'deviceId' => $deviceId,
'type' => $targetProcess
])->log();
}
}
}