欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

在 WPF 下使用 FreeRedis 操作 RedisStream 的简单消息队列。

最编程 2024-10-03 07:02:52
...

Redis Stream简介

Redis Stream是随着5.0版本发布的一种新的Redis数据类型:

高效消费者组:允许多个消费者组从同一数据流的不同部分消费数据,每个消费者组都能独立地处理消息,这样可以并行处理和提高效率。

阻塞操作:消费者可以设置阻塞操作,这样它们会在流中有新数据添加时被唤醒并开始处理,这有助于减少资源消耗并提高响应速度。

数据持久化:它可以将数据持久化到内存(配置本地持久化后会写入到存储设备)中进行保存,等待消费。

多生产者多消费者:Redis Streams能够在多个生产者和消费者之间建立一个数据通道,使得数据的流动和处理更加灵活。

扩展性和异步通信:用户可以通过应用程序轻松扩展消费者数量,并且生产者和消费者之间的通信可以是异步的,这有助于提高系统的整体性能。

满足多样化需求:Redis Streams满足从实时数据处理到历史数据访问的各种需求,同时保持易于管理。

Redis Stream可以干什么

消息队列:Redis Stream可以用作一个可靠的消息队列系统,支持发布/订阅模式,生产者和消费者可以异步地发送和接收消息。

任务调度:Redis Stream可以用于实现分布式任务调度系统,将任务分发到多个消费者进行处理,从而提高处理速度和系统可扩展性。

事件驱动架构:Redis Stream可以作为事件驱动架构中的核心组件,用于处理来自不同服务的事件,实现解耦和灵活性。

FreeRedis简介

FreeRedis 的命名来自,“*”、“免费”,它和名字与 FreeSql 是一个理念,简易是他们一致的追寻方向,最低可支持 .NET Framework 4.0 运行环境,支持到 Redis-server 7.2。

github MIT开源协议

作者

官方介绍

基于 .NET 的 Redis 客户端,支持 .NET Core 2.1+、.NET Framework 4.0+、Xamarin 以及 AOT。

  • ???? 所有方法名与 redis-cli 保持一致
  • ???? 支持 Redis 集群(服务端要求 3.2 及以上版本)
  • ⛳ 支持 Redis 哨兵模式
  • ???? 支持主从分离(Master-Slave)
  • ???? 支持发布订阅(Pub-Sub)
  • ???? 支持 Redis Lua 脚本
  • ???? 支持管道(Pipeline)
  • ???? 支持事务
  • ???? 支持 GEO 命令(服务端要求 3.2 及以上版本)
  • ???? 支持 STREAM 类型命令(服务端要求 5.0 及以上版本)
  • ⚡ 支持本地缓存(Client-side-cahing,服务端要求 6.0 及以上版本)
  • ???? 支持 Redis 6 的 RESP3 协议

要实现的功能

1、生产者生产数据

2、消费者消费数据后确认

3、消费者消费数据后不确认

4、已消费但超时未确认的消息监控

5、已消费但超时未确认的消息二次消费

项目依赖

WPF
CommunityToolkit.Mvvm
FreeRedis
Newtonsoft.Json
NLog
redis-windows-7.2.5

业务场景代码

涉及到的Redis命令

创建消费者组 XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]

查询消费者组信息 XINFO STREAM key [FULL [COUNT count]]

消息队列数量(长度) XLEN key

添加消息到队列尾部 XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

消费组成员读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

确认消息 XACK key group id [id ...]

删除消息 XDEL key id [id ...]

获取消费未确认消息的队列信息 XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

把未消费的消息消费者转移到当前消费者名下 XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]

代码

App.xaml.cs

    public partial class App : Application
    {
        public static Logger Logger = LogManager.GetCurrentClassLogger();        
        public static RedisClient RedisHelper;
        public static MainViewModel MainViewModel;
        private void App_OnStartup(object sender, StartupEventArgs e)
        {
            Current.DispatcherUnhandledException += Current_DispatcherUnhandledException;

            try
            {
                //redis6以上版本启用了ACL用户管理机制,默认用户名是default,可以忽略密码
                RedisHelper = new RedisClient("127.0.0.1:6379,user=defualt,defaultDatabase=13");
                RedisHelper.Serialize = JsonConvert.SerializeObject;//序列化
                RedisHelper.Deserialize = JsonConvert.DeserializeObject;//反序列化
                RedisHelper.Notice += (s, ee) => Console.WriteLine(ee.Log); //打印命令日志
                MainViewModel = new MainViewModel();
            }
            catch (Exception exception)
            {
                MessageBox.Show(exception.Message);
                Current.Shutdown(-100);
            }
        }

        private void Current_DispatcherUnhandledException(object sender, System.Windows.Threading.DispatcherUnhandledExceptionEventArgs e)
        {
            e.Handled = true;
            Logger.Error($"未捕获的错误:来源:{sender},错误:{e}");
        }

        private void App_OnEx

推荐阅读