首页后端开发ASP.NET九哥聊Kestrel网络编程第一章:开发一个Redis服务器(redis开源项目)

九哥聊Kestrel网络编程第一章:开发一个Redis服务器(redis开源项目)

时间2023-03-24 17:30:26发布访客分类ASP.NET浏览565
导读:推荐序之前在.NET 性能优化群内交流时,我们发现很多朋友对于高性能网络框架有需求,需要创建自己的消息服务器、游戏服务器或者物联网网关。但是大多数小伙伴只知道 DotNetty,虽然 DotNetty 是一个非常优秀的网络框架,广泛应用于各...

推荐序

之前在.NET 性能优化群内交流时,我们发现很多朋友对于高性能网络框架有需求,需要创建自己的消息服务器、游戏服务器或者物联网网关。但是大多数小伙伴只知道 DotNetty,虽然 DotNetty 是一个非常优秀的网络框架,广泛应用于各种网络服务器中,不过因为各种原因它已经不再有新的特性支持和更新,很多小伙伴都在寻找替代品。

这一切都不用担心,在.NET Core 以后的时代,我们有了更快、更强、更好的 Kestrel 网络框架,正如其名,Kestrel 中文翻译为红隼(hóng sǔn)封面就是红隼的样子,是一种飞行速度极快的猛禽。Kestrel 是 ASPNET Core 成为.NET 平台性能最强 Web 服务框架的原因之一,但是很多人还觉得 Kestrel 只是用于 ASPNET Core 的网络框架,但是其实它是一个高性能的通用网络框架。

我和拥有多个.NET 千星开源项目作者九哥[1]一拍即合,为了让更多的人了解 Kestrel,计划写一系列的文章来介绍它;本文是九哥发布的第一篇,通过 Kestrel 实现一个 Redis 的伪服务器,带大家了解 Kestrel 除了承载 HTTP 协议,还有其它更多的可能性,DotNetty 能做到的,Kestrel 同样也可以。

由于公众号排版问题,建议在PC上浏览

1 文章目的

本文讲解基于 kestrel 开发实现了部分 redis 命令的 redis 伪服务器的过程,让读者了解 kestrel 网络编程的完整步骤,其中 redis 通讯协议需要读者自行查阅,文章里不做具体解析。

2 开发顺序

  1. 创建 Kestrel 的 Redis 协议处理者
  2. 配置监听的 EndPoint 并使用 Redis 处理者
  3. 设计交互上下文 RedisContext
  4. 设计 Redis 命令处理者
  5. 设计 Redis 中间件
  6. 编排 Redis 中间件构建应用

3. 创建 Redis 协议处理者

在 Kestrel 中,末级的中间件是一个没有 next 的特殊中间件,基表现出来就是一个 ConnectionHandler 的行为。我们开发 redis 应用只需要继承 ConnectionHandler 这个抽象类来,当 kestrel 接收到新的连接时将连接交给我们来处理,我们处理完成之后,不再有下一个处理者来处理这个连接了。

/// summary>
    
/// 表示Redis连接处理者
/// /summary>

sealed class RedisConnectionHandler : ConnectionHandler
{
    
    /// summary>
    
    /// 处理Redis连接
    /// /summary>
    
    /// param name="context">
    redis连接上下文/param>
    
    /// returns>
    /returns>

    public async override Task OnConnectedAsync(ConnectionContext context)
    {

        // 开始处理这个redis连接
        ...
        // 直到redis连接断开后结束
    }

}

4. 配置监听的 EndPoint

4.1 json 配置文件

我们在配置文件里指定监听本机的 5007 端口来做服务器,当然你可以指定本机具体的某个 IP 或任意 IP。

{

  "Kestrel": {

    "Endpoints": {

      "Redis": {
 // redis协议服务器,只监听loopback的IP
        "Url": "http://localhost:5007"
      }

    }

  }

}

{

  "Kestrel": {

    "Endpoints": {

      "Redis": {
 // redis协议服务器,监听所有IP
        "Url": "http://*:5007"
      }

    }

  }

}
    

4.2 在代码中配置 Redis 处理者

为 Redis 这个节点关联上RedisConnectionHandler,当 redis 客户端连接到 5007 这个端口之后,OnConnectedAsync()方法就得到触发且收到连接上下文对象。

builder.WebHost.ConfigureKestrel((context, kestrel) =>

{
    
    var section = context.Configuration.GetSection("Kestrel");
    
    kestrel.Configure(section).Endpoint("Redis", endpoint =>

    {
    
        endpoint.ListenOptions.UseConnectionHandlerRedisConnectionHandler>
    ();

    }
    );

}
    );
    

5 设计 RedisContext

在 asp.netcore 里,我们知道应用层每次 http 请求都创建一个 HttpContext 对象,里面就塞着各种与本次请求有关的对象。对于 Redis 的请求,我们也可以这么抄袭 asp.netcore 来设计 Redis。

5.1 RedisContext

Redis 请求上下文,包含 Client、Request、Response 和 Features 对象,我们要知道是收到了哪个 Redis 客户端的什么请求,从而请求命令处理者可以向它响应对应的内容。

/// summary>
    
/// 表示redis上下文
/// /summary>

sealed class RedisContext : ApplicationContext
{
    
    /// summary>
    
    /// 获取redis客户端
    /// /summary>

    public RedisClient Client {
     get;
 }
    

    /// summary>
    
    /// 获取redis请求
    /// /summary>

    public RedisRequest Reqeust {
     get;
 }
    

    /// summary>
    
    /// 获取redis响应
    /// /summary>

    public RedisResponse Response {
     get;
 }
    

    /// summary>
    
    /// redis上下文
    /// /summary>
    
    /// param name="client">
    /param>
    
    /// param name="request">
    /param>
    
    /// param name="response">
    /param>
    
    /// param name="features">
    /param>

    public RedisContext(RedisClient client, RedisRequest request, RedisResponse response, IFeatureCollection features)
        : base(features)
    {
    
        this.Client = client;
    
        this.Reqeust = request;
    
        this.Response = response;

    }


    public override string ToString()
    {

        return $"{
this.Client}
 {
this.Reqeust}
    ";

    }

}
    

5.2 ApplicationContext

这是抽象的应用层上下文,它强调 Features,做为多个中间件之间的沟通渠道。

/// summary>
    
/// 表示应用程序请求上下文
/// /summary>

public abstract class ApplicationContext
{
    
    /// summary>
    
    /// 获取特征集合
    /// /summary>

    public IFeatureCollection Features {
     get;
 }
    

    /// summary>
    
    /// 应用程序请求上下文
    /// /summary>
    
    /// param name="features">
    /param>

    public ApplicationContext(IFeatureCollection features)
    {
    
        this.Features = new FeatureCollection(features);

    }

}
    

5.3 RedisRequest

一个 redis 请求包含请求的命令和 0 到多个参数值。

/// summary>
    
/// 表示Redis请求
/// /summary>

sealed class RedisRequest
{
    
    private readonly ListRedisValue>
     values = new();
    

    /// summary>
    
    /// 获取命令名称
    /// /summary>

    public RedisCmd Cmd {
     get;
     private set;
 }
    

    /// summary>
    
    /// 获取参数数量
    /// /summary>
    
    public int ArgumentCount =>
     this.values.Count - 1;
    

    /// summary>
    
    /// 获取参数
    /// /summary>
    
    /// param name="index">
    /param>
    
    /// returns>
    /returns>

    public RedisValue Argument(int index)
    {
    
        return this.values[index + 1];

    }

}
    

RedisRequest 的解析:

/// summary>
    
/// 从内存中解析
/// /summary>
    
/// param name="memory">
    /param>
    
/// param name="request">
    /param>
    
/// exception cref="RedisProtocolException">
    /exception>
    
/// returns>
    /returns>
    
private static bool TryParse(ReadOnlyMemorybyte>
 memory, [MaybeNullWhen(false)] out RedisRequest request)
{
    
    request = default;

    if (memory.IsEmpty == true)
    {
    
        return false;

    }
    

    var span = memory.Span;

    if (span[0] != '*')
    {
    
        throw new RedisProtocolException();

    }


    if (span.Length  4)
    {
    
        return false;

    }
    

    var lineLength = span.IndexOf((byte)'\n') + 1;

    if (lineLength  4)
    {
    
        throw new RedisProtocolException();

    }
    

    var lineCountSpan = span.Slice(1, lineLength - 3);
    
    var lineCountString = Encoding.ASCII.GetString(lineCountSpan);

    if (int.TryParse(lineCountString, out var lineCount) == false || lineCount  0)
    {
    
        throw new RedisProtocolException();

    }
    

    request = new RedisRequest();
    
    span = span.Slice(lineLength);
    
    for (var i = 0;
     i  lineCount;
 i++)
    {

        if (span[0] != '$')
        {
    
            throw new RedisProtocolException();

        }
    

        lineLength = span.IndexOf((byte)'\n') + 1;

        if (lineLength  4)
        {
    
            throw new RedisProtocolException();

        }
    

        var lineContentLengthSpan = span.Slice(1, lineLength - 3);
    
        var lineContentLengthString = Encoding.ASCII.GetString(lineContentLengthSpan);

        if (int.TryParse(lineContentLengthString, out var lineContentLength) == false)
        {
    
            throw new RedisProtocolException();

        }
    

        span = span.Slice(lineLength);

        if (span.Length  lineContentLength + 2)
        {
    
            return false;

        }
    

        var lineContentBytes = span.Slice(0, lineContentLength).ToArray();
    
        var value = new RedisValue(lineContentBytes);
    
        request.values.Add(value);
    

        span = span.Slice(lineContentLength + 2);

    }
    

    request.Size = memory.Span.Length - span.Length;
    
    Enum.TryParseRedisCmd>
    (request.values[0].ToString(), ignoreCase: true, out var name);
    
    request.Cmd = name;
    

    return true;

}
    

5.4 RedisResponse

/// summary>
    
/// 表示redis回复
/// /summary>

sealed class RedisResponse
{
    
    private readonly PipeWriter writer;


    public RedisResponse(PipeWriter writer)
    {
    
        this.writer = writer;

    }
    

    /// summary>
    
    /// 写入\r\n
    /// /summary>
    
    /// returns>
    /returns>

    public RedisResponse WriteLine()
    {
    
        this.writer.WriteCRLF();
    
        return this;

    }


    public RedisResponse Write(char value)
    {
    
        this.writer.Write((byte)value);
    
        return this;

    }
    

    public RedisResponse Write(ReadOnlySpanchar>
 value)
    {
    
        this.writer.Write(value, Encoding.UTF8);
    
        return this;

    }
    

    public RedisResponse Write(ReadOnlyMemorybyte>
 value)
    {
    
        this.writer.Write(value.Span);
    
        return this;

    }
    


    public ValueTaskFlushResult>
 FlushAsync()
    {
    
        return this.writer.FlushAsync();

    }
    

    public ValueTaskFlushResult>
 WriteAsync(ResponseContent content)
    {
    
        return this.writer.WriteAsync(content.ToMemory());

    }

}
    

5.5 RedisClient

Redis 是有状态的长连接协议,所以在服务端,我把连接接收到的连接包装为 RedisClient 的概念,方便我们业务理解。对于连接级生命周期的对象属性,我们都应该放到 RedisClient 上,比如是否已认证授权等。

/// summary>
    
/// 表示Redis客户端
/// /summary>

sealed class RedisClient
{
    
    private readonly ConnectionContext context;
    

    /// summary>
    
    /// 获取或设置是否已授权
    /// /summary>

    public bool? IsAuthed {
     get;
     set;
 }
    

    /// summary>
    
    /// 获取远程终结点
    /// /summary>
    
    public EndPoint? RemoteEndPoint =>
     context.RemoteEndPoint;
    

    /// summary>
    
    /// Redis客户端
    /// /summary>
    
    /// param name="context">
    /param>

    public RedisClient(ConnectionContext context)
    {
    
        this.context = context;

    }
    

    /// summary>
    
    /// 关闭连接
    /// /summary>

    public void Close()
    {
    
        this.context.Abort();

    }
    

    /// summary>
    
    /// 转换为字符串
    /// /summary>
    
    /// returns>
    /returns>

    public override string? ToString()
    {
    
        return this.RemoteEndPoint?.ToString();

    }

}
    

6. 设计 Redis 命令处理者

redis 命令非常多,我们希望有一一对应的 cmdHandler 来对应处理,来各尽其责。所以我们要设计 cmdHandler 的接口,然后每个命令增加一个实现类型,最后使用一个中间件来聚合这些 cmdHandler。

6.1 IRedisCmdHanler 接口

/// summary>
    
/// 定义redis请求处理者
/// /summary>

interface IRedisCmdHanler
{
    
    /// summary>
    
    /// 获取能处理的请求命令
    /// /summary>

    RedisCmd Cmd {
     get;
 }
    

    /// summary>
    
    /// 处理请求
    /// /summary>
    
    /// param name="context">
    /param>
    
    /// returns>
    /returns>
    
    ValueTask HandleAsync(RedisContext context);

}
    

6.2 IRedisCmdHanler 实现

由于实现类型特别多,这里只举个例子

/// summary>
    
/// Ping处理者
/// /summary>

sealed class PingHandler : IRedisCmdHanler
{
    
    public RedisCmd Cmd =>
     RedisCmd.Ping;
    

    /// summary>
    
    /// 处理请求
    /// /summary>
    
    /// param name="context">
    /param>
    
    /// returns>
    /returns>

    public async ValueTask HandleAsync(RedisContext context)
    {
    
        await context.Response.WriteAsync(ResponseContent.Pong);

    }

}
    

7.设计 Redis 中间件

对于 Redis 服务器应用而言,我们处理一个请求需要经过多个大的步骤:

  1. 如果服务器要求 Auth 的话,验证连接是否已 Auth
  2. 如果 Auth 验证通过之后,则查找与请求对应的 IRedisCmdHanler 来处理请求
  3. 如果没有 IRedisCmdHanler 来处理,则告诉客户端命令不支持。

7.1 中间件接口

/// summary>
    
/// redis中间件
/// /summary>
    
interface IRedisMiddleware : IApplicationMiddlewareRedisContext>

{

}
    
/// summary>
    
/// 应用程序中间件的接口
/// /summary>
    
/// typeparam name="TContext">
    /typeparam>
    
public interface IApplicationMiddlewareTContext>

{
    
    /// summary>
    
    /// 执行中间件
    /// /summary>
    
    /// param name="next">
    下一个中间件/param>
    
    /// param name="context">
    上下文/param>
    
    /// returns>
    /returns>
    
    Task InvokeAsync(ApplicationDelegateTContext>
     next, TContext context);

}
    

7.2 命令处理者中间件

这里只拿重要的命令处理者中间件来做代码说明,其它中间件也是一样处理方式。

/// summary>
    
/// 命令处理中间件
/// /summary>

sealed class CmdMiddleware : IRedisMiddleware
{
    
    private readonly DictionaryRedisCmd, IRedisCmdHanler>
     cmdHandlers;
    

    public CmdMiddleware(IEnumerableIRedisCmdHanler>
 cmdHanlers)
    {
    
        this.cmdHandlers = cmdHanlers.ToDictionary(item =>
     item.Cmd, item =>
     item);

    }
    

    public async Task InvokeAsync(ApplicationDelegateRedisContext>
 next, RedisContext context)
    {

        if (this.cmdHandlers.TryGetValue(context.Reqeust.Cmd, out var hanler))
        {
    
            // 这里是本中间件要干的活
            await hanler.HandleAsync(context);

        }

        else
        {
    
            // 本中间件干不了,留给下一个中间件来干
            await next(context);

        }

    }

}

8 编排 Redis 中间件

回到 RedisConnectionHandler,我们需要实现它,实现逻辑是编排 Redis 中间件并创建可以处理应用请求的委托application,再将收到的 redis 请求创建 RedisContext 对象的实例,最后使用application来执行 RedisContext 实例即可。

8.1 构建 application 委托

sealed class RedisConnectionHandler : ConnectionHandler
{
    
    private readonly ILoggerRedisConnectionHandler>
     logger;
    
    private readonly ApplicationDelegateRedisContext>
     application;
    

    /// summary>
    
    /// Redis连接处理者
    /// /summary>
    
    /// param name="appServices">
    /param>
    
    /// param name="logger">
    /param>
    
    public RedisConnectionHandler(
        IServiceProvider appServices,
        ILoggerRedisConnectionHandler>
 logger)
    {
    
        this.logger = logger;
    
        this.application = new ApplicationBuilderRedisContext>
    (appServices)
            .UseAuthMiddleware>
    ()
            .UseCmdMiddleware>
    ()
            .UseFallbackMiddlware>
    ()
            .Build();

    }

}

8.2 使用 application 委托处理请求

sealed class RedisConnectionHandler : ConnectionHandler
{
    
    /// summary>
    
    /// 处理Redis连接
    /// /summary>
    
    /// param name="context">
    /param>
    
    /// returns>
    /returns>

    public async override Task OnConnectedAsync(ConnectionContext context)
    {

        try
        {
    
            await this.HandleRequestsAsync(context);

        }

        catch (Exception ex)
        {
    
            this.logger.LogDebug(ex.Message);

        }

        finally
        {
    
            await context.DisposeAsync();

        }

    }
    

    /// summary>
    
    /// 处理redis请求
    /// /summary>
    
    /// param name="context">
    /param>
    
    /// returns>
    /returns>

    private async Task HandleRequestsAsync(ConnectionContext context)
    {
    
        var input = context.Transport.Input;
    
        var client = new RedisClient(context);
    
        var response = new RedisResponse(context.Transport.Output);


        while (context.ConnectionClosed.IsCancellationRequested == false)
        {
    
            var result = await input.ReadAsync();

            if (result.IsCanceled)
            {
    
                break;

            }
    

            var requests = RedisRequest.Parse(result.Buffer, out var consumed);
    
            if (requests.Count >
 0)
            {

                foreach (var request in requests)
                {
    
                    var redisContext = new RedisContext(client, request, response, context.Features);
    
                    await this.application.Invoke(redisContext);

                }
    
                input.AdvanceTo(consumed);

            }

            else
            {
    
                input.AdvanceTo(result.Buffer.Start, result.Buffer.End);

            }


            if (result.IsCompleted)
            {
    
                break;

            }

        }

    }

}
    

9 文章总结

在还没有进入阅读本文章之前,您可能会觉得我会大量讲解 Socket 知识内容,例如Socket BindSocket AcceptSocket SendSocket Receive等。但实际上没完全没有任何涉及,因为终结点的监听、连接的接收、缓冲区的处理、数据接收与发送等这些基础而复杂的网络底层 kestrel 已经帮我处理好,我们关注是我们的应用协议层的解析、还有应用本身功能的开发两个本质问题。

您可能发也现了,本文章的 RedisRequest 解析,也没有多少行代码!反而文章中都是抽象的中间件、处理者、上下文等概念。实际上这不但不会带来项目复杂度,反而让项目更好的解耦,比如要增加一个新的指令的支持,只需要增加一个 xxxRedisCmdHanler 的文件,其它地方都不用任何修改。

本文章是KestrelApp 项目[2]里面的一个 demo 的讲解,希望对您有用。

参考资料

[1]

九哥: https://www.cnblogs.com/kewei/

[2]

KestrelApp项目: https://github.com/xljiulang/KestrelApp

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

云数据库Redissocket编程腾讯云开发者社区.netasp.net

若转载请注明出处: 九哥聊Kestrel网络编程第一章:开发一个Redis服务器(redis开源项目)
本文地址: https://pptw.com/jishu/258.html
.NET 使用NLog增强日志输出 asp.net core 成为构建企业web应用首选

游客 回复需填写必要信息