在web网站的架设中,特别是数据量大的网站或者APP、小程序,凡是需要搜索或者全文检索的场景,几乎都需要借助ElasticSearch作为全文检索引擎,以提高网站的搜索效率和性能。
这一节,我们通过一篇文章的介绍,帮助大家快速学会使用ElasticSearch。
一、ElasticSearch介绍:
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
ElasticSearch相关概念:
a)、索引index,相当于数据库中的database。
b)、类型type相当于数据库中的table。
c)、主键id相当于数据库中记录的主键,是唯一的。
d)、文档 document (相当于一条数据)
文档是ElasticSearch的基本单位。在Es中文档以JSON格式来表示
向es中的index下面的type中存储json类型的数据。
e) 、字段是文档中的field 属性,需要对每一个属性定义索引和被搜索的方式
二、ElasticSearch的安装:
1、先安装jdk
2、安装ElasticSearch
直接进入elasticsearch的官网,下载安装包:https://www.elastic.co/downloads/elasticsearch,此教程安装的是5.1.1版本(注意:下文Java示例引用的是7.6.2版本的客户端依赖,实际使用时服务端版本应与客户端版本保持一致)。
将下载的安装包上传到centos,或者直接在centos使用wget命令下载。
解压:
unzip elasticsearch-5.1.1.zip
运行:
# The archive unpacks into elasticsearch-5.1.1/, so enter its bin directory before launching.
cd elasticsearch-5.1.1/bin
./elasticsearch
三、java语言操作ElasticSearch:
1、maven依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.2</version>
</dependency>
2、连接ElasticSearch
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
/**
 * Smallest possible client demo: builds a {@link RestHighLevelClient} for host "IP" on
 * port 9200 (http), prints "success" and closes the client again.
 */
public class EsClientTest {
    public static void main(String[] args) throws IOException {
        HttpHost node = new HttpHost("IP", 9200, "http");
        RestHighLevelClient esClient = new RestHighLevelClient(RestClient.builder(node));
        System.out.println("success");
        esClient.close();
    }
}
3、连接的相关api
public static RestHighLevelClient esClient;
static {
esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("IP", 9200, "http"))
);
}
4、创建索引操作:
/**
 * Creates the "user" index and prints whether the request was acknowledged.
 *
 * @throws IOException propagated from the client call
 */
public static void createIndex() throws IOException {
    CreateIndexResponse response = esClient.indices()
            .create(new CreateIndexRequest("user"), RequestOptions.DEFAULT);
    System.out.println("索引创建状态:" + response.isAcknowledged());
}
5、获取索引:
/**
 * Fetches the "user" index and prints its aliases, mappings and settings, in that order.
 *
 * @throws IOException propagated from the client call
 */
public static void getIndex() throws IOException {
    GetIndexResponse indexInfo = esClient.indices()
            .get(new GetIndexRequest("user"), RequestOptions.DEFAULT);
    System.out.println(indexInfo.getAliases());
    System.out.println(indexInfo.getMappings());
    System.out.println(indexInfo.getSettings());
}
6、删除索引:
/**
 * Deletes the "user" index and prints whether the request was acknowledged.
 *
 * @throws IOException propagated from the client call
 */
public static void deleteIndex() throws IOException {
    DeleteIndexRequest deleteRequest = new DeleteIndexRequest("user");
    AcknowledgedResponse result = esClient.indices().delete(deleteRequest, RequestOptions.DEFAULT);
    boolean acknowledged = result.isAcknowledged();
    System.out.println("索引删除状态:" + acknowledged);
}
7、添加数据:
/**
 * Indexes a single {@code User} as JSON into the "user" index under id "1008" and prints
 * the response status and result.
 *
 * @throws Exception propagated from JSON serialization or the client call
 */
public static void add() throws Exception {
    // Build the document first, then wrap it in the request.
    User user = new User();
    user.setName("茅河野人");
    user.setAge(28);
    user.setSex("男");
    user.setSalary(50000);
    String json = objectMapper.writeValueAsString(user);

    IndexRequest indexRequest = new IndexRequest()
            .index("user")
            .id("1008")
            .source(json, XContentType.JSON);

    // Send the index request.
    IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
    System.out.println(response.status());
    System.out.println(response.getResult());
}
8、修改数据:
/**
 * Partially updates document "1008" in the "user" index by setting its "name" field,
 * then prints the result.
 *
 * @throws Exception propagated from the client call
 */
public static void update() throws Exception {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index("user")
            .id("1008")
            .doc(XContentType.JSON, "name", "茅河野人");
    // Send the update request.
    UpdateResponse result = esClient.update(updateRequest, RequestOptions.DEFAULT);
    System.out.println(result.getResult());
}
9、删除数据:
/**
 * Deletes document "1008" from the "user" index and prints the result.
 *
 * @throws Exception propagated from the client call
 */
public static void delete() throws Exception {
    DeleteRequest deleteRequest = new DeleteRequest().index("user").id("1008");
    // Send the delete request.
    DeleteResponse result = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
    System.out.println(result.getResult());
}
10、批量添加数据:
/**
 * Indexes six {@code User} documents (ids "1002" to "1007") into the "user" index with a
 * single bulk request, then prints the bulk status and the item array.
 *
 * @throws Exception propagated from JSON serialization or the client call
 */
public static void batchInsert() throws Exception {
    // ids[i] is the document id for users[i].
    String[] ids = {"1002", "1003", "1004", "1005", "1006", "1007"};
    User[] users = {
        new User("关羽", "男", 33, 5500),
        new User("黄忠", "男", 50, 8000),
        new User("黄忠2", "男", 49, 10000),
        new User("赵云", "男", 33, 12000),
        new User("马超", "男", 38, 20000),
        new User("关羽", "男", 41, 27000)
    };

    BulkRequest bulkRequest = new BulkRequest();
    for (int i = 0; i < users.length; i++) {
        String json = objectMapper.writeValueAsString(users[i]);
        bulkRequest.add(new IndexRequest().index("user").id(ids[i]).source(json, XContentType.JSON));
    }

    BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    System.out.println(bulkResponse.status());
    System.out.println(bulkResponse.getItems());
}
11、批量删除数据:
/**
 * Deletes documents "1002" to "1007" from the "user" index with a single bulk request,
 * then prints the bulk status and the item array.
 *
 * @throws Exception propagated from the client call
 */
public static void batchDelete() throws Exception {
    BulkRequest bulkRequest = new BulkRequest();
    for (String id : new String[] {"1002", "1003", "1004", "1005", "1006", "1007"}) {
        bulkRequest.add(new DeleteRequest().index("user").id(id));
    }
    BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    System.out.println(bulkResponse.status());
    System.out.println(bulkResponse.getItems());
}
13、查询某个索引下所有数据:
/**
 * Runs a match-all query against the "user" index and prints the source of every hit
 * returned in the response.
 *
 * @throws Exception propagated from the client call
 */
public static void searchIndexAll() throws Exception {
    // Match-all query: no filtering on the index contents.
    SearchSourceBuilder matchAll = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
    SearchRequest searchRequest = new SearchRequest().indices("user").source(matchAll);
    SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);
    response.getHits().forEach(hit -> System.out.println(hit.getSourceAsString()));
}
14、根据条件查询:
// Term query on the "sex" field; expects a SearchRequest named `request` to be declared by the surrounding code.
TermQueryBuilder sexTermQuery = QueryBuilders.termQuery("sex", "女");
request.source(new SearchSourceBuilder().query(sexTermQuery));
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getHits());
System.out.println(hits.getTotalHits());
// Print the JSON source of each returned hit.
hits.forEach(hit -> System.out.println(hit.getSourceAsString()));
15、分页查询:
// Match-all query limited to the first page: offset 0, page size 3.
// Expects a SearchRequest named `request` to be declared by the surrounding code.
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
sourceBuilder.from(0);
sourceBuilder.size(3);
request.source(sourceBuilder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getHits());
System.out.println(hits.getTotalHits());
// Print the JSON source of each returned hit.
hits.forEach(hit -> System.out.println(hit.getSourceAsString()));
四、在springboot中的运用
1、maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2、properties配置文件(application.properties):
# es 服务地址
elasticsearch.host=IP
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.congge=debug
3、实际例子:
创建一个实体类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
/**
 * Product document mapped to the "shopping" index, declared with 3 shards and 1 replica.
 * Accessors, constructors and toString are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {
// An id is required; per the original author's note it is a globally unique identifier equivalent to "_id" in ES
@Id
private Long id;// unique product identifier
/**
 * type : data type of the field
 * analyzer : analyzer used for tokenizing
 * index : whether the field is indexed (default: true)
 * Keyword : a phrase that is not tokenized
 */
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String title;// product name
@Field(type = FieldType.Keyword)
private String category;// category name
@Field(type = FieldType.Double)
private Double price;// product price
@Field(type = FieldType.Keyword, index = false)
private String images;// image URL
}
提供接口:
import com.congge.entity.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
/**
 * Repository for {@link Product} documents keyed by a {@code Long} id.
 * Declares no methods of its own; everything it offers is inherited from
 * {@link ElasticsearchRepository}.
 */
@Repository
public interface ProductDao extends ElasticsearchRepository<Product, Long>{
}
配置类:
import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
//import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
/**
 * Binds the {@code elasticsearch.host} and {@code elasticsearch.port} properties and uses
 * them to build the {@link RestHighLevelClient} required by the parent configuration.
 */
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration {
    private String host;
    private Integer port;

    /** Builds a client for the configured host and port. */
    @Override
    public RestHighLevelClient elasticsearchClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port)));
    }
}
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
/**
 * Base configuration that exposes an {@link ElasticsearchOperations} bean backed by the
 * client supplied by the concrete subclass.
 */
public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {

    /** Subclasses supply the client to use. */
    public abstract RestHighLevelClient elasticsearchClient();

    /** Registered under both "elasticsearchOperations" and "elasticsearchTemplate". */
    @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
    public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
        RestHighLevelClient restClient = elasticsearchClient();
        return new ElasticsearchRestTemplate(restClient, elasticsearchConverter);
    }
}
测试1:
import com.congge.entity.Product;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Index-level tests for the {@link Product} mapping, driven through an injected
 * {@link ElasticsearchRestTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class EsIndexTest {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * Only prints a marker; per the original author's note the index is created
     * automatically when the application context initializes.
     */
    @Test
    public void createIndex() {
        System.out.println("创建索引");
    }

    /** Deletes the index mapped by {@link Product} and prints the returned flag. */
    @Test
    public void deleteIndex() {
        boolean deleted = elasticsearchRestTemplate.deleteIndex(Product.class);
        System.out.println("删除索引 = " + deleted);
    }
}
测试2:
import com.congge.dao.ProductDao;
import com.congge.entity.Product;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class EsDocTest {
@Autowired
private ProductDao productDao;
/**
* 新增
*/
@Test
public void save() {
Product product = new Product();
product.setId(2L);
product.setTitle("ipad mini");
product.setCategory("ipad");
product.setPrice(1998.0);
product.setImages("http://ipad.jpg");
productDao.save(product);
}
//修改
@Test
public void update(){
Product product = new Product();
product.setId(2L);
product.setTitle("iphone");
product.setCategory("mobile");
product.setPrice(6999.0);
product.setImages("http://www.phone.jpg");
productDao.save(product);
}
//根据 id 查询
@Test
public void findById(){
Product product = productDao.findById(2L).get();
System.out.println(product);
}
//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}
//删除
@Test
public void delete(){
Product product = new Product();
product.setId(2L);
productDao.delete(product);
}
//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("iphone" + i);
product.setCategory("mobile");
product.setPrice(5999.0 + i);
product.setImages("http://www.phone.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从 0 开始, 1 表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
/**
* term 查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}
/**
* term 查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone");
Iterable<Product> products =
productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}
}
五、将mysql数据写入Elasticsearch例子
package com.example.esdemo.service.impl;
import com.example.esdemo.config.DBHelper;
import com.example.esdemo.imports.ImportDb2Es;
import com.example.esdemo.service.ImportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
 * {@link ImportService} implementation that copies every row of a MySQL table into an
 * Elasticsearch index through a {@link BulkProcessor}.
 */
@Component
public class ImportServiceImpl implements ImportService {
    private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class);

    /** Number of rows read between two progress log lines. */
    private static final int PROGRESS_LOG_INTERVAL = 10000;

    @Autowired
    private RestHighLevelClient client;

    /**
     * Imports the table named by {@code importDb2Es.getDbTableName()}.
     *
     * @param importDb2Es import description; only its table name is read here
     */
    @Override
    public void importDb2Es(ImportDb2Es importDb2Es) {
        // NOTE(review): the table name is passed as both the source table and the target
        // index name. Confirm this is intended and that no separate index-name property
        // should be used instead.
        writeMySQLDataToES(importDb2Es.getDbTableName(), importDb2Es.getDbTableName());
    }

    /**
     * Streams all rows of {@code tableName} and submits one index request per row.
     * Failures are logged, not thrown, so the method stays best-effort for its caller.
     *
     * @param tableName  source table; rejected (logged, nothing imported) unless it
     *                   consists only of letters, digits, '_', '$' and '.'
     * @param esIndeName target Elasticsearch index
     */
    private void writeMySQLDataToES(String tableName, String esIndeName) {
        // A table name cannot be bound as a PreparedStatement parameter, so it is
        // validated before being concatenated into the SQL text.
        if (!isSafeTableName(tableName)) {
            logger.error("Refusing to import, illegal table name: " + tableName);
            return;
        }
        String sql = "select * from " + tableName;
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        logger.info("start handle data :" + tableName);
        // try-with-resources closes only what was actually opened, in reverse order,
        // even when opening the connection or statement fails.
        try (Connection connection = DBHelper.getConn();
             PreparedStatement ps = connection.prepareStatement(
                     sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // Adjust the fetch size to your own needs.
            ps.setFetchSize(20);
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData colData = rs.getMetaData();
                int columnCount = colData.getColumnCount();
                long count = 0;
                while (rs.next()) {
                    HashMap<String, String> row = new HashMap<>(Math.max(16, columnCount * 2));
                    // JDBC columns are numbered 1..columnCount inclusive.
                    for (int i = 1; i <= columnCount; i++) {
                        row.put(colData.getColumnName(i), rs.getString(i));
                    }
                    // The BulkProcessor batches on its own (see getBulkProcessor), so
                    // rows are handed over one by one instead of being buffered here.
                    bulkProcessor.add(new IndexRequest(esIndeName).source(row));
                    count++;
                    if (count % PROGRESS_LOG_INTERVAL == 0) {
                        logger.info("mysql handle data number:" + count);
                    }
                }
            }
            bulkProcessor.flush();
        } catch (SQLException e) {
            logger.error("Failed to read table " + tableName, e);
        } finally {
            closeBulkProcessor(bulkProcessor);
        }
    }

    /** Accepts only names made of ASCII letters, digits, '_', '$' and '.'. */
    private static boolean isSafeTableName(String tableName) {
        return tableName != null && tableName.matches("[A-Za-z0-9_$.]+");
    }

    /** Waits up to 150 seconds for the processor to finish and logs the outcome. */
    private void closeBulkProcessor(BulkProcessor bulkProcessor) {
        try {
            boolean terminated = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            logger.info("bulk processor closed, all requests completed: " + terminated);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for the bulk processor to close", e);
        }
    }

    /**
     * Builds a {@link BulkProcessor} that sends its batches through
     * {@code client.bulkAsync} and logs the outcome of every batch.
     *
     * @param client client used to execute the bulk requests
     * @return a configured processor, never {@code null}
     */
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : "
                        + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                // A bulk call can return normally while individual items failed.
                if (response.hasFailures()) {
                    logger.error("Bulk has item failures, id: " + executionId + " : "
                            + response.buildFailureMessage());
                } else {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId, failure);
            }
        };
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(5000);
        builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
        builder.setConcurrentRequests(10);
        builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // build() is what applies the settings above to a processor instance.
        return builder.build();
    }
}