Spring Cloud学习(十一)【深入Elasticsearch 分布式搜索引擎03】

文章目录

  • 数据聚合
    • 聚合的种类
    • DSL实现聚合
    • RestAPI实现聚合
  • 自动补全
    • 拼音分词器
    • 自定义分词器
    • 自动补全查询
      • completion suggester查询
      • RestAPI实现自动补全
  • 数据同步
    • 数据同步思路分析
    • 实现elasticsearch与数据库数据同步
  • 集群
    • 搭建ES集群
      • 创建es集群
      • 集群状态监控
      • 创建索引库
        • 1)利用kibana的DevTools创建索引库
        • 2)利用cerebro创建索引库
      • 查看分片效果
    • ES集群的节点角色
    • 集群脑裂问题
    • 集群分布式存储
    • 集群分布式查询
    • 集群故障转移


数据聚合

聚合的种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其它聚合的结果为基础做聚合

可以类比mysql数据库,(桶=》group by 分组,度量=》聚合函数,管道=》)

参与聚合的字段类型必须是:

  • keyword
  • 数值
  • 日期
  • 布尔

DSL实现聚合

DSL实现Bucket聚合

现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
类型为 term 类型,DSL示例:

GET /hotel/_search
{
  "size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": { // 定义聚合
    "brandAgg": { //给聚合起个名字
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择term
        "field": "brand", // 参与聚合的字段
        "size": 20 // 希望获取的聚合结果数量
      }
    }
  }
}

Bucket聚合-聚合结果排序

默认情况下,Bucket 聚合会统计 Bucket 内的文档数量,记为 _count,并且按照 _count 降序排序。
我们可以修改结果排序方式:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" // 按照_count升序排列
        },
        "size": 20
      }
    }
  }
}

Bucket聚合-限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加 query 条件即可:

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // 只对200元以下的文档聚合
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

aggs代表聚合,与query同级,此时query的作用是?

  • 限定聚合的的文档范围

聚合必须的三要素:

  • 聚合名称
  • 聚合类型
  • 聚合字段

聚合可配置属性有:

  • size:指定聚合结果数量
  • order:指定聚合结果排序方式
  • field:指定聚合字段

DSL实现Metrics 聚合

例如,我们要求获取每个品牌的用户评分的 min、max、avg 等值.
我们可以利用 stats 聚合:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": { // 聚合名称
          "stats": { // 聚合类型,这里stats可以计算min、max、avg等
            "field": "score" // 聚合字段,这里是score
          }
        }
      }
    }
  }
}

RestAPI实现聚合

我们以品牌聚合为例,演示下 JavaRestClient 使用,先看请求组装:

在这里插入图片描述

再看下聚合结果解析

在这里插入图片描述

在IUserService中定义方法,实现对品牌、城市、星级的聚合

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

在这里插入图片描述

在IUserService中定义一个方法,实现对品牌、城市、星级的聚合,方法声明如下:

在这里插入图片描述

对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:
在这里插入图片描述

可以看到请求参数与之前search时的RequestParam完全一致,这是在限定聚合时的文档范围。
例如:用户搜索“外滩”,价格在300~600,那聚合必须是在这个搜索条件基础上完成。
因此我们需要:

  1. 编写controller接口,接收该请求
  2. 修改IUserService#getFilters()方法,添加RequestParam参数
  3. 修改getFilters方法的业务,聚合时添加query条件
@Test
void testAggregation() throws IOException {
    // 1. 准备Request
    SearchRequest request = new SearchRequest("hotel");
    // 2. 准备DSL
    // 2.1 设置size
    request.source().size(0);
    // 2.2 聚合
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(10)
    );
    // 3. 发出请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4. 解析结果
    Aggregations aggregations = response.getAggregations();
    // 4.1 根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get("brandAgg");
    // 4.2 获取 buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println(key);
    }
}

Controller

@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
    return hotelService.filters(params);
}

Service接口

@Override
public Map<String, List<String>> filters(RequestParams params) {
    try {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备DSL
        // 2.1 query
        buildBasicQuery(params, request);
        // 2.2 设置size
        request.source().size(0);
        // 2.3 聚合
        buildAggregation(request);
        // 3. 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1 根据品牌名称,获取品牌结果
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        // 4.2 根据城市名称,获取城市结果
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        // 4.3 根据星级名称,获取星级结果
        List<String> starList = getAggByName(aggregations, "starAgg");

        // 4.4 放入map
        result.put("品牌", brandList);
        result.put("城市", cityList);
        result.put("星级", starList);
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    } 
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
    // 4.1 根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get(aggName);
    // 4.2 获取 buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    // 4.3 遍历
    List<String> brandList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        brandList.add(key);
    }
    return brandList;
}

private void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(100)
    );
    request.source().aggregation(AggregationBuilders
            .terms("cityAgg")
            .field("city")
            .size(100)
    );
    request.source().aggregation(AggregationBuilders
            .terms("starAgg")
            .field("star")
            .size(100)
    );
}

private void buildBasicQuery(RequestParams params, SearchRequest request) {
    // 1. 构建BooleanQuery
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    // 关键字搜索
    String key = params.getKey();
    if(key == null || "".equals(key)){
        boolQuery.must(QueryBuilders.matchAllQuery());
    }else{
        boolQuery.must(QueryBuilders.matchQuery("all", key));
    }
    // 条件过滤
    // 城市条件
    if (params.getCity() != null && !params.getCity().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
    }
    // 品牌条件
    if (params.getBrand() != null && !params.getBrand().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
    }
    // 星级条件
    if (params.getStarName() != null && !params.getStarName().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("starName", params.getBrand()));
    }
    // 价格
    if (params.getMinPrice() != null && params.getMaxPrice() != null){
        boolQuery.filter(QueryBuilders
                 .rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
    }
    // 2. 算分控制
    FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
            // 原始查询,相关性算分查询
            boolQuery,
            // function score 的数组
            new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                    // 其中的一个 function score 元素
                    new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                            // 过滤条件
                            QueryBuilders.termQuery("isAD", true),
                            // 算分函数
                            ScoreFunctionBuilders.weightFactorFunction(10)
                    )
            });

    request.source().query(functionScoreQuery);
}

自动补全

自动补全需求说明

当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:

在这里插入图片描述

拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
POST /_analyze
{
 "text": "如家酒店整挺好",
 "analyzer": "pinyin"
}

在这里插入图片描述

自定义分词器

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

在这里插入图片描述

我们可以在创建索引库(自定义分词器只对指定的索引库适用)时,通过settings来配置自定义的analyzer(分词器):

在这里插入图片描述

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。
创建倒排索引时:

在这里插入图片描述

因此字段在创建倒排索引时应该用 my_analyzer 分词器;字段在搜索时应该使用 ik_smart 分词器;

在这里插入图片描述

DELETE /test

# 自定义拼音分词器
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name":{
        "type": "text",
        "analyzer": "my_analyzer",  
        "search_analyzer": "ik_smart"
      }
    }
  }
}

POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}

GET /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}

如何使用拼音分词器?

  1. 下载pinyin分词器
  2. 解压并放到elasticsearch的plugin目录
  3. 重启即可

如何自定义分词器?

  1. 创建索引库时,在settings中配置,可以包含三部分
  2. character filter
  3. tokenizer
  4. filter

拼音分词器注意事项?

  • 为了避免搜索到同音字,搜索时不要使用拼音分词器

自动补全查询

completion suggester查询

elasticsearch提供了Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型。
  • 字段的内容一般是用来补全的多个词条形成的数组。

在这里插入图片描述
查询语法如下:

在这里插入图片描述

# 自动补全的索引库
PUT test2
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}
# 示例数据
POST test2/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test2/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test2/_doc
{
  "title": ["Nintendo", "switch"]
}


# 自动补全查询
GET /test2/_search
{
  "suggest": {
    "titelSuggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

在这里插入图片描述
自动补全对字段的要求:

  • 类型是completion类型
  • 字段值是多词条的数组

酒店数据自动补全

实现hotel索引库的自动补全、拼音搜索功能

实现思路如下:

  1. 修改hotel索引库结构,设置自定义拼音分词器
  2. 修改索引库的name、all字段,使用自定义分词器
  3. 索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
  4. 给HotelDoc类添加suggestion字段,内容包含brand、business
  5. 重新导入数据到hotel库

注意:name、all是可分词的,自动补全的brand、business是不可分词的,要使用不同的分词器组合

# 酒店数据索引库
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

GET /hotel/_search
{
  "query": {
    "match_all": {}
  }
}


GET /hotel/_search
{
  "suggest": {
    "titelSuggest": {
      "text": "h",
      "completion": {
        "field": "suggestion",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

RestAPI实现自动补全

先看请求参数构造的API:

在这里插入图片描述

再来看结果解析:

在这里插入图片描述

实现酒店搜索页面输入框的自动补全

查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求:

在这里插入图片描述

在服务端编写接口,接收该请求,返回补全结果的集合,类型为List<String>

controller

@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix){
    return hotelService.getSuggestions(prefix);
}

service

@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                .prefix(prefix)
                .skipDuplicates(true)
                .size(10)
        ));
        // 3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Suggest suggest = response.getSuggest();
        // 4.1 根据补全查询名称,获取补全结果
        CompletionSuggestion suggestions =  suggest.getSuggestion("suggestions");
        // 4.2 获取options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3 遍历
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            list.add(text);
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException();
    }
}

数据同步

数据同步思路分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

在这里插入图片描述

方案一:同步调用

在这里插入图片描述

方案二:异步通知

在这里插入图片描述

方案三:监听binlog

在这里插入图片描述

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

实现elasticsearch与数据库数据同步

利用MQ实现mysql与elasticsearch数据同步

利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
步骤:

  • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据
  • 启动并测试数据同步功能

在这里插入图片描述

  1. 导入项目
    在这里插入图片描述
  2. 声明exchange、queue、RoutingKey(两类消息,两种队列)

导入amqp依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yaml文件中配置rabbitmq

spring:
  rabbitmq:
    host: 10.211.55.6
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /

MqConstants.java

public class MqConstants {

    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";

    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

MqConfig.java

@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
  1. 在hotel-admin中的增、删、改业务中完成消息发送

导入依赖,配置 yaml 文件

controller中

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);

    }
}

在这里插入图片描述

  1. 在hotel-demo中完成消息监听,并更新elasticsearch中数据

HotelListener.java

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
        hotelService.insertById(id);
    }

    /**
     * 监听酒店新删除的业务
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

service

@Override
public void deleteById(Long id) {
    try {
        // 1. 准备Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. 准备发送请求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0. 根据id查询酒店数据
        Hotel hotel = getById(id);
        // 转换为文档类型
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1. 准备Request对象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. 准备Json文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. 发送请求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

集群

ES集群结构

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份(replica )

在这里插入图片描述

搭建ES集群

我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。

部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

创建es集群

首先编写一个docker-compose文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

解压即可使用,非常方便。

解压好的目录如下:

在这里插入图片描述

进入对应的bin目录:

在这里插入图片描述

双击其中的cerebro.bat文件即可启动服务。

在这里插入图片描述

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

在这里插入图片描述

绿色的条,代表集群处于绿色(健康状态)。

创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}
2)利用cerebro创建索引库

利用cerebro还可以创建索引库:

在这里插入图片描述

填写索引库信息:

在这里插入图片描述

点击右下角的create按钮:

在这里插入图片描述

查看分片效果

回到首页,即可查看索引库分片效果:

在这里插入图片描述

每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。语法如下:

在这里插入图片描述

ES集群的节点角色

elasticsearch中集群节点有不同的职责划分:

在这里插入图片描述

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。

在这里插入图片描述

集群脑裂问题

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

在这里插入图片描述

master eligible节点的作用是什么?

  • 参与集群选主
  • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求

data节点的作用是什么?

  • 数据的CRUD

coordinator节点的作用是什么?

  • 路由请求到其它节点
  • 合并查询到的结果,返回给用户

集群分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

在这里插入图片描述

说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

新增文档流程:

在这里插入图片描述

集群分布式查询

elasticsearch的查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

在这里插入图片描述

分布式新增如何确定分片?

  • coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片

分布式查询的两个阶段

  • 分散阶段: coordinating node将查询请求分发给不同分片
  • 收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户

集群故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

故障转移:

  • master宕机后,EligibleMaster选举为新的主节点。
  • master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/178377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

优先经验回放(prioritized experience replay)

prioritized experience replay 思路 优先经验回放出自ICLR 2016的论文《prioritized experience replay》。 prioritized experience replay的作者们认为&#xff0c;按照一定的优先级来对经验回放池中的样本采样&#xff0c;相比于随机均匀的从经验回放池中采样的效率更高&…

matlab绘图函数plot和fplot的区别

一、背景 有的函数用plot画就会报错&#xff0c;显示数据必须为可转换为双精度值的数值、日期时间、持续时间、分类或数组。 如下图所示&#xff1a; 但用fplot函数就没有问题&#xff0c;因此这里记录一下两者的区别&#xff0c;如果使用不当&#xff0c;画出的图可能就是下…

坚鹏:中国工商银行数字化转型发展现状与成功案例培训圆满结束

中国工商银行围绕“数字生态、数字资产、数字技术、数字基建、数字基因”五维布局&#xff0c;深入推进数字化转型&#xff0c;加快形成体系化、生态化实施路径&#xff0c;促进科技与业务加速融合&#xff0c;以“数字工行”建设推动“GBC”&#xff08;政务、企业、个人&…

用 jmeter 对 mongodb 进行测试方法合集

MongoDB 作为非关系型数据库&#xff0c;在现在企业中&#xff0c;还是有广泛的使用。但是&#xff0c;用 jmeter 如何测试 MongoDB&#xff0c;却是一个令很多人头疼的问题。去搜索&#xff0c;国内基本找不到一篇比较有价值的文章。 今天&#xff0c;我就用三种不同方法&…

如何通过RA过程识别Redcap UE?

以下是38.300中的描述 RedCap UE可以通过发送MSG3/MSGA的特定LCID识别&#xff0c;可选条件是通过MSGA/MSG1的PRACH occasion/PRACH preamble识别&#xff0c;根据这段描述&#xff0c;通过MSG3/MSGA的识别是必须项&#xff0c;而MSGA/MSG1的识别过程是可选项。如果通过MSGA/MS…

02 请求默认值

一、HTTP请求默认值&#xff1a;是用来管理所有请求共有的协议、网址、端口等信息的&#xff1b;通常情况下&#xff0c;一批量的接口测试&#xff0c;访问的是同一个站点&#xff0c;那么以上信息基本都是相同的&#xff0c;就不需要在每个请求中重复编写&#xff1b; 每个请…

Spring cloud - Hystrix源码

其实只是Hystrix初始化部分&#xff0c;我们从源码的角度分析一下EnableCircuitBreaker以及HystrixCommand注解的初始化过程。 从EnableCircuitBreaker入手 我们是通过在启动类添加EnableCircuitBreaker注解启用Hystrix的&#xff0c;所以&#xff0c;源码解析也要从这个注解…

2014年5月28日 Go生态洞察:GopherCon 2014大会回顾

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

ansible的基本安装

目录 一、简介 1.ansible自动化运维人工运维时代 2.自动化运维时代 3.ansible介绍 4.ansible特点 二、ansible实践 1.环境 2.ansible管理安装 3.ansible被管理安装 4.管理方式 5.添加被管理机器的ip 6.ssh密码认证方式管理 三、配置免密登录 1.ansible自带的密码…

hp惠普Victus Gaming Laptop 15-fa1025TX/fa1005tx原装出厂Win11系统ISO镜像

光影精灵9笔记本电脑原厂W11系统22H2恢复出厂时开箱状态一模一样 适用型号&#xff1a;15-fa1003TX&#xff0c;15-fa1005TX&#xff0c;15-fa1007TX&#xff0c;15-fa1025TX 链接&#xff1a;https://pan.baidu.com/s/1fBPjed1bhOS_crGIo2tP1w?pwduzvz 提取码&#xff1a…

【华为OD题库-033】经典屏保-java

题目 DVD机在视频输出时&#xff0c;为了保护电视显像管&#xff0c;在待机状态会显示"屏保动画”&#xff0c;如下图所示,DVD Logo在屏幕内来回运动&#xff0c;碰到边缘会反弹:请根据如下要求&#xff0c;实现屏保Logo坐标的计算算法 1、屏幕是一个800 * 600像素的矩形&…

软件测试:功能测试常用的测试用例大全

登录、添加、删除、查询模块是我们经常遇到的&#xff0c;这些模块的测试点该如何考虑 1)登录 ① 用户名和密码都符合要求(格式上的要求) ② 用户名和密码都不符合要求(格式上的要求) ③ 用户名符合要求&#xff0c;密码不符合要求(格式上的要求) ④ 密码符合要求&#xf…

JavaScript实现右键菜单

1、代码实现 window.onload function () {(function () {// 自定义右键菜单内容并插入到body最后一个节点前let dom <div id"rightMenuBars"><div class"rightMenu-group rightMenu-small"><div class"rightMenu-item"><…

【应用程序启动过程-三种加载控制器的方式-上午内容复习 Objective-C语言】

一、我们先来回忆一下,上午所有内容 1.首先呢,我们先说的是这个“应用程序启动过程”, 应用程序启动过程里面,有三方面内容 1)UIApplication对象介绍 2)AppDelegate对象介绍 3)应用程序启动过程 现在不知道大家对这个应用程序启动过程有印象吗, 2.首先,这个UIAp…

渗透测试高级技巧(一):分析验签与前端加密

“开局一个登录框” 在黑盒的安全测试的工作开始的时候&#xff0c;打开网站一般来说可能仅仅是一个登录框&#xff1b;很多时候这种系统往往都是自研或者一些业务公司专门研发。最基础的情况下&#xff0c;我们会尝试使用 SQL 注入绕过或者爆破之类的常规手段&#xff0c;如果…

开源计算机视觉库OpenCV详解

目录 1、概述 2、OpenCV详细介绍 2.1、OpenCV的起源 2.2、OpenCV开发语言 2.3、OpenCV的应用领域 3、OpenCV模块划分 4、OpenCV源码文件结构 4.1、根目录介绍 4.2、常用模块介绍 4.3、CUDA加速模块 5、OpenCV配置以及Visual Studio使用OpenCV 6、关于Lena图片 7、…

简于外 强于内,联想全新ThinkCentre M90a Pro Gen4以强劲性能开启商用新体验

近日&#xff0c;联想发布了最新一代商用台式一体机联想ThinkCentre M90a Pro Gen4。作为联想ThinkCentre M大师系列的旗舰产品&#xff0c;其配备了优质的显示屏&#xff0c;拥有强大的性能和稳定安全的特性&#xff0c;能够满足多样的工作场合&#xff0c;为商用一体机的行业…

面试题:你怎么理解System.out.println() ?

文章目录 首先分析System源码out源码分析println分析拓展知识点 你如何理解System.out.println() ? 学了这么久的面向对象编程&#xff0c;那如何用一行代码体现呢&#xff1f; 如果你能自己读懂System.out.println()&#xff0c;就真正了解了Java面向对象编程的含义 面向对…

Ajax入门-Express框架介绍和基本使用

电脑实在忒垃圾了&#xff0c;出现问题耗费了至少一刻钟time&#xff0c;然后才搞出来正常的效果&#xff1b; 效果镇楼 另外重新安装了VScode软件&#xff0c;原来的老是报错&#xff0c;bug。。&#xff1b; 2个必要的安装命令&#xff1b; 然后建立必要的文件夹和文件&…

MAX/MSP SDK学习07:list传递

实现自定义Obejct&#xff0c;要求将传入的一组数据100后传出。 #include "ext.h" #include "ext_obex.h" typedef struct _listTrans {t_object ob;void* outLet;t_atom* fArr;long listNum;} t_listTrans;void* listTrans_new(t_symbol* s, long arg…