微服务开发系列 第七篇:RocketMQ

总概

A、技术栈

  • 开发语言:Java 1.8
  • 数据库:MySQL、Redis、MongoDB、Elasticsearch
  • 微服务框架:Spring Cloud Alibaba
  • 微服务网关:Spring Cloud Gateway
  • 服务注册和配置中心:Nacos
  • 分布式事务:Seata
  • 链路追踪框架:Sleuth
  • 服务降级与熔断:Sentinel
  • ORM框架:MyBatis-Plus
  • 分布式任务调度平台:XXL-JOB
  • 消息中间件:RocketMQ
  • 分布式锁:Redisson
  • 权限:OAuth2
  • DevOps:Jenkins、Docker、K8S

B、本节实现目标

  • [mall-order]下单,用RocketMQ消息中间件发送消息,[mall-member]监听消费给用户加积分

一、RocketMQ安装

供参考:

  • 保姆级教程 Windows11下安装RocketMQ

  • RocketMQ基础入门

二、功能描述

用户下单(mall-order服务)后,发送下单事件MQ, mall-member服务监听消费MQ,为用户增加积分,MQ此处的作用是解耦。

三、代码实现

3.1 maven加RocketMQ依赖包

在项目[mall-pom]的pom.xml里加入RocketMQ依赖包

<rocketmq.version>2.2.3</rocketmq.version>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

3.2 common.yml配置RocketMQ参数

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

common.yml完整配置

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379a
    password: 123abc
    jedis:
      pool:
        max-active: 500  #连接池的最大数据库连接数。设为0表示无限制
        max-idle: 20   #最大空闲数
        max-wait: -1
        min-idle: 5
    timeout: 1000
    redisson: 
      password: 123abc
      cluster:
        nodeAddresses: ["redis://127.0.0.1:6379"]
      single:
        address: "redis://127.0.0.1:6379"
        database: 0
    
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    username: ac_u
    password: ac_PWD_123

    #hikari数据库连接池
    hikari:
      pool-name: YH_HikariCP
      minimum-idle: 10 #最小空闲连接数量
      idle-timeout: 600000 #空闲连接存活最大时间,默认600000(10分钟)
      maximum-pool-size: 100 #连接池最大连接数,默认是10
      auto-commit: true  #此属性控制从池返回的连接的默认自动提交行为,默认值:true
      max-lifetime: 1800000 #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      connection-timeout: 30000 #数据库连接超时时间,默认30秒,即30000
      connection-test-query: SELECT 1

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3.3 [mall-order]生产者

生产者OrderSender

package com.ac.order.mq.send;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class OrderSender {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void asyncSend(MqOrderMsg mqMsg) {
        String payload = JSONObject.toJSONString(mqMsg);

        //Topic+Tag更精准接收消息
        String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();

        rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(OrderSender.class.getSimpleName() + ",消息发送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(OrderSender.class.getSimpleName() + ",消息发送失败");
                e.printStackTrace();
            }
        });
    }
}

下单发送MQ

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDaoImpl;

    @Resource
    private MemberFeignApi memberFeignApi;

    @Resource
    private OrderItemService orderItemServiceImpl;

    @Resource
    private OrderSender orderSender;

    @Override
    public OrderDetailDTO findOrderDetail(Long id) {
        return null;
    }

    @Override
    public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
        return orderDaoImpl.pageOrder(qry);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Long createOrder(OrderAddVO addVO) {
        Order order = new Order();
        order.setOrderNo(RandomUtil.randomNumbers(8));

        //省略支付流程
        order.setOrderState(OrderStateEnum.PAYED);
        order.setOrderTime(LocalDateTime.now());

        //通过feign取用户信息
        MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
        order.setMemberId(addVO.getMemberId());
        order.setMemberName(member.getMemberName());
        order.setMobile(member.getMobile());
        orderDaoImpl.save(order);

        BigDecimal discountAmount = new BigDecimal(0.00);
        BigDecimal productAmount = new BigDecimal(0.00);
        //存订单项信息
        for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
            OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
            productAmount = productAmount.add(orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum())));
        }

        //更新订单金额信息
        order.setDiscountAmount(discountAmount);
        order.setProductAmount(productAmount);
        BigDecimal payAmount = productAmount.subtract(discountAmount);
        order.setPayAmount(payAmount);
        orderDaoImpl.updateById(order);

        //发送下单MQ
        MqOrderMsg mqMsg = MqOrderMsg.builder()
                .action(MqOrderAction.PAID)
                .orderId(order.getId())
                .memberId(order.getMemberId())
                .payAmount(order.getPayAmount())
                .build();
        orderSender.asyncSend(mqMsg);

        return order.getId();
    }
}

3.4 [mall-member]消费者

MemberOrderListener消费者

package com.ac.member.mq.listener;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
        topic = MqTopicConstant.TOPIC_ORDER,
        selectorExpression = "PAID||REFUND",
        messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {

    @Resource
    private MemberIntegralComponent memberIntegralComponent;

    @Override
    public void onMessage(MessageExt message) {
        MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
        log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
        try {
            //Topic+Tag更精准接收消息
            MqOrderAction action = mqMsg.getAction();
            if (MqOrderAction.PAID == action) {
                dealPaid(mqMsg);
            } else if (MqOrderAction.REFUND == action) {
                dealRefund(mqMsg);
            }
        } catch (Exception e) {
            log.error(MemberOrderListener.class.getSimpleName() + ",消费失败,mqMsg={},e={}", mqMsg, e.getMessage());
        }
    }

    /**
     * 处理订单付款事件
     *
     * @param mqMsg
     */
    private void dealPaid(MqOrderMsg mqMsg) {
        IntegralLogEditVO integralVO = new IntegralLogEditVO();
        integralVO.setMemberId(mqMsg.getMemberId());
        integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
        integralVO.setSourceRemark("下单获得积分");
        integralVO.setIntegral(mqMsg.getPayAmount().longValue());

        memberIntegralComponent.recordIntegral(integralVO);
    }

    private void dealRefund(MqOrderMsg mqMsg) {
        log.info("处理退单事件");
    }
}

四、测试

4.1 下单

下单

4.2 控制台日志

[mall-order]控制台MQ发送日志:

2023-04-04 15:58:37.052  INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender         : OrderSender,消息发送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]

[mall-member]控制台MQ接收日志:

2023-04-04 15:58:37.243  INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener    : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)

4.3 数据库记录

t_order t_member_integral

t_member_integral_log

4.4 RocketMQ Dashboard

Dashboard列表

Dashboard消息内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/24929.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络通信:http协议

虽然我们说, 应用层协议是我们程序猿自己定的. 但实际上, 已经有大佬们定义了一些现成的, 又非常好用的应用层协议, 供我们直接参考使用. HTTP(超文本传输协议) 就是其中之一. 认识URL 统一资源定位符(Uniform Resource Locator&#xff0c;缩写&#xff1a;URL)&#xff0c;…

DAY07_HTMLCSS

目录 1 HTML1.1 介绍1.1.1 WebStrom中基本配置 1.2 快速入门1.3 基础标签1.3.1 标题标签1.3.2 hr标签1.3.3 字体标签1.3.4 换行标签1.3.5 段落标签1.3.6 加粗、斜体、下划线标签1.3.7 居中标签1.3.8 案例 1.4 图片、音频、视频标签1.5 超链接标签1.6 列表标签1.6.1 列表中图表类…

【Selenium】提高测试爬虫效率:Selenium与多线程的完美结合

前言 使用Selenium 创建多个浏览器&#xff0c;这在自动化操作中非常常见。 而在Python中&#xff0c;使用 Selenium threading 或 Selenium ThreadPoolExecutor 都是很好的实现方法。 应用场景&#xff1a; 创建多个浏览器用于测试或者数据采集&#xff1b;使用Selenium…

C语言深度解析--操作符

目录 操作符 1.算数操作符 2.移位操作符 左移操作符<<&#xff1a; 右移操作符>>&#xff1a; 3.位操作符 按位与&&#xff1a; 按位或 | &#xff1a; 按位异或 ^ &#xff1a; 4.赋值操作符 5.单目操作符 6.关系操作符 7.逻辑操作符 8.条件操作…

如何快速搭建SpringBoot+Vue前后端分离的开发环境

唠嗑部分 今天我们来说一说&#xff0c;如何快速搭建SpringBootVue前后端分离的开发环境 需要前置环境nodejs&#xff0c;请自行安装(傻瓜式安装) SpringBoot采用2.4.2版本&#xff0c;Vue采用Vue2版本 言归正传 创建Vue项目 1、安装vue npm install -g vue/cli2、检查v…

TDengine 报错 failed to connect to server, reason: Unable to establish connection

一、前文 TDengine 入门教程——导读 二、遇到问题 taos 命令行&#xff08;CLI&#xff09;连接不上&#xff0c;进不去。 [rootiZ2ze30dygwd6yh7gu6lskZ ~]# taos Welcome to the TDengine Command Line Interface, Client Version:3.0.0.1 Copyright (c) 2022 by TDengine…

Linux 安装nodejs、npm、yarn、nrm(超实用)

前言&#xff1a;初衷想要本地通过dockerfile文件直接把项目打包到linux服务器&#xff0c;不用再本地加载再上传等&#xff0c;后续再贴上配置文件 一、什么是nodejs 来自官网的介绍&#xff0c;Node.js 是一个开源的跨平台 JavaScript 运行时环境。它几乎是任何类型项目的流…

JVM内存结构介绍

我们都知道&#xff0c;Java代码是要运行在虚拟机上的&#xff0c;而虚拟机在执行Java程序的过程中会把所管理的内存划分为若干个不同的数据区域&#xff0c;这些区域都有各自的用途。其中有些区域随着虚拟机进程的启动而存在&#xff0c;而有些区域则依赖用户线程的启动和结束…

远程访问群晖Drive并挂载为电脑磁盘同步备份文件「无需公网IP」

文章目录 前言视频教程1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 转发自CSDN远程穿透的文章&…

【Netty】Reactor 模型(十)

文章目录 前言一、传统服务的设计模型二、NIO 分发模型三、Reactor 模型3.1、Reactor 处理请求的流程3.2、Reactor 三种角色 四、单Reactor 单线程模型4.1、消息处理流程4.2、缺点 五、单Reactor 多线程模型5.1、消息处理流程5.2、缺点 六、主从Reactor 多线程模型6.1、Reactor…

【How to Design Translation Prompts for ChatGPT: An Empirical Study 论文略读】

How to Design Translation Prompts for ChatGPT: An Empirical Study 论文略读 INFORMATIONAbstract1 Introduction2 Background3 Experiments3.1 Prompt Design3.2 Experimental Setup3.2.1 Datasets3.2.2 Baselines and Evaluation Metrics 3.3 Multilingual Translation3.4…

Vue实现订单确认界面禁止浏览器返回操作导致重复提交订单的问题

哈喽 大家好啊 最近遇到一个问题&#xff0c;就是在提交订单成功后的页面&#xff0c;然后用户去浏览器返回&#xff0c;就导致又提交了一次 然后就想到了如果提交成功页面&#xff0c;就阻止浏览器返回操作 主要实现如下&#xff1a; 1.在mounted的钩子函数&#xff1a; 2.…

论文阅读:GLOBAL PROTOTYPE ENCODING FOR INCREMENTALVIDEO HIGHLIGHTS DETECTION

摘要&#xff1a; 视频亮点检测 (VHD) 是计算机视觉中的一个活跃研究领域&#xff0c;旨在在给定原始视频输入的情况下定位最吸引用户的片段。然而&#xff0c;大多数 VHD 方法都是基于封闭世界假设&#xff0c;即预先定义固定数量的高亮类别&#xff0c;并且所有训练数据都是…

4.Ansible Inventory介绍及实战 - A list or group of lists nodes

什么是inventory&#xff1f; 官方解释&#xff1a;Ansible automates tasks on managed nodes or “hosts” in your infrastructure, using a list or group of lists known as inventory. Ansible可以同时与您基础设施中的一个或多个系统协同工作&#xff61;为了与多台服务…

RPC核心原理(整体架构/调用过程)

Server: Provider ,暴露服务,服务提供方 Client: Consumer ,服务消费,调用远程服务 Registry:服务注册与发现 RPC的调用过程如下&#xff1a; 第一步&#xff1a;server会将他需要暴露的服务以及他的地址信息注册到Registry这一注册中心。 第二步&#xff1a;client通过注册…

【分布鲁棒和多目标非负矩阵分解】基于DR-NMF的对NMF问题噪声模型的识别鲁棒性研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Doris的一些进阶用法

6.doris进阶 6.1修改表 6.1.1修改表名 示例&#xff1a; 将名为 table1 的表修改为 table2 SQLALTER TABLE table1 RENAME table2; -- 示例 ALTER TABLE aggregate_test RENAME aggregate_test1; 将表 example_table 中名为 rollup1 的 rollup index 修改为 rollup2 SQLA…

Ex-ChatGPT本地部署+Azure OpenAI接口配置+docker部署服务

Ex-ChatGPT项目分为 Ex-ChatGPT 和 WebChatGPTEnhance 两部分&#xff0c;Ex-ChatGPT启动后是个web服务&#xff0c;通过访问ip端口体验&#xff1b; WebChatGPTEnhance可编译生成一个浏览器插件&#xff0c;Chrome或者Microsoft edge浏览器可以安装该插件&#xff0c;点击该插…

【MySQL】如何实现单表查询?

在我们对数据进行操作时&#xff0c;查询无疑是至关重要的&#xff0c;查询操作灵活多变&#xff0c;我们可以根据开发的需求&#xff0c;设计高效的查询操作&#xff0c;把数据库中存储的数据展示给用户。 文章目录 前言1. 基础查询1.1 基础查询语法1.2 基础查询练习 2. 条件查…

数据库基础——5.运算符

这篇文章我们来讲一下SQL语句中的运算符操作。 说点题外话&#xff1a;SQL本质上也是一种计算机语言&#xff0c;和C&#xff0c;java一样的&#xff0c;只不过SQL是用来操作数据库的。在C&#xff0c;java中也有运算符&#xff0c;这两种语言中的运算符和数学中的运算符差距不…