欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

关于 ASP.NET Core WebSocket 实现集群的思考

最编程 2024-04-11 08:03:54
...

前言

    提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是wswss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关,而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。

实现

咱们的重点关键字就是两个WebSocket集群,实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github,具体仓库地址可以转到文末自行跳转到#示例源码中查看。既然涉及到集群,这里咱们就用nginx作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示`

redis在这里扮演的角色呢,是用来处理Server端的消息相互传递用的,主要是使用的redis的pub/sub`功能来实现的,这里便涉及到几个核心问题

  • 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的
  • 其次,处在不同Server端的不同用户间的相互通信是需要一个传递媒介
  • 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略

这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟WebSocket需要保持长链接、且消息的大小需要评估。

上面提到了redis的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。

nginx配置

通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf

//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {
    server 127.0.0.1:5001;
    server 127.0.0.1:5678;
}

server {
    listen       5000;
    server_name  localhost;

    location ~/chat/{
        //upstream地址
        proxy_pass http://wsbackend;
        proxy_connect_timeout 60s; 
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        //记得转发避免踩坑
        proxy_set_header Host $host;
        proxy_http_version 1.1; 
        //http升级成websocket协议的头标识
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
    }
}

这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。

一对一发送

首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况

  • 如果你需要通信的客户端和你连接在一个Server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。
  • 如果你需要通信的客户端和你不在一个Server端里,这个时候咱们就需要借助redis的pub/sub的功能,把消息传递给另一个Server端。

咱们通过一张图大致的展示一下它的工作方式

解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项,这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下

var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {
    var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();
    RedisClient cli = new RedisClient("127.0.0.1:6379");
    cli.Notice += (s, e) => logger?.LogInformation(e.Log);
    return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();

var app = builder.Build();

var webSocketOptions = new WebSocketOptions
{
    KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);

app.MapGet("/", () => "Hello World!");
app.MapControllers();

app.Run();

接下来我们定义一个Controller用来处理WebSocket请求

public class WebSocketController : ControllerBase
{
    private readonly ILogger<WebSocketController> _logger;
    private readonly WebSocketHandler _socketHandler;

    public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler)
    {
        _logger = logger;
        _socketHandler = socketHandler;
    }
    
    //这里的id代表当前连接的客户端唯一标识比如用户唯一标识
    [HttpGet("/chat/user/{id}")]
    public async Task ChatUser(string id)
    {
        //判断是否是WebSocket请求
        if (HttpContext.WebSockets.IsWebSocketRequest)
        {
            _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

            var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
            //处理请求相关
            await _socketHandler.Handle(id, webSocket);
        }
        else
        {
            HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        }
    }
}

这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码

public class WebSocketHandler:IDisposable
{
    //存储当前服务用户的集合
    private readonly UserConnection UserConnection = new();
    //redis频道前缀
    private readonly string userPrefix = "user:";
    //用户对应的redis频道
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();

    private readonly ILogger<WebSocketHandler> _logger;
    //redis客户端
    private readonly RedisClient _redisClient;

    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task Handle(string id, WebSocket webSocket)
    {
        //把当前用户连接存储起来
        _ = UserConnection.GetOrAdd(id, webSocket);
        //订阅一个当前用户的频道
        await SubMsg($"{userPrefix}{id}");

        var buffer = new byte[1024 * 4];
        //接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        //循环接收消息
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                //因为缓冲区长度是固定的所以要获取实际长度
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
                //接收的到消息转换成实体
                MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);
                //发送到其他客户端的数据
                byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
                _logger.LogInformation($"user {id} send:{msgBody.Msg}");
                 
                //判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送
                if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
                {
                    if (targetSocket.State == WebSocketState.Open)
                    {
                        await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
                    }
                }
                else
                {
                    //如果要发送的目标端不在当前服务,则发送给目标redis端的频道
                    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
                    //目标的redis频道
                    _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
                }
                
                //继续阻塞循环接收消息
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        
        //循环结束意味着当前端已经退出
        //从当前用户的集合移除当前用户
        _ = UserConnection.TryRemove(id, out _);
        //关闭当前WebSocket连接
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
        //在当前订阅集合移除当前用户
        _disposables.TryRemove($"{userPrefix}{id}", out var disposable);
        //关闭当前用户的通道
        disposable.Dispose();
    }

    private async Task SubMsg(string channel)
    {
        //订阅当前用户频道
        var sub = _redisClient.Subscribe(channel,  async (channel, data) => {
            //接收过来当前频道数据,说明发送端不在当前服务
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
            //在当前服务找到目标的WebSocket连接并发送消息
            if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
            {
                if (targetSocket.State == WebSocketState.Open)
                {
                    await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        //把redis订阅频道添加到集合中
        _disposables.TryAdd(channel, sub);
    }
    
    //程序退出的时候取消当前服务订阅的redis频道
    public void Dispose()
    {
        foreach (var disposable in _disposables)
        {
            disposable.Value.Dispose();
        }

        _disposables.Clear();
    }
}

这里涉及到几个辅助相关的类,其中UserConnection类是存储注册到当前服务的连接,MsgBody类用来接受客户端发送过来的消息,ChannelMsgBody是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码

//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{
    //存储用户唯一标识和WebSocket的对应关系
    private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();

    //当前服务的用户数量
    public int Count => _users.Count;

    public WebSocket GetOrAdd(string userId, WebSocket webSocket)
    {
        return _users.GetOrAdd(userId, webSocket);
    }

    public bool TryGetValue(string userId, out WebSocket webSocket)
    {
        return _users.TryGetValue(userId, out webSocket);
    }

    public bool TryRemove(string userId, out WebSocket webSocket)
    {
        return _users.TryRemove(userId, out webSocket);
    }

    public void Clear()
    {
        _users.Clear();
    }

    public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator()
    {
        return _users.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

//客户端消息
public class MsgBody
{
    //目标用户标识
    public string Id { get; set; }
    //要发送的消息
    public string Msg { get; set; }
}

//频道订阅消息
public class ChannelMsgBody
{
    //用户标识
    public string FromId { get; set; }
    //目标用户标识,也就是要发送给谁
    public string ToId { get; set; }
    //要发送的消息
    public string Msg { get; set; }
}

这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去

Postman连接三个连接唯一标识分别是1、2、3,模拟一下消息发送,效果如下,发送效果

接收效果

群组发送

上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下
群组的实现方式相对于一对一要简单一点

  • 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
  • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息

展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景

//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{
    if (HttpContext.WebSockets.IsWebSocketRequest)
    {
        _logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");

        var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
        //调用HandleGroup处理群组相关的消息
        await _socketHandler.HandleGroup(groupId, userId, webSocket);
    }
    else
    {
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
    }
}

接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现

public class WebSocketHandler:IDisposable
{
    private readonly UserConnection UserConnection = new();
    private readonly GroupUser GroupUser = new();
    private readonly SemaphoreSlim _lock = new(1, 1);
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly string groupPrefix = "group:";

    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;

    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }

    public async Task HandleGroup(string groupId, string userId, WebSocket webSocket)
    {
        //因为群组的集合可能会存在很多用户一起访问所以限制访问数量
        await _lock.WaitAsync();
        //初始化群组容器 群唯一标识为key 群员容器为value
        var currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });