Sentinel 使用及概念
什么是 Sentinel
Sentinel (分布式系统的流量防卫兵) 是阿里开源的一套用于服务容错
的综合性解决方案。它以流量 为切入点, 从流量控制、熔断降级、系统负载保护
等多个维度来保护服务的稳定性。
Sentinel 具有以下特征:
丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景, 例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
完备的实时监控:Sentinel 提供了实时的监控功能。通过控制台可以看到接入应用的单台机器秒级数据, 甚至 500 台以下规模的集群的汇总运行情况。
广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
Sentinel 分为两个部分:
核心库(
Java 客户端)
不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器。
Sentinel 的概念和功能
基本概念
资源就是 Sentinel 要保护的东西
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,可以是一个服务,也可以是一个方法,甚至可以是一段代码。
我们入门案例中的 message/test 接口就可以认为是一个资源。
规则就是用来定义如何进行保护资源的
作用在资源之上, 定义以什么样的方式保护资源,主要包括流量控制规则、熔断降级规则以及系统保护规则。
我们入门案例中就是为 message1 资源设置了一种流控规则, 限制了进入message/test 的流量。
重要功能
Sentinel 的主要功能就是容错,主要体现为下面这三个:
流量控制
流量控制在网络传输中是一个常用的概念,它用于调整网络包的数据。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。
Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状。
熔断降级
当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间长或异常比例升高的时候,则对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联故障
Sentinel 对这个问题采取了两种手段:
通过并发线程数进行限制
Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的 线程完成任务后才开始继续接收请求。
通响应时间对资源进行降级
除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。
系统负载保护
Sentinel 同时提供系统维度的自适应保护能力。当系统负载较高的时候,如果还持续让请求进入可能会导致系统崩溃,无法响应。在集群环境下,会把本应这台机器承载的流量转发到其它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
总之一句话: 我们需要做的事情,就是在 Sentinel 的资源上配置各种各样的规
则,来实现各种容错的功能。
微服务集成 Sentinel
为微服务集成 Sentinel 非常简单, 只需要加入 Sentinel 的依赖即
可在 pom.xml 中加入下面依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
编写一个 Controller 测试使用
@RestController
@RequestMapping(path = "/message")
public class MessageController {
@GetMapping(path = "/test1")
public String test1(){
return "测试高并发 1";
}
@GetMapping(path = "/test2")
public String test2(){
return "测试高并发 2";
}
}
下载客户端
https://github.com/alibaba/Sentinel/releases
启动控制台
# 直接使用 jar 命令启动项目(控制台本身是一个 SpringBoot 项目)
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.5.jar
访问控制台: http://ip+端口 默认用户名密码是 sentinel/sentinel
Sentinel 的控制台其实就是一个 SpringBoot 编写的程序。我们需要将我们的微服务程序注册到控制台上, 即在微服务中指定控制台的地址, 并且还要开启一个跟控制台传递数据的端口, 控制台也可以通过此端口调用微服务中的监控程序获取微服务的各种信息。
实现一个接口的限流
为某个接口添加访问控制
访问数量超过时限流
Gateway--服务网关
网关简介
大家都都知道在微服务架构中,一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用 这么多的微服务呢?如果没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用。
这样的架构,会存在着诸多的问题:
客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性。
认证复杂,每个服务都需要独立认证。
存在跨域请求,在一定场景下处理相对复杂。
上面的这些问题可以借助 API 网关
来解决。
所谓的 API 网关,就是指系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一服务,一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等。
添加上 API 网关之后,系统的架构图变成了如下所示:
我们也可以观察下,我们现在的整体架构图:
在业界比较流行的网关,有下面这些:
Ngnix+lua
使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用 lua 是一种脚本语言,可以来编写一些简单的逻辑, nginx 支持 lua 脚本.
Kong
基于 Nginx+Lua 开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)
可以开箱即用。 问题: 只支持 Http 协议;二次开发,自由扩展困难;提供管理 API,缺乏更易用的管控、配置方式。
Zuul
Netflix 开源的网关,功能丰富,使用 JAVA 开发,易于二次开发 问题:缺乏管控,无法动态配 置;依赖组件较多;处理 Http 请求依赖的是 Web 容器,性能不如 Nginx
Spring Cloud Gateway
SpringCloud alibaba 技术栈中并没有提供自己的网关,我们可以采用Spring Cloud Gateway 来做网关,将在下面具体介绍。
Gateway 简介
Spring Cloud Gateway 是 Spring 公司基于 Spring 5.0,Spring Boot 2.0和 Project Reactor 等技术 开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。它的目标是替代 Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控和限流。
优点:
性能强劲,是第一代网关 Zuul 的 1.6 倍
功能强大,内置了很多实用的功能,例如转发、监控、限流等
设计优雅,容易扩展
缺点:
其实现依赖 Netty 与 WebFlux,不是传统的 Servlet 编程模型,学习成本高
不能将其部署在 Tomcat、Jetty 等 Servlet 容器里,只能打成 jar 包执行
需要 Spring Boot 2.0 及以上的版本,才支持
Gateway 快速入门
要求: 通过浏览器访问 api 网关,然后通过网关将请求转发到商品微服务.
1.创建 api 网关模块(略)
2.导入依赖,不能导入 web 相关的依赖
<!--gateway 网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
3.创建主类
@SpringBootApplication
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
4.添加配置文件
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
gateway:
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: order_route # 当前路由的标识, 要求唯一
uri: http://localhost:8081 # 请求要转发到的地址
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/order-serv/** # 当请求路径满足Path指定的规则时, 才进行路由转发
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉 1 层路径
5. 启动项目, 并通过网关去访问微服务
http://127.0.0.1:9001/order-serv/order/createOrder/1/1/1
Gateway 结合 nacos
现在在配置文件中写死了转发路径的地址, 前面我们已经分析过地址写死带来的问题, 接下来我们从注册中心获取此地址。
1.加入 nacos 依赖
<!--nacos 服务发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
2.在主类上添加注解
@SpringBootApplication
@EnableDiscoveryClient
3.修改配置文件
server:
port: 9001
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848 #nacos 服务地址
gateway:
discovery:
locator:
enabled: true
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: product_route # 当前路由的标识, 要求唯一
uri: lb://service-order # lb 指的是从 nacos 中按照名称获取微服务, 并遵循负载均衡策略
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/order-serv/** # 当请求路径满足 Path 指定的规则时,才进行路由转发
filters: #过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉 1 层路径
4.测试访问
http://127.0.0.1:9001/order-serv/order/createOrder/1/1/1
全局过滤器
全局过滤器作用于所有路由, 无需配置。通过全局过滤器可以实现对权限的统一校验,安全性验证等功能.
内置全局过滤器
自定义全局过滤器
内置的过滤器已经可以完成大部分的功能,但是对于企业开发的一些业务功能处理,还是需要我们自己编写过滤器来实现的,那么我们一起通过代码的形式自定义一个过滤器,去完成统一的权限校验。 开发中的鉴权逻辑:
- 当客户端第一次请求服务时,服务端对用户进行信息认证(登录)
- 认证通过,将用户信息进行加密形成 token,返回给客户端,作为登录凭证
- 以后每次请求,客户端都携带认证的 token
- 服务端对 token 进行解密,判断是否有效。
自定义一个全局过滤器,去校验所有请求的请求参数中是否包含“token”, 如何不包含请求
参数“token”则不转发路由,否则执行正常的逻辑。
@Component
public class TokenFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//获取请求中的参数部分
String token = exchange.getRequest().getQueryParams().getFirst("token");
if (!"123456".equals(token)) {//模拟验证 token
System.out.println("鉴权失败");
exchange.getResponse().setStatusCode(201);
return exchange.getResponse().setComplete();//响应状态码
}
//调用 chain.filter 继续向下游执行
return chain.filter(exchange);
}
@Override
public int getOrder() {
return 0;
}
}
网关限流
网关是所有请求的公共入口,所以可以在网关进行限流,而且限流的方式也很多,我们本次采用前 面学过的 Sentinel 组件来实现网关的限流。Sentinel 支持对SpringCloud Gateway、Zuul 等主流网关进行限流。
从 1.6.0 版本开始,Sentinel 提供了 SpringCloud Gateway 的适配模块,可以提供两种资源维度的限流: route 维度:即在 Spring 配置文件中配置的路由条目,资源名为对应的 routeId 自定义 API 维度:用户可以利用 Sentinel 提供的API 来自定义一些 API 分组.
1 导入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>
2 编写配置类
基于 Sentinel 的 Gateway 限流是通过其提供的 Filter 来完成的,使用时只需注入对应的 SentinelGatewayFilter 实例以及SentinelGatewayBlockExceptionHandler 实例即可。
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfiguration(ObjectProvider<List<ViewResolver>>
viewResolversProvider,ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers =
viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
// 初始化一个限流的过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
// 配置初始化的限流参数
@PostConstruct
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(new GatewayFlowRule("order_route") //资源名称,对应路由 id
.setCount(1) // 限流阈值
.setIntervalSec(1) // 统计时间窗口,单位是秒,默认是 1 秒
);
GatewayRuleManager.loadRules(rules);
}
// 配置限流的异常处理器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler
sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers,
serverCodecConfigurer);
}
// 自定义限流异常页面
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
public Mono<ServerResponse> handleRequest(ServerWebExchange
serverWebExchange, Throwable throwable) {
Map map = new HashMap<>();
map.put("code", 0);
map.put("message", "接口被限流了");
return ServerResponse.status(HttpStatus.OK).
contentType(MediaType.APPLICATION_JSON_UTF8).
body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}
消息队列-MQ
什么是 MQ
MQ 全称(Message Queue)又名消息队列,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。
MQ 的应用场景
异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。 但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续 的注册短信和邮件不是即时需要关注的步骤。 所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。
架构图如下:
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比 较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
常见的 MQ 产品
目前业界有很多 MQ 产品,比较出名的有下面这些:
ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用 C 语言 实现,实际上只是一个 socket 库的重新封装,如果做为消息队列使用,需要开发大量的代码。 ZeroMQ 仅提供非持久性的队列,也就是说如果 down 机,数据将会丢失。
RabbitMQ: 使用 erlang 语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
ActiveMQ: 历史悠久的 Apache 开源项目。已经在很多产品中得到应用,实现了 JMS1.1 规范,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ: 阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来 很简单。
Kafka: Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
RocketMQ 入门
RocketMQ 是阿里巴巴开源的分布式消息中间件,现在是 Apache 的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双 11"这种万亿级的消息流转。
RocketMQ 环境搭建
软硬件需求
系统要求是 64 位的,JDK 要求是 1.8 及其以上版本的。
下载
https://rocketmq.apache.org/download/
解压
配置环境变量
ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-4.9.3
NAMESRV_ADDR =127.0.0.1:9876
启动 Name Server
进入到 bin 目录输入命令: mqnamesrv.cmd
启动 Broker
进入到 bin 目录输入命令:
mqbroker.cmd -n 127.0.0.1:9876 atuoCreateTopicEnable=true
发送和接收消息测试
模拟发送消息
进入到 bin 目录输入命令: tools.cmd org.apache.rocketmq.example.quickstart.Producer
模拟接收消息
进入到 bin 目录输入命令: tools.cmd org.apache.rocketmq.example.quickstart.Consumer
控制台安装与启动
解压
修改其 src/main/resources 中的 application.properties 配置文件
在解压目录 rocketmq-console 的 pom.xml 中添加如下 JAXB 依赖。
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
打包
命令行进入到 rocketmq-console
mvn clean package -Dmaven.test.skip=true
打包后,进入 target 目录启动控制台 java -jar rocketmq-console-ng-1.0.0.jar
访问: http://127.0.0.1:6060
RocketMQ 的架构及概念
如上图所示,整体可以分成 4 个角色,分别是: NameServer,Broker,Producer,Consumer。
Broker(邮递员) Broker 是 RocketMQ 的核心,负责消息的接收,存储,投递等功能.
NameServer(邮局) 消息队列的协调者,Broker 向它注册路由信息,同时Producer 和 Consumer 向其获取路由信息
Producer(寄件人) 消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消 息
Consumer(收件人) 消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息
Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对 Topic 来发送和接收消息
Message Queue(邮件) 为了提高性能和吞吐量,引入了 Message Queue,一个 Topic 可以设置一个或多个 Message Queue,这样消息就可以并行往各个Message Queue 发送消息,消费者也可以并行的从多个 Message Queue 读取消息 Message Message 是消息的载体。
Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
java 消息发送和接收演示
们使用 Java 代码来演示消息的发送和接收
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency
发送消息
消息发送步骤:
1. 创建消息生产者, 指定生产者所属的组名
2. 指定 Nameserver 地址
3. 启动生产者
4. 创建消息对象,指定主题、标签和消息体
5. 发送消息
6. 关闭生产者
public class MQProducerTest {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定 Nameserver 地址
producer.setNamesrvAddr("192.168.109.131:9876");
//3. 启动生产者
producer.start();
//4. 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag",("RocketMQ Message").getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
//6. 关闭生产者
producer.shutdown();
}
}
接收消息
消息接收步骤:
1. 创建消息消费者, 指定消费者所属的组名
2. 指定 Nameserver 地址
3. 指定消费者订阅的主题和标签
4. 设置回调函数,编写处理消息的方法
5. 启动消息消费者
public class MQConsumerTest {
public static void main(String[] args) throws Exception {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
//2. 指定 Nameserver 地址
consumer.setNamesrvAddr("192.168.109.131:9876");
//3. 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {
System.out.println("Receive New Messages: " + msgs);//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
案例
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
订单微服务发送消息
1. 添加 rocketmq 的依赖
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2. 添加配置
rocketmq:
name-server: 127.0.0.1:9876 #rocketMQ 服务的地址
producer:
group: shop-order # 生产者组
3. 编写测试代码
在订单微服务控制器中添加代码
@Autowired
private RocketMQTemplate rocketMQTemplate;
rocketMQTemplate.convertAndSend("order-topic", order);
用户微服务接收消息
1. 添加依赖
<!--rocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2. 修改配置文件
rocketmq:
name-server: 127.0.0.1:9876
3. 编写消息接收服务
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println("收到一个订单信息:"+ JSON.toJSONString(order)+",接下来发送短信");
}
}
4. 启动服务,执行下单操作,观看后台输出
发送不同类型的消息
RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送
可靠同步发送: 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方 式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送: 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送 方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知 启动转码服务,转码完成后通知推送转码结果等。
单向发送: 单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不 等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
发送同步消息
//同步消息
//参数一: topic
//参数二: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
System.out.println(sendResult);
发送异步消息
//参数一: topic
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new
SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
//让线程不要终止
Thread.sleep(30000000);
单向消息
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
Redis 实现分布式锁
什么是分布式锁?
分布式锁,即分布式系统中的锁。在单体应用中我们通过 java 中的锁解决多线程访问共享资源
的问题,而分布式锁,就是解决了
分布式系统中控制共享资
源访问
的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程.
下面是一个扣除库存的方法,思考在集群,分布式项目中会出现什么问题.
思考: 添加 synchronized 锁,能够解决问题
基于 Redis 分布式锁的实现方式
分布式锁的特征
方式 1:SETNX 命令
作为分布式锁实现过程中的共享存储系统,Redis 可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis 就能保存锁变量了,客户端也就可以通过 Redis 的命令操作来实现锁操作。
想要实现分布式锁,必须要求 Redis 有互斥的能力。可以使用 SETNX 命令,其含义是 SET IF NOT EXIST,即如果 key 不存在,才会设置它的值,否则什么也不做。两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。
以下展示了 Redis 使用 key/value 对保存锁变量,以及两个客户端同时请求加锁的操作过程。
加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL 的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。直接使用 DEL 命令删除这个 key 即可。
但是,以上实现存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成死锁
。
1. 程序处理业务逻辑异常,没及时释放锁
2. 进程挂了,没机会释放锁
以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
在 finally 中释放锁,以及设置键失效时间.
但是设置失效时间 10s,有可能业务执行时间大于 10s,那么锁会失效,导致其他线程进入到减库存业务中,这时,第一个线程执行完成,会误删除第二个线程的锁标志,导致其他线程进入.导致锁永久失效.
为每个线程的添加一个版本号,删除时,判断版本号.
方式 2: 使用 redission
导入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
创建 Redisson 对象
//创建 Redisson 对象
@Bean
public Redisson getRedisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://120.48.37.232:6379").setDatabase(0);
return (Redisson)Redisson.create(config);
}
使用 Redisson 实现加锁,释放锁.