Hyperf Socket.io 服务

按hyperf文档操作步骤搭建Socket.io服务,其中有几处官方写的不是很详细,hyperf搭建socketio官方文档,我这里把我操作过程中的难点与坑都有解决后写下了这篇文章,希望可以给小伙伴一些参考。

Socket.io 是一款非常流行的应用层实时通讯协议和框架,可以轻松实现应答、分组、广播。hyperf/socketio-server 支持了 Socket.io 的 WebSocket 传输协议。可以理解为封装好的websocket框架。

安装

composer require hyperf/socketio-server

hyperf/socketio-server 组件是基于 WebSocket 实现的,请确保服务端已经添加了 WebSocket 服务 的配置。

// config/autoload/server.php
[
    'name' => 'socket-io',
    'type' => Server::SERVER_WEBSOCKET,
    'host' => '0.0.0.0',
    'port' => 9502,
    'sock_type' => SWOOLE_SOCK_TCP,
    'callbacks' => [
        Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
        Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
        Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
    ],
],

安装redis

如果安装好Socket.io后,在composer.json查看是否安装hyperf/redis,没有则需要安装一下不然启动hyperf服务会报redis方面的错,安装代码如下:

composer require hyperf/redis

安装好redis后,查看config/autoload文件夹是否存在redis.php,若不存在需添加redis.php

redis.php

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
return [
    'default' => [
        'host' => env('REDIS_HOST', 'localhost'),
        'auth' => env('REDIS_AUTH', null),
        'port' => (int) env('REDIS_PORT', 6379),
        'db' => (int) env('REDIS_DB', 0),
        'timeout' => 0.0,
        'reserved' => null,
        'retry_interval' => 0,
        'cluster' => [
            'enable' => (bool) env('REDIS_CLUSTER_ENABLE', false),
            'name' => null,
            'seeds' => [],
        ],
        'sentinel' => [
            'enable' => (bool) env('REDIS_SENTINEL_ENABLE', false),
            'master_name' => env('REDIS_MASTER_NAME', 'mymaster'),
            'nodes' => explode(';', env('REDIS_SENTINEL_NODE', '')),
            'persistent' => '',
            'read_timeout' => 0,
            'auth' => null,
        ],
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
            'max_idle_time' => (float) env('REDIS_MAX_IDLE_TIME', 60),
        ],
    ],
];

服务端

注意 hyperf官网文档使用的是php8注解 php8服务端如下

<?php

declare(strict_types=1);

namespace App\Controller;

use Hyperf\SocketIOServer\Annotation\Event;
use Hyperf\SocketIOServer\Annotation\SocketIONamespace;
use Hyperf\SocketIOServer\BaseNamespace;
use Hyperf\SocketIOServer\Socket;
use Hyperf\Utils\Codec\Json;

#[SocketIONamespace("/")]
class WebSocketController extends BaseNamespace
{
    /**
     * @param string $data
     */
    #[Event("event")]
    public function onEvent(Socket $socket, $data)
    {
        // 应答
        return 'Event Received: ' . $data;
    }

    /**
     * @param string $data
     */
    #[Event("join-room")]
    public function onJoinRoom(Socket $socket, $data)
    {
        // 将当前用户加入房间
        $socket->join($data);
        // 向房间内其他用户推送(不含当前用户)
        $socket->to($data)->emit('event', $socket->getSid() . "has joined {$data}");
        // 向房间内所有人广播(含当前用户)
        $this->emit('event', 'There are ' . count($socket->getAdapter()->clients($data)) . " players in {$data}");
    }

    /**
     * @param string $data
     */
    #[Event("say")]
    public function onSay(Socket $socket, $data)
    {
        $data = Json::decode($data);
        $socket->to($data['room'])->emit('event', $socket->getSid() . " say: {$data['message']}");
    }
}

搭建Socket.io 服务端php版本至少为php7.4版本 7.4+版本如下

<?php

declare(strict_types=1);

namespace App\Controller;

use Hyperf\SocketIOServer\Annotation\Event;
use Hyperf\SocketIOServer\Annotation\SocketIONamespace;
use Hyperf\SocketIOServer\BaseNamespace;
use Hyperf\SocketIOServer\Socket;
use Hyperf\Utils\Codec\Json;
/**
 * @SocketIONamespace("/")
 */
class WebSocketController extends BaseNamespace
{
    
    /**
     * @Event("connect")
     */
    public function connect(Socket $socket)
    { 
        // 向房间内所有人广播(含当前用户)
        $this->emit('event',  '用户'.$socket->getSid().'连接' );
    }

    /**
     * @Event("disconnect")
     */
    public function disconnect(Socket $socket,$data)
    {
         // 向房间内所有人广播(含当前用户)
        $this->emit('event',  '用户'.$socket->getSid().'断开' );
    }
    
    /**
     * @Event("event")
     * @param string $data
     */
    public function onEvent(Socket $socket, $data)
    {
        var_dump($data);
        // 应答
        return  '用户'.$socket->getSid().'发送内容:' . $data;
    }

    /**
     * @Event("join-room")
     * @param string $data
     */
    public function onJoinRoom(Socket $socket, $data)
    {
        
        $socket->join($data);
        // 向房间内其他用户推送(不含当前用户)
        $socket->to($data)->emit('event', '用户'.$socket->getSid() . "已经加入房间{$data}");
        // 向房间内所有人推送(含当前用户)
        $this->emit('event',  "房间{$data}在线数为". count($socket->getAdapter()->clients($data)) );
       
    }
    
    /**
     * @Event("say")
     * @param string $data
     */
    public function onSay(Socket $socket, $data)
    {
        $data = Json::decode($data);
        $socket->to($data['room'])->emit('event', '用户'.$socket->getSid() . "说: {$data['message']}");
    }
    
    
    
    /**
     * @Event("someevent")
     * @param string $data
     */
    public function onSomeevent(Socket $socket, $data)
    {
        //获取fd
        // var_dump($socket->getFd());
        //获取sid
        // var_dump($socket->getSid());
        //获取当前命名空间
        // var_dump($socket->getNamespace());
        //获取当前Request
        // var_dump($socket->getRequest());
        // 将当前用户加入房间
        // $socket->join($data);
        // 当前用户离开房间
        // $socket->leave($data);
        //当前用户离开全部房间
        // $socket->leaveAll();
        //将sid用户加入房间
        // $socket->getAdapter()->add($socket->getSid(),$data);
        //将sid用户踢出房间
        // $socket->getAdapter()->del($socket->getSid(),$data);
        //将sid用户离开全部房间
        // $socket->getAdapter()->del($socket->getSid());
        //当前用户断开链接
        // $socket->disconnect();
        //获取该房间全部用户
        // var_dump($socket->getAdapter()->clients($data));
        //获取该sid用户所在的全部房间
        // $socket->join('game1');
        // var_dump($socket->getAdapter()->clientRooms($socket->getSid()));
        //向自己推送
        $socket->emit('event',  "我是用户".$socket->getSid());
        // 向房间内其他用户推送(不含当前用户)
        $socket->to($data)->emit('event', '用户'.$socket->getSid() . "已经加入房间{$data}");
        // 向房间内所有人推送(含当前用户)
        $this->emit('event',  "房间{$data}在线数为". count($socket->getAdapter()->clients($data)) );
        //发送多个房间(不含当前用户)
        $socket->to('game1')->to('game2')->emit('playgame', "game1与game2群快上号!!!");
        //发送多个房间(不含当前用户)
        $socket->to('game1')->to('game2')->to($data)->emit('playgame', "game、与game2与room1群快上号!!!");
         // 向所有连接推送 broadcast 事件,但是不包括当前连接。
        $socket->broadcast->emit('broadcast', 'hello friends!');
        // 无压缩推送
        $socket->compress(false)->emit('compress', "无压缩测试");
        // 发送信息,并且等待并接收客户端响应。
        $reply = $socket->emit('question', '是否接收邀请')->reply();
        var_dump($reply);
    }
     /**
     * @Event("compress")
     * @param string $data
     */
    public function onCompress(Socket $socket, $data)
    {
        var_dump($data);
        return "响应成功";
    }
        

}

服务端根据自己php版本选择搭建服务,如果服务器是php7.4+使用php8服务端,服务端显示搭建成功,但并不会生效,客户端访问会一直处于fd[x] start a handshake request,fd[x] closed这样的状态,php8与php7.4+版本在使用方式上仅注解不同,其余操作,官网文档两个版本通用。

每个 socket 会自动加入以自己 sid 命名的房间($socket->getSid()),发送私聊信息就推送到对应 sid 即可。

框架会自动触发 connect 和 disconnect 两个事件。

客户端

由于服务端只实现了 WebSocket 通讯,所以客户端要加上 {transports:["websocket"]} 。

<script src="https://cdn.bootcdn.net/ajax/libs/socket.io/2.3.0/socket.io.js"></script>
    <script>
        var socket = io('ws://8.142.13.127:9502', { transports: ["websocket"] });
        socket.on('connect', data => {
            console.log(socket.id);
            socket.emit('event', 'hello, hyperf');
            socket.emit('join-room', 'room1', console.log);
            setInterval(function () {
                socket.emit('say', '{"room":"room1", "message":"Hello Hyperf."}');
            }, 10000);
            
            
            socket.emit('someevent', 'room1', console.log);
        });
        socket.on('event', console.log);
        
        
        
        socket.on('disconnect', (data)=>{
            //手动关闭客户端对服务器的链接,如果不关闭客户端,重启服务端后未关闭的客户端仍然可用
            socket.close();
            //或者
            // socket.disconnect();
            console.log('服务器关闭');
        });
        socket.on('reconnect', (data)=>{
            console.log('服务器重新连接');
        });
        socket.on('connect_error', (error) => {
          console.log('服务器连接错误');
        });
        socket.on('compress', (data)=>{
            //设置修改器,是否对向服务器传输的数据进行压缩。默认为true,即压缩。
            socket.compress(false).emit('compress', '等待返回响应', (data) => {
              console.log(data);
            });
        });
        socket.on('playgame', (data)=>{
            console.log(data);
        });
        socket.on('broadcast', (data)=>{
            console.log(data);
        });
        socket.on('question', (data,fn)=>{
             // 发送信息,服务端等待,客户端响应。
            fn('woot');
        });
        
    </script>

客户端可以对接php8与php7.4+的服务端,以上客户端方法与php7.4+服务端方法对应,只用于测试,如果你的服务端是php8,客户端调用方式不变。如果socketio客户端具体方法不清楚可以查看socket.io 客户端 API文档,是很全面的。

设置 Socket.io 命名空间

Socket.io 通过自定义命名空间实现多路复用。(注意:不是 PHP 的命名空间)

1.可以通过 @SocketIONamespace("/xxx") 将控制器映射为 xxx 的命名空间,就是注解方式

2.也可通过

<?php
use Hyperf\SocketIOServer\Collector\SocketIORouter;
use App\Controller\WebSocketController;
SocketIORouter::addNamespace('/xxx' , WebSocketController::class);

在路由中添加。

开启 Session

安装并配置好 hyperf/session 组件及其对应中间件,再通过 SessionAspect 切入 SocketIO 来使用 Session 。

<?php
// config/autoload/aspect.php
return [
    \Hyperf\SocketIOServer\Aspect\SessionAspect::class,
];

Swoole 4.4.17 及以下版本只能读取 HTTP 创建好的 Cookie,Swoole 4.4.18 及以上版本可以在 WebSocket 握手时创建 Cookie

调整房间适配器

默认的房间功能通过 Redis 适配器实现,可以适应多进程乃至分布式场景。

1.可以替换为内存适配器,只适用于单 worker 场景。

<?php
// config/autoload/dependencies.php
return [
    \Hyperf\SocketIOServer\Room\AdapterInterface::class => \Hyperf\SocketIOServer\Room\MemoryAdapter::class,
];

2.可以替换为空适配器,不需要房间功能时可以降低消耗。

<?php
// config/autoload/dependencies.php
return [
    \Hyperf\SocketIOServer\Room\AdapterInterface::class => \Hyperf\SocketIOServer\Room\NullAdapter::class,
];

调整 SocketID (sid)

默认 SocketID 使用 ServerID#FD 的格式,可以适应分布式场景。

1.可以替换为直接使用 Fd 。

<?php
// config/autoload/dependencies.php
return [
    \Hyperf\SocketIOServer\SidProvider\SidProviderInterface::class => \Hyperf\SocketIOServer\SidProvider\LocalSidProvider::class,
];

2.也可以替换为 SessionID 。

<?php
// config/autoload/dependencies.php
return [
    \Hyperf\SocketIOServer\SidProvider\SidProviderInterface::class => \Hyperf\SocketIOServer\SidProvider\SessionSidProvider::class,
];

修改 SocketIO 基础参数

框架默认参数:

配置类型默认值
$pingTimeoutint100
$pingIntervalint10000
$clientCallbackTimeoutint10000

有时候,由于推送消息比较多或者网络较卡,在 100ms 内,无法及时返回 PONG,就会导致连接断开。这时候我们可以通过以下方式,进行重写:

<?php

declare(strict_types=1);

namespace App\Kernel;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\SocketIOServer\Parser\Decoder;
use Hyperf\SocketIOServer\Parser\Encoder;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\SocketIOServer\SocketIO;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerInterface;

class SocketIOFactory
{
    public function __invoke(ContainerInterface $container)
    {
        $io = new SocketIO(
            $container->get(StdoutLoggerInterface::class),
            $container->get(Sender::class),
            $container->get(Decoder::class),
            $container->get(Encoder::class),
            $container->get(SidProviderInterface::class)
        );

        // 重写 pingTimeout 参数
        $io->setPingTimeout(10000);

        return $io;
    }
}

然后在 dependencies.php 添加对应映射即可。

return [
    Hyperf\SocketIOServer\SocketIO::class => App\Kernel\SocketIOFactory::class,
];

Auth 鉴权

您可以通过使用中间件来拦截 WebSocket 握手,实现鉴权功能,如下:

<?php

declare(strict_types=1);

namespace App\Middleware;

use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;

class WebSocketAuthMiddleware implements MiddlewareInterface
{
    protected ContainerInterface $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        // 伪代码,通过 isAuth 方法拦截握手请求并实现权限检查
        if (! $this->isAuth($request)) {
            return $this->container->get(\Hyperf\HttpServer\Contract\ResponseInterface::class)->raw('Forbidden');
        }

        return $handler->handle($request);
    }
}

并将上面的中间件配置到对应的 WebSocket Server 中去即可。

获取原始请求对象

连接建立以后,有时需获取客户端 IP ,Cookie 等请求信息。原始请求对象已经被保留在连接上下文中,您可以用如下方式在事件回调中获取:

public function onEvent($socket, $data)
{
    $request = Hyperf\WebSocketServer\Context::get(
        Psr\Http\Message\ServerRequestInterface::class
    );
}

Nginx 代理配置

使用 Nginx 反向代理 Socket.io 与 WebSocket 有些许区别

server {
    location ^~/socket.io/ {
        # 执行代理访问真实服务器
        proxy_pass http://hyperf;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

以上为hyperf搭建socketio服务总结,可以与 hyperf搭建socketio官方文档socket.io 客户端 API 这两个文档结合着去看,效果更好。