RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法

stream  更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客

1 springboot整合redis 就不多说了

2 有用到hutool工具类,添加下 pom 依赖

<dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.5</version>
            <scope>compile</scope>
 </dependency>

3 写个pojo类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {

    private String userId;
    private String goodsId;
    private String orderId;

}

4 往redis 写入些测试数据,如下

del order_stream

keys *

# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10

# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +

创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0

创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $

# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream >

# 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1

# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream

5 核心类,主要演示 xreadgroup,xpending,xack的用法

import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;


@Component
@Slf4j
public class Redis_Stream_Test {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // 定义流的键名
    public static final String stream_key = "order_stream";
    // 定义消费者组名
    public static final String group_str = "g1";
    // 定义消费者名称
    public static final String consumer_str = "c1";

    /**
     * 消费流中的消息
     * 该方法会不断读取指定流中的消息并处理,直到没有消息为止。
     * 用到命令 xreadgroup
     * 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空
     * 如果还想读这5条消息,就换个消费者组名称就可以了
     */
    @EnableScheduling //添加到主启动类上
    @Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次
    public void read_message() {
        try {
            if (!redisTemplate.hasKey(stream_key)) {
                return;
            }
            // 检查是否已创建消费者组,若未创建则创建
            Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key)
                    .stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());
            if (collect.isEmpty()) {
                // 创建消费者组,****从头开始读****
                // XGROUP CREATE order_stream g1 0-0
                String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);
                log.info("消费者组添加成功,group = {}", group);
            }

            // 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >
            List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(
                    Consumer.from(group_str, consumer_str),
                    StreamReadOptions.empty().count(5),
                    StreamOffset.create(stream_key, ReadOffset.lastConsumed())
            );

            // 若无消息则返回
            if (read.isEmpty()) {
                return;
            }

            // 记录读取到的消息数量
            log.info("读取到消息,size = {}", read.size());
            // 处理每条消息
            read.forEach(mapRecord -> {
                // 记录消息ID和内容
                RecordId recordId = mapRecord.getId();
                Map<Object, Object> value = mapRecord.getValue();
                log.info("消息id = {}", mapRecord.getId());
                log.info("消息内容 = {}", mapRecord.getValue());
                // 将消息内容转换为实体对象,用hutool
                SeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);
                log.info("消息内容转成实体查看 = {}", seckillOrder);
                // 进行业务逻辑处理等
            });
        } catch (Exception e) {
            // 记录读取消息失败的日志
            log.error("读取消息失败,e = {}", e);
        }
    }

    /**
     * 确认消息处理完成
     * 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。
     * 用到命令 xpending , xack
     */
    public void ack_message() {
        try {
            // 获取待处理的消息
            PendingMessages pending = redisTemplate.opsForStream().pending(
                    stream_key,
                    Consumer.from(group_str, consumer_str)
            );
            log.info("待处理消息 = {}", pending);

            // 遍历并确认每条待处理消息
            pending.forEach(pendingMessage -> {
                // 记录待处理消息的元数据
                log.info("待处理消息元数据 = {}", pendingMessage);
                RecordId recordId = pendingMessage.getId();
                log.info("待处理消息id = {}", recordId);
                // 提交消息确认
                redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);
                log.info("消息提交ack成功,id = {}", recordId);
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 消费者组 ,消费者,队列的信息
     */
    public void xInfo() {

        StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);
        log.info("获取组的信息:{}", groups);
        System.out.println();

        StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);
        log.info("获取消费者的信息:{}", consumers);
        System.out.println();

        StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);
        log.info("获取流的信息:{}", infoStream);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/576527.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

架构师系列- 定时任务(四)- XXl-Job

XXL-JOB是一个轻量级分布式任务调度平台&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展&#xff0c;其中“XXL”是主要作者&#xff0c;大众点评许雪里名字的缩写 和ElasticJob的区别 相同点 E-Job和X-job都有普遍的用户基础和完整的技术文档&#xff0c;都…

吴恩达深度学习笔记:深度学习的 实践层面 (Practical aspects of Deep Learning)1.6-1.8

目录 第一门课&#xff1a;第二门课 改善深层神经网络&#xff1a;超参数调试、正 则 化 以 及 优 化 (Improving Deep Neural Networks:Hyperparameter tuning, Regularization and Optimization)第一周&#xff1a;深度学习的 实践层面 (Practical aspects of Deep Learning)…

某赛通电子文档安全管理系统 多处 SQL注入漏洞复现

0x01 产品简介 某赛通电子文档安全管理系统(简称:CDG)是一款电子文档安全加密软件,该系统利用驱动层透明加密技术,通过对电子文档的加密保护,防止内部员工泄密和外部人员非法窃取企业核心重要数据资产,对电子文档进行全生命周期防护,系统具有透明加密、主动加密、智能…

【智能算法】向日葵优化算法(SFO)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2019年&#xff0c;GF Gomes等人受到自然界向日葵运动行为启发&#xff0c;提出了向日葵优化算法&#xff08;Sunflower Optimization, SFO&#xff09;。 2.算法原理 2.1算法思想 SFO模拟向日葵行…

Android某钉数据库的解密分析

声明 1 本文章中所有内容仅供学习交流&#xff0c;抓包内容、敏感网址、数据接口均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 目的 1 解密app数据库&#xff0c;用数据库软件打开查看信息内容 入手…

C语言数据类型的介绍,类型的基本归类,整型在内存中的存储,原码、反码、补码,大小端等介绍

文章目录 前言一、数据类型的介绍类型的意义 1. 类型的基本归类&#xff08;1&#xff09;. 整型家族&#xff08;2&#xff09;. 浮点数家族&#xff08;3&#xff09;. 构造类型&#xff08;4&#xff09;. 指针类型&#xff08;5&#xff09;. 空类型 二、整型在内存中的存储…

【网络安全】安全事件管理处置 — 安全事件处置思路指导

专栏文章索引&#xff1a;网络安全 有问题可私聊&#xff1a;QQ&#xff1a;3375119339 目录 一、处理DDOS事件 1.准备工作 2.预防工作 3.检测与分析 4.限制、消除 5.证据收集 二、处理恶意代码事件 1.准备 2.预防 3.检测与分析 4.限制 5.证据收集 6.消除与恢复 …

NodeJS基础知识

文章目录 **1. Node.js平台与架构****1.1 Node.js简介****1.2 事件循环&#xff08;Event Loop&#xff09;** **2. JavaScript基础知识****2.1 ECMAScript版本****2.2 变量、数据类型、运算符****2.3 函数****2.4 类与面向对象编程** **3. Node.js核心API****3.1 全局对象与内…

Linux下载及安装OpenSSL

文章目录 前言一、OpenSSL下载二、OpenSSL安装1.上传下载好的安装包到服务器2.解压3.切换目录4.配置config5.编译6.安装7.备份旧版本OpenSSL7.创建软链接8.添加OpenSSL动态链接库9.更新库缓存10.查看OpenSSL版本验证安装是否成功 前言 一般系统会自带有OpenSSL&#xff0c;我们…

OpenHarmony实战开发-媒体查询 (@ohos.mediaquery)

概述 媒体查询作为响应式设计的核心&#xff0c;在移动设备上应用十分广泛。媒体查询可根据不同设备类型或同设备不同状态修改应用的样式。媒体查询常用于下面两种场景&#xff1a; 针对设备和应用的属性信息&#xff08;比如显示区域、深浅色、分辨率&#xff09;&#xff0…

大数据第五天(操作hive的方式)

文章目录 操作hive的方式hive 存储位置hive 操作语法创建数据表的方式 操作hive的方式 hive 存储位置 hive 操作语法 创建数据表的方式 – 创建数据库 create database if not exists test我们创建数据库表的时候&#xff0c;hive是将我们的数据自动添加到数据表中&#xf…

Matlab|交直流系统潮流计算(含5种控制模式)

目录 1 主要内容 程序参考流程图 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序参考文献《交直流系统潮流计算及相互关联特性分析》&#xff0c;采用5种交直流潮流控制方式&#xff1a;1.定电流定电压 2.定电流定熄弧角 3.定功率定电压 4.定功率定熄弧角 5.定触发角…

C++进阶:多态

目录 一、多态的概念 二、多态的实现 1.多态的实现条件 2.虚函数 3.虚函数的重写(覆盖) 三、概念比较 四、抽象类 1.概念 2.接口继承与实现继承 一、多态的概念 在生活中我们通常会遇到以下的一个场景&#xff1a;领支付宝的红包。 明明都是同一个红包&#xff0c;不同…

Qt配置CMake出错

一个项目需要在mingw环境下编译Opencv源码&#xff0c;当我用Qt配置opencv的CMakeLists.txt时&#xff0c;出现了以下配置错误&#xff1a; 首先我根据下述博文介绍&#xff0c;手动配置了CMake&#xff0c;但仍不能解决问题。 Qt(MinGW版本)安装 - 夕西行 - 博客园 (cnblogs.…

鸿蒙(HarmonyOS)性能优化实战-Trace使用教程

概述 OpenHarmony的DFX子系统提供了为应用框架以及系统底座核心模块的性能打点能力&#xff0c;每一处打点即是一个Trace&#xff0c;其上附带了记录执行时间、运行时格式化数据、进程或线程信息等。开发者可以使用SmartPerf-Host调试工具对Trace进行解析&#xff0c;在其绘制…

人工智能如何提高公司效率的 5 种方法

人工智能是当今最热门的话题之一&#xff0c;但并不是每个人都了解其对商业的价值规模。由此可见&#xff0c;现有的AI技术可以将企业的生产力提升40%。 在机器学习的帮助下&#xff0c;Netflix 利用自动化个性化推荐每年赚取 10 亿美元。当公司使用人工智能时&#xff0c;34%…

线性代数:抽象向量空间

一、说明 有些函数系列极具线性代数的向量特征。这里谈及多项式构成函数的线性代数意义。问题是这个主题能展开多少内涵&#xff1f;请看本文的论述。 二、线性空间和向量 让我先问你一个简单的问题。什么是向量&#xff1f;为了方便起见&#xff0c;二维箭头从根本上说是平…

Web前端一套全部清晰 ③ day2 HTML 标签综合案例

别让平淡生活&#xff0c;耗尽所有向往 —— 24.4.26 综合案例 —— 一切都会好的 网页制作思路&#xff1a;从上到下&#xff0c;先整体到局部&#xff0c;逐步分析制作 分析内容 ——> 写代码 ——>保存——>刷新浏览器&#xff0c;看效果 <!DOCTYPE html> &l…

IDEA生成测试类

方法一 具体流程: 选中要生成的测试类------------>选择code选项------------>选择Generate选项---------->选择test选项---------->选择要生成的方法 第一步: 光标选中需要生成测试类的类 找到code选项 选中Generate选项 选中test选项 选中你要生成的测试…

【智能算法】囊状虫群算法(TSA)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2020年&#xff0c;S Kaur等人受到囊状虫群自然行为启发&#xff0c;提出了囊状虫群算法&#xff08;Tunicate Swarm Algorithm, TSA&#xff09;。 2.算法原理 2.1算法思想 TSA模拟了囊状虫群在导…
最新文章