目录
数据聚合
聚合的分类
DSL实现bucket聚合
DSL实现Metrics聚合
RestAPI实现聚合
多条件聚合
带过滤条件的聚合
自动补全
安装拼音分词器
自定义分词器
completion suggester查询
修改索引库数据结构
RestAPI实现自动补全查询
实现搜索框自动补全
数据同步
数据同步问题分析
导入Hotel-admin
声明队列和交换机
发送mq消息
监听MQ消息
测试同步功能
es集群
集群结果介绍
搭建集群
集群职责及脑裂
ES集群的分布式存储
ES集群的分布式查询
ES集群的故障转移
什么是聚合?
1、聚合是对文档数据的统计、分析、计算聚合的常见种类有哪些?
1、Bucket:对文档数据分组,并统计每组数量2、Metric:对文档数据做计算,例如avg
3、Pipeline:基于其它聚合结果再做聚合
参与聚合的字段类型必须是:
1、keyword
2、数值3、日期
4、布尔
aggs代表聚合,与query同级,此时query的作用是?
1、限定聚合的的文档范围
聚合必须的三要素:1、聚合名称
2、聚合类型3、聚合字段
聚合可配置属性有:1、size:指定聚合结果数量
2、order:指定聚合结果排序方式
3、field:指定聚合字段
如何使用拼音分词器?
1、下载pinyin分词器
2、解压并放到elasticsearch的plugin目录3、重启即可
如何自定义分词器?
1、创建索引库时,在settings中配置,可以包含三部分2、character filter
3、tokenizer4、filter
拼音分词器注意事项?
1、为了避免搜索到同音字,搜索时不要使用拼音分词器
自动补全对字段的要求:
1、类型是completion类型
2、字段值是多词条的数组
方式一:同步调用
1、优点:实现简单,粗暴
2、缺点:业务耦合度高
方式二:异步通知
1、优点:低耦合,实现难度一般2、缺点:依赖mq的可靠性
方式三:监听binlog
1、优点:完全解除服务间耦合
2、缺点:开启biniog增加数据库负担、实现复杂度高
master eligible节点的作用是什么?
1、参与集群选主
2、主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
data节点的作用是什么?1、数据的CRUD
coordinator节点的作用是什么?1、路由请求到其它节点
2、合并查询到的结果,返回给用户
分布式新增如何确定分片?
1、coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
分布式查询:
1、分散阶段: coordinating node将查询请求分发给不同分片2、收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户
故障转移:
1、master宕机后,EligibleMaster选举为新的主节点。
2、master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。
数据聚合
聚合的分类
聚合(aggregations)可以实现对文档数据的统计、分析、运算
聚合常见的有三类:
桶(Bucket)聚合:用来对文档做分组
1、TermAggregation:按照文档字段值分组
2、Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
1、Avg:求平均值
2、Max:求最大值3、Min:求最小值
4、Stats:同时求max、min、avg、sum等
管道(pipeline)聚合:其它聚合的结果为基础做聚合
DSL实现bucket聚合
统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合
GET /hotel/_search
{
"size": 0, #size:设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { #aggs:定义聚合
"brandAgg": { #brandAgg:给聚合起个名字
"terms": { #terms:聚合的类型,按照品牌值聚合,所以选term
"field": "brand", #field:参与聚合的字段
"size": 20 #size:希望获取的聚合结果数量
}
}
}
}
buckets里的doc_count是指该key有多少个
默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "asc"
}
}
}
}
}
此时设置为升序,则右边的数据是升序排序
默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
此时只会查price小于等于200的数据
DSL实现Metrics聚合
要求获取每个品牌的用户评分的min、max、avg等值
可以利用stats聚合:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": { #是brands聚合的子集合,也就是分组后对每组分别计算
"scoreAgg": { #聚合名称
"stats": { #聚合类型,这里的stats可以计算min,max,avg等
"field": "score" #聚合字段,这里是score
}
}
}
}
}
}
统计出了每个buckets的min,max,avg,sum了
RestAPI实现聚合
请求组装
HotelSearchTest.java
@Test void testAggregation() throws IOException { //1、准备Request SearchRequest request = new SearchRequest("hotel"); //2、准备DSL //2.1、设置size request.source().size(0); //2.2、聚合 request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(10) ); //3、发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4、解析响应 Aggregations aggregations = response.getAggregations(); //4.1、根据聚合名称获聚合结果 Terms brandTerms = aggregations.get("brandAgg"); //4.2、获取buckets List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); //4.3、遍历 for(Terms.Bucket bucket : buckets){ //4.4、获取key String key = bucket.getKeyAsString(); System.out.println(key); } }
package cn.itcast.hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;
import javax.naming.directory.SearchResult;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static cn.itcast.hotel.constants.HotelConstants.MAPPING_TEMPLATE;
public class HotelSearchTest {
private RestHighLevelClient client;
@Test
void testMatchAll() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().query(QueryBuilders.matchAllQuery());
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testMatch() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().query(QueryBuilders.matchQuery("all","如家"));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testBool() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、准备booleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//2.2、添加term
boolQuery.must(QueryBuilders.termQuery("city","上海"));
//2.3、添加range
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));
request.source().query(boolQuery);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testRageAndSort() throws IOException {
//页码,每页大小
int page = 2, size = 5;
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、准备query
request.source().query(QueryBuilders.matchAllQuery());
//2.2、排序sort
request.source().sort("price", SortOrder.ASC);
//2.3、分页from,size
request.source().from((page - 1) * size).size(5);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testHighlight() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
request.source().query(QueryBuilders.matchQuery("all","如家"));
//2.2、高亮
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testAggregation() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、设置size
request.source().size(0);
//2.2、聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(10)
);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Aggregations aggregations = response.getAggregations();
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get("brandAgg");
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
private void handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
//判断是否有高亮结果
if(!CollectionUtils.isEmpty(highlightFields)){
//根据字段名获取高亮结果
HighlightField highlightField = highlightFields.get("name");
//判断是否有字段名高亮
if(highlightField != null){
//获取高亮值
String name = highlightField.getFragments()[0].string();
//覆盖非高亮结果
hotelDoc.setName(name);
}
}
System.out.println("hotelDoc = " + hotelDoc);
}
System.out.println(response);
}
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://xxx.xxx.xxx.xxx:9200")
));
}
@AfterEach
void tearDown() throws IOException {
this.client.close();
}
}
成功获取到聚合结果
多条件聚合
IHotelService.java
Map<String, List<String>> filters(RequestParams params);
package cn.itcast.hotel.service;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.Map;
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);
Map<String, List<String>> filters(RequestParams params);
}
Hotelservice.java
@Override public Map<String, List<String>> filters() { try { //1、准备Request SearchRequest request = new SearchRequest("hotel"); //2、准备DSL //2.2、设置size request.source().size(0); //2.3、聚合 buildAggregation(request); //3、发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4、解析响应 Map<String, List<String>> result = new HashMap<>(); Aggregations aggregations = response.getAggregations(); //4.1、根据品牌名称,获取品牌结果 List<String> brandList = getAggByName(aggregations,"brandAgg"); result.put("brand",brandList); //4.1、根据城市名称,获取品牌结果 List<String> cityList = getAggByName(aggregations,"cityAgg"); result.put("city",cityList); //4.1、根据星级名称,获取品牌结果 List<String> starList = getAggByName(aggregations,"starAgg"); result.put("starName",starList); return result; } catch (IOException e) { throw new RuntimeException(e); } } private void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(100) ); request.source().aggregation(AggregationBuilders .terms("cityAgg") .field("city") .size(100) ); request.source().aggregation(AggregationBuilders .terms("starAgg") .field("starName") .size(100) ); } private void buildBasicQuery(RequestParams params, SearchRequest request) { //构建BooleanQuery BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); //关键字搜索 String key = params.getKey(); if(key == null || "".equals(key)){ boolQuery.must(QueryBuilders.matchAllQuery()); }else{ boolQuery.must(QueryBuilders.matchQuery("all",key)); } //城市条件 if(params.getCity() != null && !"".equals(params.getCity())){ boolQuery.filter(QueryBuilders.termQuery("city", params.getCity())); } //品牌条件 if(params.getBrand() != null && !"".equals(params.getBrand())){ boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand())); } //星级条件 if(params.getStarName() != null && !"".equals(params.getStarName())){ boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName())); } //价格 if(params.getMinPrice() != null && params.getMaxPrice() != null){ boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice())); } request.source().query(functionScoreQueryBuilder); } private List<String> getAggByName(Aggregations aggregations, String aggName) { //4.1、根据聚合名称获聚合结果 Terms brandTerms = aggregations.get(aggName); //4.2、获取buckets List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); //4.3、遍历 List<String> brandList = new ArrayList<>(); for(Terms.Bucket bucket : buckets){ //4.4、获取key String key = bucket.getKeyAsString(); brandList.add(key); } return brandList; } private PageResult handleResponse(SearchResponse response) { //4、解析响应 SearchHits searchHits = response.getHits(); //4.1获取总条数 long total = searchHits.getTotalHits().value; //4.2文档数组 SearchHit[] hits = searchHits.getHits(); //4.3遍历 List<HotelDoc> hotels = new ArrayList<>(); for (SearchHit hit : hits) { //获取文档source String json = hit.getSourceAsString(); //反序列化 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); //获取距离 Object[] sortValues = hit.getSortValues(); if(sortValues.length > 0){ Object sortValue = sortValues[0]; hotelDoc.setDistance(sortValue); } hotels.add(hotelDoc); } //4、封装返回 return new PageResult(total,hotels); }
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public PageResult search(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params,request);
//2.2、分页
int page = params.getPage();
int size = params.getSize();
request.source().from((page - 1) * size).size(size);
//2.3、排序
String location = params.getLocation();
if(location != null && !"".equals(location)){
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Map<String, List<String>> filters() {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.2、设置size
request.source().size(0);
//2.3、聚合
buildAggregation(request);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//4.1、根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//4.1、根据城市名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//4.1、根据星级名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private void buildBasicQuery(RequestParams params, SearchRequest request) {
//构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//关键字搜索
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
//城市条件
if(params.getCity() != null && !"".equals(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
//品牌条件
if(params.getBrand() != null && !"".equals(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
//星级条件
if(params.getStarName() != null && !"".equals(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
request.source().query(functionScoreQueryBuilder);
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
List<String> brandList = new ArrayList<>();
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private PageResult handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取距离
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
//4、封装返回
return new PageResult(total,hotels);
}
}
HotelDemoApplicationTest.java
@Test void contextLoads() { Map<String, List<String>> filters = hotelService.filters(); System.out.println(filters); }
package cn.itcast.hotel;
import cn.itcast.hotel.service.IHotelService;
import com.fasterxml.jackson.databind.deser.impl.CreatorCandidate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.Map;
@SpringBootTest
class HotelDemoApplicationTests {
@Autowired
private IHotelService hotelService;
@Test
void contextLoads() {
Map<String, List<String>> filters = hotelService.filters();
System.out.println(filters);
}
}
多条件聚合成功
带过滤条件的聚合
HotelController.java
@PostMapping("filters") public Map<String, List<String>> getFilters(@RequestBody RequestParams params){ return hotelService.filters(params); }
package cn.itcast.hotel.web;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@PostMapping("/list")
public PageResult search(@RequestBody RequestParams params){
return hotelService.search(params);
}
@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
return hotelService.filters(params);
}
}
IHotelService.java
Map<String, List<String>> filters(RequestParams params);
package cn.itcast.hotel.service;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.Map;
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);
Map<String, List<String>> filters(RequestParams params);
}
HotelService.java
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params, request);
//2.2、设置size
request.source().size(0);
//2.3、聚合
buildAggregation(request);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//4.1、根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//4.1、根据城市名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//4.1、根据星级名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private void buildBasicQuery(RequestParams params, SearchRequest request) {
//构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//关键字搜索
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
//城市条件
if(params.getCity() != null && !"".equals(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
//品牌条件
if(params.getBrand() != null && !"".equals(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
//星级条件
if(params.getStarName() != null && !"".equals(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
request.source().query(functionScoreQueryBuilder);
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
List<String> brandList = new ArrayList<>();
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private PageResult handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取距离
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
//4、封装返回
return new PageResult(total,hotels);
}
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public PageResult search(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params,request);
//2.2、分页
int page = params.getPage();
int size = params.getSize();
request.source().from((page - 1) * size).size(size);
//2.3、排序
String location = params.getLocation();
if(location != null && !"".equals(location)){
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params, request);
//2.2、设置size
request.source().size(0);
//2.3、聚合
buildAggregation(request);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//4.1、根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//4.1、根据城市名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//4.1、根据星级名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private void buildBasicQuery(RequestParams params, SearchRequest request) {
//构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//关键字搜索
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
//城市条件
if(params.getCity() != null && !"".equals(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
//品牌条件
if(params.getBrand() != null && !"".equals(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
//星级条件
if(params.getStarName() != null && !"".equals(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
request.source().query(functionScoreQueryBuilder);
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
List<String> brandList = new ArrayList<>();
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private PageResult handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取距离
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
//4、封装返回
return new PageResult(total,hotels);
}
}
此时,只要使用条件,就会影响到其他的条件。比如选择某个地点,或输入某个品牌,就会减少一些条件。例如:选择100-300元,就不会出现五星级
自动补全
安装拼音分词器
pyhttps://pan.baidu.com/s/1gPkxVU6dJL7mRVaPYwNmYQ?pwd=hano
把该文件夹放到/var/lib/docker/volumes/es-plugins/_data目录下
自定义分词器
elasticsearch中分词器(analyzer)的组成包含三部分
1、character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
2、tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
3、tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器)
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
}
}
}
}
创建成功
可以看到,汉字、拼音、拼音首字母的类型都有
拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。
这里创建两条倒排索引,并搜索第三条命令
POST /test/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
GET /test/_search
{
"query": {
"match": {
"name": "掉入狮子笼咋办"
}
}
}
此时会发现两个shizi都搜索到了
因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik_smart分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
再次创建这两个倒排索引,查询第三条命令
POST /test/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
GET /test/_search
{
"query": {
"match": {
"name": "掉入狮子笼咋办"
}
}
}
此时发现,同音字问题解决了
completion suggester查询
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
1、参与补全查询的字段必须是completion类型。
2、字段的内容一般是用来补全的多个词条形成的数组。
创建索引库
PUT test2
{
"mappings": {
"properties": {
"title": {
"type": "completion"
}
}
}
}
插入三条数据
POST test2/_doc
{
"title": ["Sony","WH-1000XM3"]
}
POST test2/_doc
{
"title": ["SK-II","PITERA"]
}
POST test2/_doc
{
"title": ["Nintendo","switch"]
}
自动补全查询
GET /test2/_search
{
"suggest": {
"titleSuggest": {
"text": "s",
"completion": {
"field": "title",
"skip_duplicates": true,
"size": 10
}
}
}
}
可以看到,所以我只搜了s,但是S开头的数据都查询到了
修改索引库数据结构
创建索引库
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
HotelDoc.java
package cn.itcast.hotel.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.*;
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance;
private Boolean isAD;
private List<String> suggestion;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
if(this.business.contains("、")){
// business有多个值,需要切割
String[] arr = this.business.split("、");
//添加元素
this.suggestion = new ArrayList<>();
this.suggestion.add(this.brand);
Collections.addAll(this.suggestion,arr);
}else {
this.suggestion = Arrays.asList(this.brand, this.business);
}
}
}
运行HotelDocmentTest.java的testBulkRequest()
@Test void testBulkRequest() throws IOException { //批量查询酒店数据 List<Hotel> hotels = hotelService.list(); //1、创建Request BulkRequest request = new BulkRequest(); //2、准备参数,添加多个新增的Request for(Hotel hotel:hotels){ //转换为文档类型HotelDoc HotelDoc hotelDoc = new HotelDoc(hotel); //创建新增文档的Request对象 request.add(new IndexRequest("hotel") .id(hotelDoc.getId().toString()) .source(JSON.toJSONString(hotelDoc),XContentType.JSON)); } //3、发送请求 client.bulk(request,RequestOptions.DEFAULT); }
测试自动补全
GET /hotel/_search
{
"suggest": {
"suggestions": {
"text": "sd",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}
可以看到自动补全成功。只输入sd能把所有sd开头的text和s开头加d开头的text的数据查询出来
RestAPI实现自动补全查询
@Test void testSuggest() throws IOException { //1、准备Request SearchRequest request = new SearchRequest("hotel"); //2、准备DSL request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix("h") .skipDuplicates(true) .size(10) )); //3、发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4、解析结果 Suggest suggest = response.getSuggest(); //4.1、根据补全查询名称,获取补全结果 CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); //4.2、获取options List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); //4.3、遍历 for(CompletionSuggestion.Entry.Option option : options) { String text = option.getText().toString(); System.out.println(text); } }
package cn.itcast.hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;
import javax.naming.directory.SearchResult;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static cn.itcast.hotel.constants.HotelConstants.MAPPING_TEMPLATE;
public class HotelSearchTest {
private RestHighLevelClient client;
@Test
void testMatchAll() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().query(QueryBuilders.matchAllQuery());
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testMatch() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().query(QueryBuilders.matchQuery("all","如家"));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testBool() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、准备booleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//2.2、添加term
boolQuery.must(QueryBuilders.termQuery("city","上海"));
//2.3、添加range
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));
request.source().query(boolQuery);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testRageAndSort() throws IOException {
//页码,每页大小
int page = 2, size = 5;
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、准备query
request.source().query(QueryBuilders.matchAllQuery());
//2.2、排序sort
request.source().sort("price", SortOrder.ASC);
//2.3、分页from,size
request.source().from((page - 1) * size).size(5);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testHighlight() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
request.source().query(QueryBuilders.matchQuery("all","如家"));
//2.2、高亮
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
handleResponse(response);
}
@Test
void testAggregation() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、设置size
request.source().size(0);
//2.2、聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(10)
);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Aggregations aggregations = response.getAggregations();
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get("brandAgg");
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
@Test
void testSuggest() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("h")
.skipDuplicates(true)
.size(10)
));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析结果
Suggest suggest = response.getSuggest();
//4.1、根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
//4.2、获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
//4.3、遍历
for(CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
System.out.println(text);
}
}
private void handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
//判断是否有高亮结果
if(!CollectionUtils.isEmpty(highlightFields)){
//根据字段名获取高亮结果
HighlightField highlightField = highlightFields.get("name");
//判断是否有字段名高亮
if(highlightField != null){
//获取高亮值
String name = highlightField.getFragments()[0].string();
//覆盖非高亮结果
hotelDoc.setName(name);
}
}
System.out.println("hotelDoc = " + hotelDoc);
}
System.out.println(response);
}
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.80.130:9200")
));
}
@AfterEach
void tearDown() throws IOException {
this.client.close();
}
}
可以看到,只查询h,就能查到h开头的数据
实现搜索框自动补全
HotelController.java
@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key")String prefix){
return hotelService.getSuggestions(prefix);
}
package cn.itcast.hotel.web;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@PostMapping("/list")
public PageResult search(@RequestBody RequestParams params){
return hotelService.search(params);
}
@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
return hotelService.filters(params);
}
@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key")String prefix){
return hotelService.getSuggestions(prefix);
}
}
IHotelService.java
List<String> getSuggestions(String prefix);
package cn.itcast.hotel.service;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.Map;
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);
Map<String, List<String>> filters(RequestParams params);
List<String> getSuggestions(String prefix);
}
HotelService.java
@Override public List<String> getSuggestions(String prefix) { try { //1、准备Request SearchRequest request = new SearchRequest("hotel"); //2、准备DSL request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix(prefix) .skipDuplicates(true) .size(10) )); //3、发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4、解析结果 Suggest suggest = response.getSuggest(); //4.1、根据补全查询名称,获取补全结果 CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); //4.2、获取options List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); //4.3、遍历 List<String> list = new ArrayList<>(options.size()); for(CompletionSuggestion.Entry.Option option : options) { String text = option.getText().toString(); list.add(text); } return list; } catch (IOException e) { throw new RuntimeException(e); } }
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public PageResult search(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params,request);
//2.2、分页
int page = params.getPage();
int size = params.getSize();
request.source().from((page - 1) * size).size(size);
//2.3、排序
String location = params.getLocation();
if(location != null && !"".equals(location)){
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params, request);
//2.2、设置size
request.source().size(0);
//2.3、聚合
buildAggregation(request);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//4.1、根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//4.1、根据城市名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//4.1、根据星级名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> getSuggestions(String prefix) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(prefix)
.skipDuplicates(true)
.size(10)
));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析结果
Suggest suggest = response.getSuggest();
//4.1、根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
//4.2、获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
//4.3、遍历
List<String> list = new ArrayList<>(options.size());
for(CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
list.add(text);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
List<String> brandList = new ArrayList<>();
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private void buildBasicQuery(RequestParams params, SearchRequest request) {
//构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//关键字搜索
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
//城市条件
if(params.getCity() != null && !"".equals(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
//品牌条件
if(params.getBrand() != null && !"".equals(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
//星级条件
if(params.getStarName() != null && !"".equals(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
request.source().query(functionScoreQueryBuilder);
}
private PageResult handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取距离
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
//4、封装返回
return new PageResult(total,hotels);
}
}
在搜索框输入x,可以看到出现自动补全
数据同步
数据同步问题分析
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。
方案一:同步调用
方案二:异步调用
方案三:监听binlog
导入Hotel-admin
Hotel-adminhttps://pan.baidu.com/s/1lS-PUGaqUWdHEeGHkNrR0w?pwd=5vxx
声明队列和交换机
在hotel-demo的pom文件
<!--amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.itcast.hotel</groupId>
<artifactId>hotel-admin</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hotel-admin</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
在hotel-demo的application.yaml文件
rabbitmq: host: xxx.xxx.xxx.xxx port: 5672 username: itcast password: 123321 virtual-host: /
server:
port: 8099
spring:
datasource:
url: jdbc:mysql://localhost:3306/heima?useSSL=false
username: root
password: 1234
driver-class-name: com.mysql.jdbc.Driver
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
username: itcast
password: 123321
virtual-host: /
logging:
level:
cn.itcast: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
type-aliases-package: cn.itcast.hotel.pojo
在hotel-demo的constants包创建MqConstants.java
package cn.itcast.hotel.constants;
public class MqConstants {
//交换机
public final static String HOTEL_EXCHANGE = "hotel.topic";
//监听新增和修改的队列
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
//监听删除的队列
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
//新增或修改的RoutingKey
public final static String HOTEL_INSERT_KEY = "hotel.insert";
//删除的RoutingKey
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
在hotel-demo创建config包创建MqConfig.java
package cn.itcast.hotel.config;
import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,true);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
发送mq消息
在hotel-admin创建constants包,并把hotel-demo的MqConstants.java复制过来
package cn.itcast.hotel.constants;
public class MqConstants {
//交换机
public final static String HOTEL_EXCHANGE = "hotel.topic";
//监听新增和修改的队列
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
//监听删除的队列
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
//新增或修改的RoutingKey
public final static String HOTEL_INSERT_KEY = "hotel.insert";
//删除的RoutingKey
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
在hotel-admin的pom文件
<!--amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.itcast.hotel</groupId>
<artifactId>hotel-admin</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hotel-admin</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
在hotel-admin的application.yaml文件
rabbitmq: host: xxx.xxx.xxx.xxx port: 5672 username: itcast password: 123321 virtual-host: /
server:
port: 8099
spring:
datasource:
url: jdbc:mysql://localhost:3306/heima?useSSL=false
username: root
password: 1234
driver-class-name: com.mysql.jdbc.Driver
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
username: itcast
password: 123321
virtual-host: /
logging:
level:
cn.itcast: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
type-aliases-package: cn.itcast.hotel.pojo
在hotel-admin的HotelController.java
@PostMapping public void saveHotel(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); }
package cn.itcast.hotel.web;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.security.InvalidParameterException;
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
}
监听MQ消息
在hotel-demo创建mq包创建HotelListener.java
//监听酒店新增和修改的业务 @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); } //监听酒店删除的业务 @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) public void listenHotelDelete(Long id){ hotelService.deleteById(id); }
package cn.itcast.hotel.mq;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
//监听酒店新增和修改的业务
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
//监听酒店删除的业务
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
hotel-demo的IHotelService.java
void deleteById(Long id); void insertById(Long id);
package cn.itcast.hotel.service;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.Map;
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);
Map<String, List<String>> filters(RequestParams params);
List<String> getSuggestions(String prefix);
void deleteById(Long id);
void insertById(Long id);
}
hotel-demo的HotelService.java
@Override public void deleteById(Long id) { try { //1、准备Request DeleteRequest request = new DeleteRequest("hotel", id.toString()); //2、发送请求 client.delete(request,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void insertById(Long id) { try { //0、根据id查询酒店数据 Hotel hotel = getById(id); //转换成文档类型 HotelDoc hotelDoc = new HotelDoc(hotel); //1、准备Request对象 IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); //2、准备json文档 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); //3、发送请求 client.index(request,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public PageResult search(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params,request);
//2.2、分页
int page = params.getPage();
int size = params.getSize();
request.source().from((page - 1) * size).size(size);
//2.3、排序
String location = params.getLocation();
if(location != null && !"".equals(location)){
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
//2.1、query
buildBasicQuery(params, request);
//2.2、设置size
request.source().size(0);
//2.3、聚合
buildAggregation(request);
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析响应
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//4.1、根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//4.1、根据城市名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//4.1、根据星级名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> getSuggestions(String prefix) {
try {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2、准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(prefix)
.skipDuplicates(true)
.size(10)
));
//3、发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析结果
Suggest suggest = response.getSuggest();
//4.1、根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
//4.2、获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
//4.3、遍历
List<String> list = new ArrayList<>(options.size());
for(CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
list.add(text);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteById(Long id) {
try {
//1、准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//2、发送请求
client.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
//0、根据id查询酒店数据
Hotel hotel = getById(id);
//转换成文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//1、准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//2、准备json文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//3、发送请求
client.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
//4.1、根据聚合名称获聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2、获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3、遍历
List<String> brandList = new ArrayList<>();
for(Terms.Bucket bucket : buckets){
//4.4、获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private void buildBasicQuery(RequestParams params, SearchRequest request) {
//构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//关键字搜索
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
//城市条件
if(params.getCity() != null && !"".equals(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
//品牌条件
if(params.getBrand() != null && !"".equals(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
//星级条件
if(params.getStarName() != null && !"".equals(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
//算分控制
FunctionScoreQueryBuilder functionScoreQueryBuilder =
//构建Score
QueryBuilders.functionScoreQuery(
//原始查询,相关性算分
boolQuery,
//function score的数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
//其中一个function score 元素
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
//过滤条件
QueryBuilders.termQuery("isAD",true),
//算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
});
request.source().query(functionScoreQueryBuilder);
}
private PageResult handleResponse(SearchResponse response) {
//4、解析响应
SearchHits searchHits = response.getHits();
//4.1获取总条数
long total = searchHits.getTotalHits().value;
//4.2文档数组
SearchHit[] hits = searchHits.getHits();
//4.3遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
//获取文档source
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取距离
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
//4、封装返回
return new PageResult(total,hotels);
}
}
测试同步功能
运行hotel-demo和hotel-admin
在RabbitMQ中可以看到两个新的队列
交换机也有一个新的
在交换机里可以看到两个队列
在8099端口中,删除一个数据
在RabbitMQ可以看到一条消息
并且在8089端口该数据确实找不到了
es集群
集群结果介绍
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
1、海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
2、单点故障问题:将分片数据在不同节点备份(replica)
搭建集群
把该文件放到/root里
decker-composehttps://pan.baidu.com/s/1TJZz4GSoy6CA5DbC3IeL-w?pwd=a87u
运行指令
vi /etc/sysctl.conf
在最后一行后面加上一条
vm.max_map_count=262144
运行指令,若能看到刚刚输入的指令,则说明成功
sysctl -p
运行指令,可以运行三个新的容器。注意一定要先关闭以前运行的es,否则会报错
docker-compose up -d
解压该文件
cerebrohttps://pan.baidu.com/s/1GlX4vsf1RQj1ocYufPDx5A?pwd=p0g9
运行lesson的cerebro-0.9.4的bin的cerebro.bat文件。若闪退,则编辑该文件。加上这两行。
注意D:\jdk\jdk8是你的jdk文件位置。闪退的原因是你的jdk等级过高,请下载jdk8并把这个位置写上你的jdk8的文件位置
访问localhost:9000。这里输入虚拟机ip加端口9200
可以看到集群中的节点信息。实心五角星是主节点,其他是候选节点
点击此处,创建索引库
name是索引库名字,number of shards是几个片,number of replicas是几个备份。最后点create创建
回到主页,可以看到每个片都有两个,因为每个片都有一个,而且我设置了一个备份,所以每个片有两个。实线框是主分片,虚线框是副本分片。
可以注意到,每个号码的主分片和副本分片一定不在同一个机器上。确保有机器宕机,不会造成数据故障。
集群职责及脑裂
slasticsearch中集群节点有不同的职责划分,因此建议集群部署时,每个节点都有独立的角色
节点类型 | 配置参数 | 默认值 | 节点职责 |
master eligible | node.master | true | 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求 |
data | node.data | true | 数据节点:存储数据、搜索、聚合、CRUD |
ingest | node.ingest | true | 数据存储之前的预处理 |
coordinating | 上面3个参数都为false 则为coordinating节点 | 无 | 路由请求到其它节点 合并其它节点处理的结果,返回给用户 |
默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
为了避免脑裂,需要要求选票超过( eligible节点数量+1 )/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
ES集群的分布式存储
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
公式:
shard = hash(_routing)% number_of_shards
说明:
1、_routing默认是文档的id
2、算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
ES集群的分布式查询
elasticsearch的查询分成两个阶段:
1、scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
2、gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
ES集群的故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
当我停掉es01后,es01的分片分配给了其他的机器
当我重启es后,es01的分片又回来了
数据并未发生故障
上一篇:DSL查询语法和RestClient查询文档