文章目录
- 前言
- 一 数据聚合
- 1.1 DSL实现聚合
- 1.1.1 Bucket聚合语法
- 1.1.2 聚合结果排序
- 1.1.3 限定聚合范围
- 1.2 Metric聚合语法
- 1.3 小结
- 1.4 RestAPI实现聚合
- 1.5 API语法
- 1.7 案例
- 二 自动补全
- 2.1 拼音分词器的安装
- 2.2 自定义分词器
- 2.3 自动补全查询
- 2.4 自动补全查询的JavaAPI
- 三 数据同步
- 3.1 思路分析
- 3.2 方案一:同步调用
- 3.3 方案二:异步通知
- 3.4 监听binlog
- 3.5 三种方案的对比总结
- 3.6 实现数据同步案例总结
- 3.7 数据同步案例的测试
- 3.8 补充:vue Devtools插件的安装
- 3.8.1 edge浏览器安装方法
- 3.8.2 chrome浏览器的安装方法
- 3.9 对于vue devtools工具的解释
- 四 elasticsearch集群
- 4.1 **ES集群相关概念**
- 4.2 搭建ES集群
- 4.3.1 创建es集群
- 4.4 集群状态监控
- 4.4.1 win安装cerebro【不推荐】
- 4.4.2 闪退问题[未解决]
- 4.4.3 linux安装cerebo
- 4.4.5 创建索引库
- 4.4.6 利用kibana的DevTools创建索引库【非实际操作】
- 4.4.7 利用cerebro创建索引库【实际操作】
- 4.4.8 查看分片效果
- 4.9 集群脑裂问题
- 4.9.1 集群职责划分
- 4.9.2 脑裂问题
- 4.9.3 小结
- 4.10 集群分布式存储
- 4.10.1 分片存储测试
- 4.10.2 分片存储原理
- 4.10.3 集群分布式查询
- 4.10.4 集群故障转移
前言
- 本文学自黑马,并且经过认真学习后,整理总结,不是照搬!!
- 关于实际操作部分,建议学习者,多动手,认真分析!!!
- 该部分对电脑要求稍高,请学习者注意升级电脑配置!!!
一 数据聚合
- 聚合(aggregations)可以实现对文档数据的统计、分析、运算。
- 聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 桶(Bucket)聚合:用来对文档做分组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:其它聚合的结果为基础做聚合
- **注意:**参加聚合的字段必须是keyword、日期、数值、布尔类型
1.1 DSL实现聚合
1.1.1 Bucket聚合语法
GET /hotel/_search
{
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}
- 示例:
#桶排序 GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20 } } } }
- 结果:
1.1.2 聚合结果排序
- 默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。
- 指定order属性,自定义聚合的排序方式
- 演示:
#自定义聚合的排序方式 # 按照_count升序排列 GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "order": { "_count": "asc" }, "size": 20 } } } }
- 结果:
- 结果:
1.1.3 限定聚合范围
- 默认情况下,Bucket聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件。
- 限定要聚合的文档范围,只要添加query条件即可:
- 演示:
# 限定聚合范围 # 只对200元以下的文档聚合 GET /hotel/_search { "query": { "range": { "price": { "lte": 200 } } }, "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20 } } } }
- 结果:
1.2 Metric聚合语法
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 演示:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等
"field": "score" // 聚合字段,这里是score
}
}
}
}
}
}
- 结果:
1.3 小结
-
aggs代表聚合,与query同级,此时query的作用是?
- 限定聚合的的文档范围
-
聚合必须的三要素:
- 聚合名称
- 聚合类型
- 聚合字段
-
聚合可配置属性有:
- size:指定聚合结果数量
- order:指定聚合结果排序方式
- field:指定聚合字段
1.4 RestAPI实现聚合
1.5 API语法
- 聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件。
- 聚合条件的语法:
- 聚合的结果也与查询结果不同,API也比较特殊。不过同样是JSON逐层解析:
1.7 案例
- 需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:
- 分析:
- 使用聚合功能,利用Bucket聚合,对搜索结果中的文档基于品牌分组、基于城市分组,得知包含的品牌、城市。
- 因为是对搜索结果聚合,因此聚合是限定范围的聚合,也就是说聚合的限定条件跟搜索文档的条件一致。
- 返回值类型就是页面要展示的最终结果:
- 结果是一个Map结构:
- key是字符串,城市、星级、品牌、价格
- value是集合,例如多个城市的名称
-
实现的重要逻辑代码
-
在
Controller
中添加一个方法,遵循下面的要求:- 请求方式:
POST
- 请求路径:
/hotel/filters
- 请求参数:
RequestParams
,与搜索文档的参数一致 - 返回值类型:
Map<String, List<String>>
@PostMapping("filters") public Map<String, List<String>> getFilters(@RequestBody RequestParams params){ return hotelService.filters(params); }
- 请求方式:
-
在
Service
中定义新方法:Map<String, List<String>> filters(RequestParams params);
-
在
HotelService
的实现类中实现该方法@Override public Map<String, List<String>> filters(RequestParams params) { try { // 1.准备Request SearchRequest request = new SearchRequest("hotel"); // 2.准备DSL // 2.1.query buildBasicQuery(params, request); // 2.2.设置size request.source().size(0); // 2.3.聚合 buildAggregation(request); // 3.发出请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析结果 Map<String, List<String>> result = new HashMap<>(); Aggregations aggregations = response.getAggregations(); // 4.1.根据品牌名称,获取品牌结果 List<String> brandList = getAggByName(aggregations, "brandAgg"); result.put("品牌", brandList); // 4.2.根据品牌名称,获取品牌结果 List<String> cityList = getAggByName(aggregations, "cityAgg"); result.put("城市", cityList); // 4.3.根据品牌名称,获取品牌结果 List<String> starList = getAggByName(aggregations, "starAgg"); result.put("星级", starList); return result; } catch (IOException e) { throw new RuntimeException(e); } } private void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(100) ); request.source().aggregation(AggregationBuilders .terms("cityAgg") .field("city") .size(100) ); request.source().aggregation(AggregationBuilders .terms("starAgg") .field("starName") .size(100) ); } private List<String> getAggByName(Aggregations aggregations, String aggName) { // 4.1.根据聚合名称获取聚合结果 Terms brandTerms = aggregations.get(aggName); // 4.2.获取buckets List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); // 4.3.遍历 List<String> brandList = new ArrayList<>(); for (Terms.Bucket bucket : buckets) { // 4.4.获取key String key = bucket.getKeyAsString(); brandList.add(key); } return brandList; }
-
注意:
- 案例部分,代码并不一定需要自定手动一个一个敲,但是需要自己操作一边,去验证最终的结果!!!
- 同时,努力处理自己遇到的问题!!!
二 自动补全
- 效果如图:
- 根据用户输入的字母,提示完整词条的功能,就是自动补全
- 因为需要根据拼音字母来推断,因此要用到拼音分词功能
2.1 拼音分词器的安装
- 要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址
-
下载并解压
-
上传到虚拟机中,elasticsearch的plugin目录
- 安装插件需要知道elasticsearch的plugins目录位置,而我们用了数据卷挂载,因此需要查看elasticsearch的数据卷目录,通过下面命令查看:
docker volume inspect es-plugins
- 然后将,解压后的文件上传到这个目录下,如
py
是解压后重命名后的拼音分词器
-
重启elasticsearch
docker restart es
-
测试
POST /_analyze { "text": "如家酒店还不错", "analyzer": "pinyin" }
2.2 自定义分词器
-
默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器。
-
elasticsearch中分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
-
文档分词时会依次由这三部分来处理文档:
-
演示
#拼音分词器
DELETE /test
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
POST /test/_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "my_analyzer"
}
- 结果:
- 拼音分词器注意事项
- 为了避免搜索到同音字,搜索时不要使用拼音分词器
2.3 自动补全查询
-
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
- 参与补全查询的字段必须是completion类型。
- 字段的内容一般是用来补全的多个词条形成的数组。
-
演示:
# 自动补全查询
DELETE /test02
## 创建索引库
PUT /test02
{
"mappings": {
"properties": {
"title": {
"type": "completion"
}
}
}
}
## 示例数据
POST test02/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test02/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test02/_doc
{
"title": ["Nintendo", "switch"]
}
## 自动补全查询
GET /test02/_search
{
"suggest": {
"title_suggest": {
"text": "s", # 关键字
"completion": {
"field": "title", #补全查询的字段
"skip_duplicates": true, #跳过重复的
"size": 10 #获取前10条结果
}
}
}
}
2.4 自动补全查询的JavaAPI
- 先看请求参数构造的API:
- 再来看结果解析:
- 重要代码:
@Override public List<String> getSuggestions(String prefix) { try { // 1.准备Request SearchRequest request = new SearchRequest("hotel"); // 2.准备DSL request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix(prefix) .skipDuplicates(true) .size(10) )); // 3.发起请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析结果 Suggest suggest = response.getSuggest(); // 4.1.根据补全查询名称,获取补全结果 CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); // 4.2.获取options List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); // 4.3.遍历 List<String> list = new ArrayList<>(options.size()); for (CompletionSuggestion.Entry.Option option : options) { String text = option.getText().toString(); list.add(text); } return list; } catch (IOException e) { throw new RuntimeException(e); } }
三 数据同步
- 引入:
3.1 思路分析
- 常见的数据同步方案有三种:
- 方案一:同步调用
- 方案二:异步通知
- 方案三:监听binlog
3.2 方案一:同步调用
- 基本步骤如下:
- hotel-demo对外提供接口,用来修改elasticsearch中的数据
- 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口
3.3 方案二:异步通知
- 流程如下:
- hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
- hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改
3.4 监听binlog
- 流程如下:
- 给mysql开启binlog功能
- mysql完成增、删、改操作都会记录在binlog中
- hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容
3.5 三种方案的对比总结
方案 | 同步调用 | 异步调用 | 监听binlog |
---|---|---|---|
优点 | 实现简单,粗暴 | 低耦合,实现难度一般 | 完全解除服务间耦合 |
缺点 | 业务耦合度高 | 依赖mq的可靠性 | 开启binlog增加数据库负担、实现复杂度高 |
3.6 实现数据同步案例总结
-
当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
-
步骤:
- 导入资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
- 在hotel-admin中的增、删、改业务中完成消息发送
- 在hotel-demo中使用注解声明exchange、queue、RoutingKey,并完成消息监听,并更新elasticsearch中数据
- 启动并测试数据同步功能
-
MQ结构如图:
-
依赖:
<!--amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
声明队列交换机名称
public class MqConstants { /** * 交换机 */ public final static String HOTEL_EXCHANGE = "hotel.topic"; /** * 监听新增和修改的队列 */ public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; /** * 监听删除的队列 */ public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; /** * 新增或修改的RoutingKey */ public final static String HOTEL_INSERT_KEY = "hotel.insert"; /** * 删除的RoutingKey */ public final static String HOTEL_DELETE_KEY = "hotel.delete"; }
-
发送MQ消息
@PostMapping public void saveHotel(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); }
-
接收MQ消息
- hotel-demo接收到MQ消息要做的事情包括:
- 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
- 删除消息:根据传递的hotel的id删除索引库中的一条数据
- 在
service
中定义新增、删除业务
void deleteById(Long id); void insertById(Long id);
- 在其实现类中,实现业务
@Override public void deleteById(Long id) { try { // 1.准备Request DeleteRequest request = new DeleteRequest("hotel", id.toString()); // 2.发送请求 client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void insertById(Long id) { try { // 0.根据id查询酒店数据 Hotel hotel = getById(id); // 转换为文档类型 HotelDoc hotelDoc = new HotelDoc(hotel); // 1.准备Request对象 IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); // 2.准备Json文档 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 client.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
- hotel-demo接收到MQ消息要做的事情包括:
-
编写监听器
import cn.itcast.hotel.constants.MqConstants; import cn.itcast.hotel.service.IHotelService; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HotelListener { @Autowired private IHotelService hotelService; /** * 监听酒店新增或修改的业务 * @param id 酒店id * queues = */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name =MqConstants.HOTEL_INSERT_QUEUE), exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC, autoDelete="false",durable = "true"), key = {MqConstants.HOTEL_INSERT_KEY} ) ) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); } /** * 监听酒店删除的业务 * @param id 酒店id */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name =MqConstants.HOTEL_DELETE_QUEUE), exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC, autoDelete="false",durable = "true"), key = {MqConstants.HOTEL_DELETE_KEY} ) ) public void listenHotelDelete(Long id){ hotelService.deleteById(id); } }
3.7 数据同步案例的测试
- 在rabbitMq中,可以看到队列注册完成
- 通过交换机绑定情况,确定项目正常运行
- 修改
上海希尔顿酒店
的价格
- 通过
vue devtools
插件工具,查看上海希尔顿酒店
的文档ID
- 然后,修改其价格
- 查看,队列的消息记录
- 查看价格
3.8 补充:vue Devtools插件的安装
- 不推荐大家,自己去下载源码,手动编译安装!!!会遇到很多错误,出力不讨好!!!
3.8.1 edge浏览器安装方法
- 在edge拓展商店中,搜索安装即可
- 目前的版本为稳定版本6.5.0
- 目前的版本为稳定版本6.5.0
3.8.2 chrome浏览器的安装方法
- 由于国内,chrome无法正常访问chrome商店,所以需要借助第三方的插件网站极简插件网站的Vue Devtools下载地址
- 下载Vue Devtools,并解压缩
- 打开chrome的开发者模式:设置->拓展程序->开发者模式
- 将解压文件夹中的
.crx
文件,拖入chrome浏览器的拓展程序页面,点击添加拓展即可
3.9 对于vue devtools工具的解释
- 安装后的检测
- 该工具,只有在本地运行vue前端页面时,控制台才显示该插件,在其他网页并不会出现
- 所以:正确的检测是否安装成功的方法是:启动本地vue项目,打开控制台查看vue devtools
- 该插件默认配置文件中
persistent
的值为:true
,所以,无需修改
四 elasticsearch集群
- 单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
- 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
- 单点故障问题:将分片数据在不同节点备份(replica )
4.1 ES集群相关概念
- 集群(cluster):一组拥有共同的 cluster name 的 节点。
- 节点(node) :集群中的一个 Elasticearch 实例
- 分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中
- 解决问题:数据量太大,单点存储量有限的问题。
- 主分片(Primary shard):相对于副本分片的定义。
- 副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样。
- 为了在高可用和成本间寻求平衡,我们可以这样做:
- 首先对数据分片,存储到不同节点
- 然后对每个分片进行备份,放到对方节点,完成互相备份
- 现在,每个分片都有1个备份,存储在3个节点:
- node0:保存了分片0和1
- node1:保存了分片0和2
- node2:保存了分片1和2
4.2 搭建ES集群
- 我们在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
4.3.1 创建es集群
- 首先编写一个docker-compose.yml文件,内容如下:
version: '2.2' services: es01: image: elasticsearch:7.12.1 container_name: es01 environment: - node.name=es01 - cluster.name=es-docker-cluster - discovery.seed_hosts=es02,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data01:/usr/share/elasticsearch/data ports: - 9200:9200 networks: - elastic es02: image: elasticsearch:7.12.1 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.12.1 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge
- es运行需要修改一些linux系统权限,修改
/etc/sysctl.conf
文件vi /etc/sysctl.conf
- 然后执行命令,让配置生效:
sysctl -p
- 通过docker-compose启动集群
docker-compose up -d
[root@kongyue tmp]# docker-compose up -d Starting es01 ... done Creating es03 ... done Creating es02 ... done
4.4 集群状态监控
4.4.1 win安装cerebro【不推荐】
-
kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。
-
这里推荐使用cerebro来监控es集群状态,官方网址解压好的目录如下:
-
进入对应的bin目录:
-
双击其中的cerebro.bat文件即可启动服务。
4.4.2 闪退问题[未解决]
Oops, cannot start the server.
com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Unable to load cache item
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
at com.google.inject.internal.FailableCache.get(FailableCache.java:54)
at com.google.inject.internal.ConstructorInjectorStore.get(ConstructorInjectorStore.java:49)
at com.google.inject.internal.ConstructorBindingImpl.initialize(ConstructorBindingImpl.java:155)
at com.google.inject.internal.InjectorImpl.initializeBinding(InjectorImpl.java:592)
at com.google.inject.internal.AbstractBindingProcessor$Processor.initializeBinding(AbstractBindingProcessor.java:173)
at com.google.inject.internal.AbstractBindingProcessor$Processor.lambda$scheduleInitialization$0(AbstractBindingProcessor.java:160)
at com.google.inject.internal.ProcessedBindingData.initializeBindings(ProcessedBindingData.java:49)
at com.google.inject.internal.InternalInjectorCreator.initializeStatically(InternalInjectorCreator.java:124)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:108)
at com.google.inject.Guice.createInjector(Guice.java:87)
at com.google.inject.Guice.createInjector(Guice.java:78)
at play.api.inject.guice.GuiceBuilder.injector(GuiceInjectorBuilder.scala:200)
at play.api.inject.guice.GuiceApplicationBuilder.build(GuiceApplicationBuilder.scala:155)
at play.api.inject.guice.GuiceApplicationLoader.load(GuiceApplicationLoader.scala:21)
at play.core.server.ProdServerStart$.start(ProdServerStart.scala:54)
at play.core.server.ProdServerStart$.main(ProdServerStart.scala:30)
at play.core.server.ProdServerStart.main(ProdServerStart.scala)
Caused by: java.lang.IllegalStateException: Unable to load cache item
at com.google.inject.internal.cglib.core.internal.$LoadingCache.createEntry(LoadingCache.java:79)
at com.google.inject.internal.cglib.core.internal.$LoadingCache.get(LoadingCache.java:34)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData.get(AbstractClassGenerator.java:119)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator.create(AbstractClassGenerator.java:294)
at com.google.inject.internal.cglib.reflect.$FastClass$Generator.create(FastClass.java:65)
at com.google.inject.internal.BytecodeGen.newFastClassForMember(BytecodeGen.java:258)
at com.google.inject.internal.BytecodeGen.newFastClassForMember(BytecodeGen.java:207)
at com.google.inject.internal.DefaultConstructionProxyFactory.create(DefaultConstructionProxyFactory.java:49)
at com.google.inject.internal.ProxyFactory.create(ProxyFactory.java:156)
at com.google.inject.internal.ConstructorInjectorStore.createConstructor(ConstructorInjectorStore.java:94)
at com.google.inject.internal.ConstructorInjectorStore.access$000(ConstructorInjectorStore.java:30)
at com.google.inject.internal.ConstructorInjectorStore$1.create(ConstructorInjectorStore.java:38)
at com.google.inject.internal.ConstructorInjectorStore$1.create(ConstructorInjectorStore.java:34)
at com.google.inject.internal.FailableCache$1.load(FailableCache.java:43)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
... 21 more
Caused by: java.lang.ExceptionInInitializerError
at com.google.inject.internal.cglib.core.$DuplicatesPredicate.evaluate(DuplicatesPredicate.java:104)
at com.google.inject.internal.cglib.core.$CollectionUtils.filter(CollectionUtils.java:52)
at com.google.inject.internal.cglib.reflect.$FastClassEmitter.<init>(FastClassEmitter.java:69)
at com.google.inject.internal.cglib.reflect.$FastClass$Generator.generateClass(FastClass.java:77)
at com.google.inject.internal.cglib.core.$DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator.generate(AbstractClassGenerator.java:332)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData$3.apply(AbstractClassGenerator.java:96)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData$3.apply(AbstractClassGenerator.java:94)
at com.google.inject.internal.cglib.core.internal.$LoadingCache$2.call(LoadingCache.java:54)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at com.google.inject.internal.cglib.core.internal.$LoadingCache.createEntry(LoadingCache.java:61)
... 38 more
Caused by: com.google.inject.internal.cglib.core.$CodeGenerationException: java.lang.reflect.InaccessibleObjectException-->Unable to make protected final java.lang.Class java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @6a988392
at com.google.inject.internal.cglib.core.$ReflectUtils.defineClass(ReflectUtils.java:464)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator.generate(AbstractClassGenerator.java:339)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData$3.apply(AbstractClassGenerator.java:96)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData$3.apply(AbstractClassGenerator.java:94)
at com.google.inject.internal.cglib.core.internal.$LoadingCache$2.call(LoadingCache.java:54)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at com.google.inject.internal.cglib.core.internal.$LoadingCache.createEntry(LoadingCache.java:61)
at com.google.inject.internal.cglib.core.internal.$LoadingCache.get(LoadingCache.java:34)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator$ClassLoaderData.get(AbstractClassGenerator.java:119)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator.create(AbstractClassGenerator.java:294)
at com.google.inject.internal.cglib.core.$KeyFactory$Generator.create(KeyFactory.java:221)
at com.google.inject.internal.cglib.core.$KeyFactory.create(KeyFactory.java:174)
at com.google.inject.internal.cglib.core.$KeyFactory.create(KeyFactory.java:157)
at com.google.inject.internal.cglib.core.$KeyFactory.create(KeyFactory.java:149)
at com.google.inject.internal.cglib.core.$KeyFactory.create(KeyFactory.java:145)
at com.google.inject.internal.cglib.core.$MethodWrapper.<clinit>(MethodWrapper.java:23)
... 49 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make protected final java.lang.Class java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @6a988392
at java.base/java.lang.reflect.AccessibleObject.throwInaccessibleObjectException(AccessibleObject.java:387)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:363)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:311)
at java.base/java.lang.reflect.Method.checkCanSetAccessible(Method.java:201)
at java.base/java.lang.reflect.Method.setAccessible(Method.java:195)
at com.google.inject.internal.cglib.core.$ReflectUtils$1.run(ReflectUtils.java:61)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:569)
at com.google.inject.internal.cglib.core.$ReflectUtils.<clinit>(ReflectUtils.java:52)
at com.google.inject.internal.cglib.reflect.$FastClassEmitter.<init>(FastClassEmitter.java:67)
... 46 more
- 由于jdk版本过高,导致与cerebo不兼容,貌似目前没有解决的方法!!!
- 当然,作者水平有限!如果有解决的方法,请告诉我,感激不尽
4.4.3 linux安装cerebo
- 下载cerebo
- 建议:在浏览器中安装github加速器插件【有梯子的请略过】
- 然后将压缩包上传到linux中,进行安装
rpm -ivh cerebro-0.9.4-1.noarch.rpm
- 修改配置文件
vim /usr/share/cerebro/conf/application.conf
- cerebro的启动 状态查看和关闭:【不推荐】
- 该种启动方式无法在外部设备访问
# 停止 systemctl stop cerebro # 开启 systemctl start cerebro # 查看状态 systemctl status cerebro
- 启动命令:
- 为了便于问题排除可以直接使用命令启动cerebro
/usr/share/cerebro/bin/cerebro
- 默认启动的:
[info] play.api.Play - Application started (Prod) (no global state) [info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
- 访问
ip:9000
:
- 输入elasticsearch的任意节点的地址和端口,点击connect即可
- 绿色的条,代表集群处于绿色(健康状态)
4.4.5 创建索引库
4.4.6 利用kibana的DevTools创建索引库【非实际操作】
- 在DevTools中输入指令:
PUT /itcast { "settings": { "number_of_shards": 3, // 分片数量 "number_of_replicas": 1 // 副本数量 }, "mappings": { "properties": { // mapping映射定义 ... } } }
4.4.7 利用cerebro创建索引库【实际操作】
- 利用cerebro还可以创建索引库:
-
填写索引库信息:
-
点击右下角的create按钮:
-
点击右下角的create按钮:
4.4.8 查看分片效果
- 回到首页,即可查看索引库分片效果:
4.9 集群脑裂问题
4.9.1 集群职责划分
- elasticsearch中集群节点有不同的职责划分:
- 集群一定要将集群职责分离:
- master节点:对CPU要求高,但是内存要求第
- data节点:对CPU和内存要求都高
- coordinating节点:对网络带宽、CPU要求高
- 职责分离可以根据不同节点的需求分配不同的硬件去部署。而且避免业务之间的互相干扰。
- elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。
4.9.2 脑裂问题
-
默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
-
当node3当选后,集群继续对外提供服务,node2和node3自成集群,node1自成集群,两个集群数据不同步,出现数据差异,出现脑裂的情况。
-
为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。
-
对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
4.9.3 小结
-
master eligible节点的作用:
- 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
-
data节点的作用:
- 数据的CRUD
-
coordinator节点的作用:
- 路由请求到其它节点
- 合并查询到的结果,返回给用户
4.10 集群分布式存储
- 当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片?
4.10.1 分片存储测试
- 测试使用的工具
insomnia
,Insomnia 与postman一样,是一个免费的跨平台接口测试桌面应用程序 - insomnia官网,如果要下载的话建议去其他网站下载,官网太慢
- 这里作者提供一个目前最新版的「Insomnia.Core-2023.1.0.exe」
- 测试可以看到,三条数据分别在不同分片:
4.10.2 分片存储原理
- elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
- 说明:
- _routing默认是文档的id
- 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改
- 新增文档的流程如下
- 解读:
- 1)新增一个id=1的文档
- 2)对id做hash运算,假如得到的是2,则应该存储到shard-2
- 3)shard-2的主分片在node3节点,将数据路由到node3
- 4)保存文档
- 5)同步给shard-2的副本replica-2,在node2节点
- 6)返回结果给coordinating-node节点
4.10.3 集群分布式查询
- elasticsearch的查询分成两个阶段:
- scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
- gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
4.10.4 集群故障转移
- 故障转移:集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全。
- 例如一个集群结构如图:node1是主节点,其它两个节点是从节点
- node1发生了故障
- 宕机后的第一件事,需要重新选主,例如选中了node2
- node2成为主节点后,会检测集群监控状态,发现:shard-1、shard-0没有副本节点。因此需要将node1上的数据迁移到node2、node3
- 动图演示: