ES的使用(Elasticsearch)
es是什么?
es是非关系型的分布式文档数据库(同时也是搜索引擎),数据以JSON文档的形式存储
为什么要用es?
搜索速度快,能够近乎实时地存储、检索数据
怎么使用es?
1.下载es的包(环境要是jdk1.8及以上)(我的资源中有)
2.下载es的可视化界面包(我的资源中有)
3.java编写es的工具类
es与关系型数据库对比
大致对应关系:索引(index)≈数据库,类型(type)≈表,文档(document)≈行,字段(field)≈列
1.下载es的包,解压,运行bat文件(windows)
下载地址:es官网下载地址
在config/elasticsearch.yml中添加以下配置以允许跨域访问(可视化工具elasticsearch-head需要跨域连接es)
http.cors.enabled: true
http.cors.allow-origin: "*"
2.下载es的可视化界面包,解压后在该目录下先执行npm install安装依赖,再使用命令npm run start启动
下载地址:elasticsearch-head-master es可视化工具
打开 http://localhost:9100
3.java编写es的工具类
引入es的依赖包
<!-- Elasticsearch client stack: high-level REST client, low-level REST client and the core library, -->
<!-- all declared at the same version (6.2.4). -->
<!-- NOTE(review): presumably pinned together so the three artifacts stay mutually compatible -->
<!-- (and match the server version) - confirm before bumping any one of them on its own. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
package com.next.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ESClient implements ApplicationListener<ContextRefreshedEvent> {
private final static int CONNECT_TIMEOUT = 100;
private final static int SOCKET_TIMEOUT = 60 * 1000;
private final static int REQUEST_TIMEOUT = SOCKET_TIMEOUT;
private RestHighLevelClient restHighLevelClient; //JDK8及以上
private BasicHeader[] basicHeaders;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
initClient();
} catch (Exception e) {
log.error("es client init exception", e);
try {
Thread.sleep(1000);
} catch (Exception e1) {
}
initClient();
}
}
private void initClient() {
log.info("es client init start");
//请求头时允许的格式
basicHeaders = new BasicHeader[]{
new BasicHeader("Accept", "application/json;charset=UTF-8")};
//es客户端连接设置初始化
RestClientBuilder builder = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"));
builder.setDefaultHeaders(basicHeaders)
//设置相关超时间配置
.setRequestConfigCallback((RequestConfig.Builder configBuilder) -> {
configBuilder.setConnectTimeout(CONNECT_TIMEOUT);
configBuilder.setSocketTimeout(SOCKET_TIMEOUT);
configBuilder.setConnectionRequestTimeout(REQUEST_TIMEOUT);
return configBuilder;
});
restHighLevelClient = new RestHighLevelClient(builder);
log.info("es client init end");
}
//es新增操作
public IndexResponse index(IndexRequest indexRequest) throws Exception {
try {
return restHighLevelClient.index(indexRequest);
} catch (Exception e) {
log.error("es.index exception,indexRequest:{}", indexRequest, e);
throw e;
}
}
//更新操作
public UpdateResponse update(UpdateRequest updateRequest) throws Exception {
try {
return restHighLevelClient.update(updateRequest, basicHeaders);
} catch (Exception e) {
log.error("es.update exception,updateRequest:{}", updateRequest, e);
throw e;
}
}
//查询
public GetResponse get(GetRequest getRequest) throws Exception {
try {
return restHighLevelClient.get(getRequest, basicHeaders);
} catch (Exception e) {
log.error("es.get exception,updateRequest:{}", getRequest, e);
throw e;
}
}
//多个查询请求放在一起查
public MultiGetResponse multiGet(MultiGetRequest multiGetRequest) throws Exception {
try {
return restHighLevelClient.multiGet(multiGetRequest);
} catch (Exception e) {
log.error("es.multiGet exception,getRequest:{}", multiGetRequest, e);
throw e;
}
}
/**
* @desc 批量更新
*/
public BulkResponse bulk(BulkRequest bulkRequest) throws Exception {
try {
return restHighLevelClient.bulk(bulkRequest,basicHeaders);
} catch (Exception e) {
log.error("es.multiGet exception,bulkRequest:{}", bulkRequest, e);
throw e;
}
}
}
es启动
4.使用例子:
将车次信息存到es中,方便用户查询(从此地到目的地有哪些车可以乘坐)
package com.next.service;
import com.alibaba.google.common.base.Splitter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.next.common.TrainEsConstant;
import com.next.dao.TrainNumberDetailMapper;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainNumberDetail;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.util.set.Sets;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Service
@Slf4j
public class TrainNumberService {
@Resource
private TrainNumberMapper trainNumberMapper;
@Resource
private TrainCacheService trainCacheService;
@Resource
private TrainNumberDetailMapper trainNumberDetailMapper;
@Resource
private ESClient esClient;
public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) throws Exception{
if (eventType != CanalEntry.EventType.UPDATE) {
log.info("not update,no need care");
return;
}
int trainNumberId = 0;
//获取数据库的trainNumberId
for (CanalEntry.Column column : columns) {
if (column.getName().equals("id")) {
trainNumberId = Integer.parseInt(column.getValue());
break;
}
}
TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainNumberId);
//校验是否有车次
if (null == trainNumber) {
log.error("not found trainNumber,trainNumberId:{}", trainNumberId);
return;
}
List<TrainNumberDetail> detailList = trainNumberDetailMapper.getByTrainNumberId(trainNumberId);
//校验是否有车次详情
if (CollectionUtils.isEmpty(detailList)) {
log.warn("no detail,no need care,trainNumberId:{}", trainNumber.getName());
return;
}
//将数据写入缓存中
trainCacheService.set("TN_" + trainNumber.getName(), JsonMapper.obj2String(detailList));
log.info("trainNumber:{} detailList update redis", trainNumber.getName());
//将数据存入es中
saveES(detailList,trainNumber);
log.info("trainNumber:{} detailList update es", trainNumber.getName());
}
//数据保存到es(客户需要查询的数据放到es--->从此地到目的地有哪些车可以乘坐)
private void saveES(List<TrainNumberDetail> detailList, TrainNumber trainNumber) throws Exception{
/**
* A-B fromStationId- toStationId
* 例:北京到大连有多少趟车?
* 根据车站的开始结束站,去找车次,即根据fromStationId- toStationId获取到 trainNumberId1,trainNumberId2。。。。
* trainNumber: A->B->C
* D386:北京->锦州->大连
* D387:北京->鞍山->大连
*
* 拆分如下
* D386: 北京-锦州 锦州-大连 北京-大连
* D387: 北京-鞍山 鞍山-大连 北京-大连
*/
List<String> list = Lists.newArrayList();
int fromStationId = trainNumber.getFromStationId();
if (detailList.size() == 1) {
//单段
int toStationId = trainNumber.getToStationId();
list.add(fromStationId + "_" + toStationId);
} else {
//多段,枚举所有的车次,要保证多段有序
for (int i = 0; i < detailList.size(); i++) {
//获取开始车站id
int tempFromStationId = detailList.get(i).getFromStationId();
for (int j = i; j < detailList.size(); j++) {
//获取到达车站id
int tempToStationId = detailList.get(j).getToStationId();
list.add(tempFromStationId+"_"+tempToStationId);
}
}
}
//检查数据是否已经存在,存在则不新增,不存在则新增
//★如果是for循环里面的话,要封装成批量操作IO
MultiGetRequest multiGetRequest = new MultiGetRequest();
BulkRequest bulkRequest = new BulkRequest();
for(String item:list){
multiGetRequest.add(new MultiGetRequest.Item(TrainEsConstant.INDEX,TrainEsConstant.TYPE,item));
}
//获取处理后的结果
MultiGetResponse multiGetItemResponses = esClient.multiGet(multiGetRequest);
for(MultiGetItemResponse itemResponse:multiGetItemResponses.getResponses()){
if(itemResponse.isFailed()){
log.error("multiGet item failed,itemResponse:{}",itemResponse);
continue;
}
GetResponse getResponse = itemResponse.getResponse();
if(getResponse == null){
log.error("multiGet item is null,itemResponse:{}",itemResponse);
continue;
}
//存储更新es的数据,新增用source传入数据 更新用doc传入数据
Map<String,Object> dataMap = Maps.newHashMap();
Map<String,Object> map = getResponse.getSourceAsMap();
if(!getResponse.isExists() || map == null){
//add index
dataMap.put(TrainEsConstant.COLUMN_TRAIN_NUMBER,trainNumber.getName());
IndexRequest indexRequest = new IndexRequest(TrainEsConstant.INDEX,TrainEsConstant.TYPE,getResponse.getId()).source(dataMap);
bulkRequest.add(indexRequest);
continue;
}
//里面是车次信息 trainNumberId1,trainNumberId2。。。。,需要拆分
String origin = (String) map.get(TrainEsConstant.COLUMN_TRAIN_NUMBER);
Set<String> set = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(origin));
if(!set.contains(trainNumber.getName())){
//update index
dataMap.put(TrainEsConstant.COLUMN_TRAIN_NUMBER,origin+","+trainNumber.getName());
UpdateRequest updateRequest = new UpdateRequest(TrainEsConstant.INDEX,TrainEsConstant.TYPE,getResponse.getId()).doc(dataMap);
bulkRequest.add(updateRequest);
}
}
//批量更新es的数据(bulkResponse是批量对象转成string打印日志)
BulkResponse bulkResponse = esClient.bulk(bulkRequest);
log.info("es bulk response:{}",JsonMapper.obj2String(bulkResponse));
if(bulkResponse.hasFailures()){
throw new RuntimeException("es bulk failure");
}
}
}
车次表
车次明细表
修改数据库中车次表的信息后,程序会把数据处理成「出发站id_到达站id → 车次号」的形式存入es