微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下

文章目录

  • 一、数据聚合
    • 1.1 聚合种类
    • 1.2 DSL实现聚合
    • 1.3 RestAPI实现聚合
    • 1.4 演示:多条件聚合
  • 二、自动补全
    • 2.1 拼音分词器
    • 2.2 自定义分词器
    • 2.3 DSL自动补全查询
    • 2.5 实现酒店搜索框自动补全
      • 2.5.1 修改酒店索引库数据结构
      • 2.5.2 RestAPI实现自动补全查询
      • 2.5.3 实战
  • 三、数据同步
    • 3.1 实现数据同步的方法
    • 3.2 使用消息队列MQ实现数据同步
      • 3.2.1 导入hotel-admin
      • 3.2.2 声明交换机、队列、routingkey
  • 四、集群
    • 4.1 搭建ES集群
    • 4.2 集群职责和脑裂问题
    • 4.3 集群故障转移
    • 4.4 集群分布式存储与查询


一、数据聚合

1.1 聚合种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  1. 桶(Bucket)聚合:用来对文档做分组
    TermAggregation:按照文档字段值分组
    Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  2. 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    Avg:求平均值
    Max:求最大值
    Min:求最小值
    Stats:同时求max、min、avg、sum等
  3. 管道(pipeline)聚合:其它聚合的结果为基础做聚合

注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。

1.2 DSL实现聚合

# 使用DSL实现聚合
# 1.bucket桶聚合 + 限定聚合范围
# 例:根据酒店品牌名做聚合(并且限定价格不高于200的),并按照结果的升序排序,显示前5个品牌
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200
      }
    }
  },
  "size": 0, //设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": {  // 定义聚合
    "brandAgg": { // 定义聚合名
      "terms": {  // 聚合类型,按照品牌名聚合,所以选择term
        "field": "brand", // 参与聚合字段
        "order": {
          "_count": "asc"  //指定排序规则 升序
        }, 
        "size": 20 //希望获得聚合结果数
      }
    }
  }
}

# 2.Metrics聚合
# 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序)

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "score_stats.avg": "desc"
        }
      },
      "aggs": { //子聚合
        "score_stats": { //子聚合名
          "stats": {  //聚合类型,stats可以计算min、max、avg等
            "field": "score"  //聚合字段
          }
        }
      }
    }
  }
}

1.3 RestAPI实现聚合

在这里插入图片描述
在这里插入图片描述

    /**
     * 桶bucket聚合
     */
    @Test
    void testAgg() throws IOException {
        // 1.准备请求
        SearchRequest request = new SearchRequest("hotel");
        // 2.请求参数
        // 2.1.size
        request.source().size(0);
        // 2.2.聚合
        request.source().aggregation(
                AggregationBuilders.terms("brandAgg").field("brand").size(20));
        // 3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        
        // 4.解析结果
        Aggregations aggregations = response.getAggregations();
        // 4.1.根据聚合名称,获取聚合结果
        Terms brandAgg = aggregations.get("brandAgg");
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        // 4.3.遍历
        for (Terms.Bucket bucket : buckets) {
            String brandName = bucket.getKeyAsString();
            System.out.println("brandName = " + brandName);
            long docCount = bucket.getDocCount();
            System.out.println("docCount = " + docCount);
        }
    }

1.4 演示:多条件聚合

在这里插入图片描述


@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    @Override
    public Map<String, List<String>> filters() {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            // 2.1.size
            request.source().size(0);
            // 2.2.聚合
            buildAggregation(request);
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

            // 4.解析结果
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();

            // 4.1.根据品牌名称,获取聚合结果
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            // 放入map
            result.put("品牌",brandList);
            // 4.2.根据城市名称,获取聚合结果
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            // 放入map
            result.put("城市",cityList);
            // 4.3.根据星级名称,获取聚合结果
            List<String> starList = getAggByName(aggregations, "starAgg");
            // 放入map
            result.put("星级",starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<String> getAggByName(Aggregations aggregations, String aggName) {
        // 4.1.根据聚合名称,获取聚合结果
        Terms brandAgg = aggregations.get(aggName);
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        // 4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    public void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                        .field("brand")
                        .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100));
    }
}

测试

@SpringBootTest
public class HotelDemoApplicationTest {
    @Autowired
    private IHotelService hotelService;
    @Test
    void contextLoads(){
        Map<String, List<String>> filters = hotelService.filters();
        System.out.println(filters);
    }
}

结果:
在这里插入图片描述

二、自动补全

自动补全如下图所示:
在这里插入图片描述

2.1 拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
    在这里插入图片描述

2.2 自定义分词器

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

演示:

# 自定义拼音分词器
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": { 
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}

GET /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}

在这里插入图片描述

注意:拼音分词器通常在创建索引库时使用,搜索时使用普通分词器即可

2.3 DSL自动补全查询

在这里插入图片描述
查询语法如下

// 自动补全查询
POST /test/_search
{
  "suggest": {
    "title_suggest": {  // 自定义补全查询名称
      "text": "s", // 关键字
      "completion": {
        "field": "title", // 补全字段
        "skip_duplicates": true, // 跳过重复的
        "size": 10 // 获取前10条结果
      }
    }
  }
}

演示:

# 2.自动补全
# 2.1 创建一个 自动补全的索引库 属性有title
DELETE /test
PUT test
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}
# 2.2 插入示例数据
POST test/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test/_doc
{
  "title": ["Nintendo", "switch"]
}


# 2.3 自动补全查询
# 例:输入一个关键字s,看自动补全的结果
# 结果:"SK-II""Sony""switch"
POST /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s", 
      "completion": {
        "field": "title",
        "skip_duplicates": true, 
        "size": 10 
      }
    }
  }
}

结果:
在这里插入图片描述

2.5 实现酒店搜索框自动补全

2.5.1 修改酒店索引库数据结构

在这里插入图片描述

1.修改索引库结构

# 酒店数据索引库
GET /hotel/_mapping
DELETE /hotel
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

# 自动补全查询
GET /hotel/_search
{
  "suggest": {
    "mySuggestion": {
      "text": "shang",
      "completion": {
        "field": "suggestion",
        "skip_duplicates": true, 
        "size": 10 
      }
    }
  }
}

2.修改HotelDoc

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    private Object distance; //新加加字段"距离":酒店距你选择位置的距离
    private Boolean isAD; //新加加字段"标记":给你置顶的酒店添加一个标记
    private List<String> suggestion;//新加该字段用于自动补全

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        // 自动补全字段的处理
        this.suggestion = new ArrayList<>();
        // 添加品牌、城市
        this.suggestion.add(this.brand);
        this.suggestion.add(this.city);
        // 判断商圈是否包含/
        if (this.business.contains("/")) {
            // business有多个值,需要切割
            String[] arr = this.business.split("/");
            // business的每个值都要加入到suggestion中
            Collections.addAll(this.suggestion, arr);
        }else{
            this.suggestion.add(this.business);
        }
    }
}

3.【重新导入数据,不演示,参见之前的批量导入文档功能】查询结果
在这里插入图片描述

2.5.2 RestAPI实现自动补全查询

在这里插入图片描述

在这里插入图片描述

    /**
     * 自动补全查询
     */
    @Test
    void testSuggest() throws IOException {
        // 1.准备请求
        SearchRequest request = new SearchRequest("hotel");
        // 2.请求参数
        request.source().suggest(new SuggestBuilder().addSuggestion(
                        "hotelSuggest",
                        SuggestBuilders
                                .completionSuggestion("suggestion")
                                .size(10)
                                .skipDuplicates(true)
                                .prefix("s")
                ));
        // 3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析结果
        Suggest suggest = response.getSuggest();
        // 4.1.根据补全查询名称,获取补全结果
        CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
        // 4.2.获取options
        for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
            // 4.3.获取补全的结果
            String str = option.getText().toString();
            System.out.println(str);
        }
    }

2.5.3 实战

在这里插入图片描述
Mapper层

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @PostMapping("list")
    public PageResult search(@RequestBody RequestParams params) {
        return hotelService.search(params);
    }

    @PostMapping("filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
        return hotelService.filters(params);
    }

    @GetMapping("suggestion")
    public List<String> getSuggestion(@RequestParam("key") String key) {
        return hotelService.getSuggestion(key);
    }
}

Service层

@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    /**
     * 自动补全查询
     */
    @Override
    public List<String> getSuggestion(String key)  {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "hotelSuggest",
                    SuggestBuilders
                            .completionSuggestion("suggestion")
                            .size(10)
                            .skipDuplicates(true)
                            .prefix(key)
            ));
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Suggest suggest = response.getSuggest();
            // 4.1.根据补全查询名称,获取补全结果
            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
            // 4.2.获取options
            List<String> result = new ArrayList<>();
            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
                // 4.3.获取补全的结果
                String str = option.getText().toString();
                result.add(str);
            }
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

结果演示
在这里插入图片描述

三、数据同步

3.1 实现数据同步的方法

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2 使用消息队列MQ实现数据同步

在这里插入图片描述

3.2.1 导入hotel-admin

3.2.2 声明交换机、队列、routingkey

在这里插入图片描述

由于增和改都相当于插入,所以共用一个队列;删除占用一个队列。

一、对消费者hotel-demo的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件
		<!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
server:
  port: 8089
spring:
  datasource:
    url: jdbc:mysql://mysql:3306/heima?useSSL=false
    username: root
    password: 123
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    host: 192.168.150.101
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: HH:mm:ss:SSS
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
  type-aliases-package: cn.itcast.hotel.pojo
  1. 定义mq的一些常量
public class HotelMqConstants {
    // 交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    // 新增修改队列
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    // 删除队列
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    // 新增修改的RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    // 删除的RoutingKey
    public static final String DELETE_KEY = "hotel.delete";
}
  1. 声明交换机和队列,并监听MQ消息【注解方式】
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.INSERT_KEY
    ))
    public void listenHotelInsert(Long hotelId){
        // 新增
        hotelService.saveById(hotelId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.DELETE_KEY
    ))
    public void listenHotelDelete(Long hotelId){
        // 删除
        hotelService.deleteById(hotelId);
    }
}

【bean方式】

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
    }
    @Bean
    public Queue deleteQueue(){
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
    }

    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder
                .bind(insertQueue())
                .to(topicExchange())
                .with(HotelMqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder
                .bind(deleteQueue())
                .to(topicExchange())
                .with(HotelMqConstants.DELETE_KEY);
    }
}
  1. RestAPI实现删改
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 搜索框查询
     */
    @Override
    public PageResult search(RequestParams params) {
        try {
            // 1.准备Request
            SearchRequest request = new SearchRequest("hotel");

            // 2.准备请求参数

            // 2.1.多条件查询和过滤
            buildBasicQuery(params, request);

            // 2.2.分页
            int page = params.getPage();
            int size = params.getSize();
            request.source().from((page - 1) * size).size(size);

            /**
             * 2.3.距离排序
             */
            String location = params.getLocation();
            if (StringUtils.isNotBlank(location)) {// 不为空则查询
                request.source().sort(SortBuilders
                        .geoDistanceSort("location", new GeoPoint(location))
                        .order(SortOrder.ASC)
                        .unit(DistanceUnit.KILOMETERS)
                );
            }

            // 3.发送请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

            // 4.解析响应
            return handleResponse(response);
        } catch (IOException e) {
            throw new RuntimeException("搜索数据失败", e);
        }
    }

    /**
     * 复合查询
     */
    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        // 1.准备Boolean复合查询
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        /**
         * 1.查询关键字
         * must参与 算分
         */
        // 1.1.关键字搜索,match查询,放到must中
        String key = params.getKey();
        if (StringUtils.isNotBlank(key)) {
            // 不为空,根据关键字查询
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        } else {
            // 为空,查询所有
            boolQuery.must(QueryBuilders.matchAllQuery());
        }
        /**
         * 2.条件过滤:多条件复合查询
         * 根据 “品牌 城市 星级 价格范围” 过滤数据
         * filter不参与 算分
         */

        // 1.2.品牌
        String brand = params.getBrand();
        if (StringUtils.isNotBlank(brand)) { // 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("brand", brand));
        }
        // 1.3.城市
        String city = params.getCity();
        if (StringUtils.isNotBlank(city)) {// 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("city", city));
        }
        // 1.4.星级
        String starName = params.getStarName();
        if (StringUtils.isNotBlank(starName)) {// 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("starName", starName));
        }
        // 1.5.价格范围
        Integer minPrice = params.getMinPrice();
        Integer maxPrice = params.getMaxPrice();
        if (minPrice != null && maxPrice != null) {// 不为空则查询
            maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
        }

        /**
         * 3.算分函数查询
         * 置顶功能:给你置顶的酒店添加一个标记,并按其算分
         */
        FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
                boolQuery, // 原始查询,boolQuery
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                QueryBuilders.termQuery("isAD", true), // 过滤条件
                                ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数
                        )
                }
        );

        /**
         * 4.设置查询条件
          */
        request.source().query(functionScoreQuery);
    }

    /**
     * 结果解析
     */
    private PageResult handleResponse(SearchResponse response) {
        SearchHits searchHits = response.getHits();
        // 4.1.总条数
        long total = searchHits.getTotalHits().value;
        // 4.2.获取文档数组
        SearchHit[] hits = searchHits.getHits();
        // 4.3.遍历
        List<HotelDoc> hotels = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            // 4.4.获取source
            String json = hit.getSourceAsString();
            // 4.5.反序列化,非高亮的
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            // 4.6.处理高亮结果
            // 1)获取高亮map
            Map<String, HighlightField> map = hit.getHighlightFields();
            if (map != null && !map.isEmpty()) {
                // 2)根据字段名,获取高亮结果
                HighlightField highlightField = map.get("name");
                if (highlightField != null) {
                    // 3)获取高亮结果字符串数组中的第1个元素
                    String hName = highlightField.getFragments()[0].toString();
                    // 4)把高亮结果放到HotelDoc中
                    hotelDoc.setName(hName);
                }
            }
            // 4.8.排序信息
            Object[] sortValues = hit.getSortValues(); // 获取排序结果
            if (sortValues.length > 0) {
                /**
                 * 由于该程序是根据距离[酒店距你选择位置的距离]进行排序,所以排序结果为距离
                 */
                hotelDoc.setDistance(sortValues[0]);
            }

            // 4.9.放入集合
            hotels.add(hotelDoc);
        }
        return new PageResult(total, hotels);
    }



    /**
     * 多条件聚合
     */
    @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            // 2.1.query查询信息
            buildBasicQuery(params, request);
            // 2.2.size
            request.source().size(0);
            // 2.3.聚合
            buildAggregation(request);
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

            // 4.解析结果
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();

            // 4.1.根据品牌名称,获取聚合结果
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            // 放入map
            result.put("品牌",brandList);
            // 4.2.根据城市名称,获取聚合结果
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            // 放入map
            result.put("城市",cityList);
            // 4.3.根据星级名称,获取聚合结果
            List<String> starList = getAggByName(aggregations, "starAgg");
            // 放入map
            result.put("星级",starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<String> getAggByName(Aggregations aggregations, String aggName) {
        // 4.1.根据聚合名称,获取聚合结果
        Terms brandAgg = aggregations.get(aggName);
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        // 4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    public void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                        .field("brand")
                        .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100));
    }

    /**
     * 自动补全查询
     */
    @Override
    public List<String> getSuggestion(String key)  {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "hotelSuggest",
                    SuggestBuilders
                            .completionSuggestion("suggestion")
                            .size(10)
                            .skipDuplicates(true)
                            .prefix(key)
            ));
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Suggest suggest = response.getSuggest();
            // 4.1.根据补全查询名称,获取补全结果
            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
            // 4.2.获取options
            List<String> result = new ArrayList<>();
            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
                // 4.3.获取补全的结果
                String str = option.getText().toString();
                result.add(str);
            }
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteById(Long hotelId) {
        try {
            // 1.创建request
            DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
            // 2.发送请求
            restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("删除酒店数据失败", e);
        }
    }

    @Override
    public void saveById(Long hotelId) {
        try {
            // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
            Hotel hotel = getById(hotelId);
            // 转换
            HotelDoc hotelDoc = new HotelDoc(hotel);

            // 1.创建Request
            IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
            // 2.准备参数
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("新增酒店数据失败", e);
        }
    }
}

二、对发送者hotel-admin的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件【同上】
  2. 定义mq的一些常量【同上】
  3. 当发送者对mysql数据库改动时,发送消息给MQ
@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    // 注入发送消息的api
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 根据id查询
     */
    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    /**
     * 查询当前页内容
     */
    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    /**
     * 新增,并发送给mq消息
     */
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    /**
     * 修改,并发送给mq消息
     */
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    /**
     * 删除,并发送给mq消息
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
    }
}

四、集群

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
>> 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
>> 单点故障问题:将分片数据在不同节点备份(replica )

4.1 搭建ES集群

我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

  1. 创建es集群
    首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d
  1. 集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

课前资料已经提供了安装包:

解压即可使用,非常方便。

解压好的目录如下:

进入对应的bin目录:

双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。
在这里插入图片描述

  1. 创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}

2)利用cerebro创建索引库

利用cerebro还可以创建索引库:
在这里插入图片描述

填写索引库信息:

点击右下角的create按钮:
在这里插入图片描述

  1. 查看分片效果

回到首页,即可查看索引库分片效果:
在这里插入图片描述

4.2 集群职责和脑裂问题

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

4.3 集群故障转移

在这里插入图片描述

4.4 集群分布式存储与查询

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/467666.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

yocto编译测试

源码下载 git clone -b gatesgarth git://git.yoctoproject.org/poky lkmaolkmao-virtual-machine:~/yocto$ git clone -b gatesgarth git://git.yoctoproject.org/poky Cloning into poky... remote: Enumerating objects: 640690, done. remote: Counting objects: 100% (13…

C++开发基础——函数模板

一&#xff0c;函数模板 1.基础概念 模板编程是C中泛型编程的基础。 一个模板可以是创建类或者函数的蓝图。 模板编程分两种&#xff0c;分别是算法抽象的模板、数据抽象的模板。算法抽象的模板以函数模板为主&#xff0c;数据抽象的模板以类模板为主。 基于函数模板生成的…

matplotlib库简介及函数说明

目录 简介matplotlib.pyplot as plt 常用函数说明创建子图plt.subplots&#xff08;&#xff09;.plot&#xff08;&#xff09; 子图参数set_title&#xff08;&#xff09;axis2.legend()fig.autofmt_xdate() 简介 matplotlib 是一个用于创建二维图表和数据可视化的 Python …

【数据挖掘】实验3:常用的数据管理

实验3&#xff1a;常用的数据管理 一&#xff1a;实验目的与要求 1&#xff1a;熟悉和掌握常用的数据管理方法&#xff0c;包括变量重命名、缺失值分析、数据排序、随机抽样、字符串处理、文本分词。 二&#xff1a;实验内容 【创建新变量】 方法1&#xff1a; mydata <…

写一个五子棋小游戏

具体如下&#xff0c;直接来 目录 大致一看 导入模块和初始化 定义棋盘&#xff08;Checkerboard类&#xff09; 定义AI类 游戏主循环&#xff08;main函数&#xff09; 绘图和辅助函数 AI算法解析 完整代码 大致一看 导入模块和初始化 一开始导入了必要的模块&#x…

【边缘智能】Jetson板卡上安装QT5与OpenCV集成

学习《OpenCV应用开发&#xff1a;入门、进阶与工程化实践》一书 做真正的OpenCV开发者&#xff0c;从入门到入职&#xff0c;一步到位&#xff01; 安装QT5与QT Creator 如果只是简单的使用QT的GUI库&#xff0c;没有其它要求&#xff0c;其实特别容易&#xff0c;一行命令行…

【Unity每日一记】unity中的内置宏和条件编译(Unity内置脚本符号)

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

【数据结构和算法初阶(C语言)】二叉树的顺序结构--堆的实现/堆排序/topk问题详解---二叉树学习日记②

目录 ​编辑 1.二叉树的顺序结构及实现 1.1 二叉树的顺序结构 2 堆的概念及结构 3 堆的实现 3.1堆的代码定义 3.2堆插入数据 3.3打印堆数据 3.4堆的数据的删除 3.5获取根部数据 3.6判断堆是否为空 3.7 堆的销毁 4.建堆以及堆排序 4.1 升序建大堆&#xff0c;降序建小堆 4.2堆…

RPM与DNF的操作实践

这几课有三个目标&#xff1a; 第一步&#xff1a;先配置软件源 跳转到yum.repos.d目录&#xff0c;用vim创建一个openeuler_x84_64.repo文件。这个文件就是我们将会用到的软件源。 我们在里面添加这些东西&#xff0c;保存并退出即可。 然后&#xff0c;我们用yum list all就…

【CICD】Jenkins 常用操作手册

常见词汇 词汇 说明 Node 作为 Jenkins 环境的一部分并能够执行Pipeline或项目的机器&#xff0c;无论是 Master 还是Agent 都被认为是 Node。 Master 存储配置&#xff0c;加载插件以及为 Jenkins 呈现各种用户界面的主控节点 Agent 通常是一台主机或容器&#xff0c;连…

Hive:数据仓库利器

1. 简介 Hive是一个基于Hadoop的开源数据仓库工具&#xff0c;可以用来存储、查询和分析大规模数据。Hive使用SQL-like的HiveQL语言来查询数据&#xff0c;并将其结果存储在Hadoop的文件系统中。 2. 基本概念 介绍 Hive 的核心概念&#xff0c;例如表、分区、桶、HQL 等。 …

Chrome历史版本下载地址:Google Chrome Older Versions Download (Windows, Linux Mac)

最近升级到最新版本Chrome后发现页面居然显示错乱,是在无语, 打算退回原来的版本, 又发现官方只提供最新的版本下载, 为了解决这个问题所有收集了Chrome历史版本的下载地址分享给大家. Google Chrome Windows version 32-bit VersionSizeDate104.0.5112.10279.68 MB2022-05-30…

TT-100K数据集,YOLO格式

TT-100K数据集YOLO格式&#xff0c;分为train、val和test&#xff0c;其中train中共有6793张图片&#xff0c;val中共有1949张图片&#xff0c;test中共有996张图片。数据集只保留包含图片数超过100的类别。共计46类。

uniapp微信小程序随机生成canvas-id报错?

uniapp微信小程序随机生成canvas-id报错&#xff1f; 文章目录 uniapp微信小程序随机生成canvas-id报错&#xff1f;效果图遇到问题解决 场景&#xff1a; 子组件&#xff0c;在 mounted 绘制 canvas&#xff1b;App、H5端正常显示&#xff0c;微信小程序报错&#xff1b; 效…

信息系统项目管理师019:存储和数据库(2信息技术发展—2.1信息技术及其发展—2.1.3存储和数据库)

文章目录 2.1.3 存储和数据库1.存储技术2.数据结构模型3.常用数据库类型4.数据仓库 记忆要点总结 2.1.3 存储和数据库 1.存储技术 存储分类根据服务器类型分为&#xff1a;封闭系统的存储和开放系统的存储。封闭系统主要指大型机等服务器。开放系统指基于包括麒麟、欧拉、UNIX…

MacBook远程桌面Windows使用Microsoft Remote Desktop for Mac_亲测使用

MacBook远程桌面Windows使用Microsoft Remote Desktop for Mac_亲测使用 像Windows上有自带的远程桌面连接软件.MacBook没有自带的远程连接Windows桌面的工具,需要安装软件来实现. 像远程桌面控制软件一般有 TeamViewer、向日葵远程控制, ToDesk, Microsoft Remote Desktop f…

【ZooKeeper3、Watcher机制

本文基于 Apache ZooKeeper Release 3.7.0 版本书写 作于 2022年5月15日 17:22:11 转载请声明 演示前的ZooKeeper目录状态&#xff0c;只有zookeeper默认目录&#xff1a; 在客户端直接输入 --help 命令&#xff0c;可以看到以下文字&#xff1a; 可以看到 addWatch 命令&am…

视频桥接芯片#LT8912B适用于MIPIDSI转HDMI+LVDS应用方案,提供技术支持。

1. 概述 Lontium LT8912B MIPI DSI 转 LVDS 和 HDMI 桥接器采用单通道 MIPI D-PHY 接收器前端配置&#xff0c;每通道 4 个数据通道&#xff0c;每个数据通道以 1.5Gbps 的速度运行&#xff0c;最大输入带宽高达 6Gbps。 对于屏幕应用&#xff0c;该桥接器可解码 MIPI DSI 18bp…

【QED】斐波那契游戏

文章目录 题目思路代码复杂度分析时间复杂度空间复杂度 总结 题目 题目链接&#x1f517; 斐波那契数列指的是这样一个数列&#xff1a;1&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;5&#xff0c;8&#xff0c;13&#xff0c;21&#xff0c;34&#xff0c;55&#x…

Docker部署TeamCity来完成内部CI、CD流程

使用TeamCity来完成内部CI、CD流程 本篇教程主要讲解基于容器服务搭建TeamCity服务&#xff0c;并且完成内部项目的CI流程配置。至于完整的DevOps&#xff0c;我们后续独立探讨。 一个简单的CI、CD流程 以下分享一个简单的CI、CD流程&#xff08;仅供参考&#xff09;&#…