前言
elastic表示可伸缩,search表示查询。所以es的核心即为查询。通常情况下,我们的数据可以分为三类:结构化数据、非结构化数据、半结构化数据。
结构化数据:一般会用特定的结构来组织和管理数据,表现为二维表结构。这些数据信息一般是有关系的,所以可以保存到关系型数据库如MySQL、Oracle中,并可以通过SQL语句来查询。
– 优点:方便管理、方便查询。
– 缺点:扩展结构较为困难。
非结构化数据:无法用二维表结构来表现的数据。如日志、文档、报表、图片、视频等。这种数据维度广、数据量大,所以存储和查询的成本较大。一般会将这种数据保存到noSQL数据库当中,如redis、mongoDB、HBase。这种数据库一般以 K-V 结构保存,通过key来查询数据,相对较快。
半结构化数据:将数据的结构和内容混在一起,没有明确的区分。如XML,一般保存在redis、mongoDB、HBase中。
– 缺点:不容易查询内容。
但是生活中很多情况下数据并非关系型结构化的数据,无法模糊查询、也无法遍历匹配。为了解决这种问题,于是有了es。
一、ElasticSearch概述
The Elastic Stack包括ElasticSearch、Kibana、Beats和Logstash。能够安全可靠的获取任何来源、任何格式的数据,然后实时地对数据进行搜索、分析和可视化。ElasticSearch
简称ES,是一个开源的高扩展的分布式全文搜索引擎
,是整个Elastic Stack技术栈的核心。它可以近乎实时的存储、检索数据。
全文搜索引擎:
可以理解为全栈搜索。如在csdn中,用户可以写一些文章, 其他用户根据内容词汇、关键字等进行搜索,查询网站内所有匹配的文章,并以列表的形式展现结果。传统的数据库进行这样的检索时效率较低。即使进行SQL的优化,效果也不会有显著变化。所以在生产环境中,这种常规的搜索方式效果较差。
这就需要我们采用专门用于全文搜索的搜索引擎。至于具体使用那种搜索引擎,需要考虑如下几点进行分析:
- 除了搜索文本之外,还需要服务器来处理分析查询
- 搜索服务器也需要是集群,而且可扩展
- 对数据进行大量的分析,统计出不同的指标
二、ElasticSearch使用
Windows版本的ES安装很简单,解压即安装完毕。解压后,进入/bin目录,点击elasticsearch.bat文件启动es服务。
9300 端口为 ElasticSearch 集群间组件的通信端口
9200 端口为浏览器访问的http协议 RESTful 端口
如果直接通过浏览器向 ElasticSearch 发送请求,那么需要在发送的请求中包含 HTTP 标准的方法,而 HTTP 的大部分特性仅支持GET
和POST
方法。为了方便的进行客户端的访问,可以使用Postman
软件。
Postman能够发送任何类型的 HTTP 请求(GET, HEAD, POST, PUT…)不仅能够表单提交,也可以附带任何类型请求体。
2.1 RESTful
es支持分布式、RESTful 风格的搜索和分析,这就表示es允许采用RESTful风格的方式发请求进行软件访问。RESTful(Representational State Transfer 资源状态转换)是一种软件架构风格,比如 HTTP 就遵循了REST原则:
如在web中资源的唯一标识是 URI(统一资源路径)http://localhost:9200/test/test.txt 路径中不包含对资源的操作,如增加、修改不应该存在在路径中。RESTful风格架构就要求遵循统一的接口原则,包含一组受限制的预定义操作,不论什么样的资源都应该通过使用相同的接口对资源进行访问。这里的接口应该符合标准HTTP方法,比如 GET
、POST
、PUT
、DELETE
、HEAD
等请求。
路径是对资源的定位,方法是对资源的操作。按照HTTP的方法暴露资源,那资源将具有安全性
和幂等性
的特性。如 GET、HEAD都是安全的,无论访问多少次,都不会改变资源的状态。PUT、DELETE都是幂等性的,无论对资源操作多少次,结果都是一样的,后面的请求不会比第一回请求产生更多的影响。
所以当我们采用RESTful风格向es发出请求之后,es会返回相应,返回相应的数据格式是JSON格式。我们可以在请求体中发送JSON格式的字符串给服务器,服务器拿到后做相应的处理。所以es中数据的发送和数据的处理都是以JSON为标准格式的
。
为什么采用JSON格式:因为JSON格式更容易转换成字符串在网络中传递,而且识别会更加容易。
2.2 客户端 Postman
如果直接通过浏览器向ES服务器发送请求,那么需要在发送的请求中包含HTTP标准的方法,而HTTP的大部分特性仅支持 GET
和 POST
方法。所以为了能够方便的进行客户端的访问,可以使用Postman
软件。Postman是一款网页调试工具,能够发送任何类型的HTTP请求(GET、HEAD、PSOT、PUT…)不仅能够表单提交,且可以附带任何类型请求体。
2.3 数据格式
ElasticSearch 是面向文档型数据库,一条数据就是一个文档。ElasticSearch 里存储文档数据和关系型数据库MySQL存储数据的概念类比如下:
在关系型数据库中,索引是为了优化查询设计的数据库对象。没有索引也能进行检索,只是速度会有所降低。而ES软件专门用于全文检索数据,所以索引是整个搜索引擎中的关键。ES为了做到快速准确的查询,使用了一个特殊的概念来进行数据的存储和查询。我们称之为倒排索引
。
正排(正向)索引
通过文章编号快速查询到文章内容。我们将文章编号设置为主键,同时生成主键索引。通过主键索引快速关联到对应信息。
在正排索引中,如果想要查询文章中包含哪些热门词汇,会比较麻烦。我们需要做模糊查询,而且对每条数据都需要进行遍历,效率明显下降。且查询内容的大小写、时态等因素都会影响准确率。此时需要换种方式来将索引和数据关联。
倒排索引
与以往的查询方式相反(通过主键ID关联文件内容,再查询关键字)。倒排索引是通过关键字查询主键ID,再关联文章内容,查询效率较快。
倒排索引中,强调关键字
和文档编号的关联
,表的作用反而不那么明显。所以在 ES7.x 中,Type的概念被删除。
2.2 HTTP操作
2.2.1 索引操作
1)创建索引
对比关系型数据库,创建索引就等同于创建数据库。在MySQL中对数据进行操作,需要知道连接的数据库database
和对应的表,在es中叫Index索引
。
在 Postman 中,向ES服务器发 PUT
请求:http://127.0.0.1:9200/shopping
acknowledge:true 响应成功
index:shopping 当前索引为shopping
PUT具有幂等性,所以发出同样的请求,结果是一样的。而POST不具有幂等性,两次操作的结果可能不一样。
PUT 和 PUSH 的区别:
PUT:
PUT
请求用于在Elasticsearch中创建或更新一个文档。当使用PUT
请求时,需要指定文档的唯一标识符(ID),然后提供文档的内容。如果指定的文档ID已存在,Elasticsearch将更新该文档;如果文档ID不存在,它将创建一个新的文档。
PUSH:
Push
是一种与索引或更新具有一定关联的操作无关的API。它允许向特定字段添加新的元素。该API用于数组类型的字段,例如nested字段或array字段。使用Push
可以向现有文档字段添加新的元素,而不必使用GET和PUT来检索和创建整个文档。
2)查看所有索引
GET
获取索引的相关信息
在Postman中,向ES服务器发GET
请求: http://127.0.0.1:9200/_cat/indices?v
3)查看单个索引
在Postman中,向服务器发GET
请求:http://127.0.0.1:9200/shopping
{
"shopping"【索引名】: {
"aliases"【别名】: {},
"mappings"【映射】: {},
"settings"【设置】: {
"index"【设置 - 索引】: {
"creation_date"【设置 - 索引 - 创建时间】: "1614265373911",
"number_of_shards"【设置 - 索引 - 主分片数量】: "1",
"number_of_replicas"【设置 - 索引 - 副分片数量】: "1",
"uuid"【设置 - 索引 - 唯一标识】: "eI5wemRERTumxGCc1bAk2A",
"version"【设置 - 索引 - 版本】: {
"created": "7080099"
},
"provided_name"【设置 - 索引 - 名称】: "shopping"
}
}
}
}
4)删除索引
在Postman中,向ES服务器发DELETE
请求:http://127.0.0.1:9200/shopping
重新访问索引时,服务器返回响应:索引不存在。
2.2.2 文档操作
1)创建文档
在Postman中,向ES服务器发POST
请求:http://127.0.0.1:9200/shopping/_doc
注意,此处发送请求的方式必须是POST
,不能是PUT
,不然会发生错误。这是由于数据创建成功之后,会返回一个 id ,可以认为是刚刚创建数据的标识,由es软件随机生成。所以同样的请求多次执行之后,返回的结果不一样。
所以 POST 不是幂等性,但 PUT 操作必须是幂等性的,所以不能使用 PUT 操作。
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}
服务器相应结果如下:
{
"_index"【索引】: "shopping",
"_type"【类型-文档】: "_doc",
"_id"【唯一标识】: "Xhsa2ncBlvF_7lxyCE9G", #可以类比为 MySQL 中的主键,随机生成
"_version"【版本】: 1,
"result"【结果】: "created", #这里的 create 表示创建成功
"_shards"【分片】: {
"total"【分片 - 总数】: 2,
"successful"【分片 - 成功】: 1,
"failed"【分片 - 失败】: 0
},
"_seq_no": 0,
"_primary_term": 1
}
自定义id:http://127.0.0.1:9200/shopping/_doc/1001
如果采用上面的方式提交两次 PUSH 请求操作,那么该操作也是幂等性的,相当于 PUT 操作。也就是说,如果增加数据时明确数据主键,那么请求方式也可以是PUT
或者,为了明确当前的操作是新增,可以把_doc
改为 _create
:http://127.0.0.1:9200/shopping/_create/1002
2)查看文档
<1> 主键查询
可以用 GET
的方式发送一条新的请求即可:http://127.0.0.1:9200/shopping/_doc/1001
<2> 全量查询
上面的查询语句中,1001类似主键,结果就类似于主键查询的结果。
一个主键id
只能获取一条数据。如果想要查询索引下面的所有数据,可以采用:http://127.0.0.1:9200/shopping/_search
3)修改文档
完全覆盖性的修改,具有幂等性,可以使用PUT
方法进行操作。http://127.0.0.1:9200/shopping/_doc
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":4999.00
}
4)修改字段
局部数据需要更新时,由于每次更新的结果不一样,所以不是幂等性,不能用PUT方式,只能用POST
方式。http://127.0.0.1:9200/shopping/_update/1001
。
{
"doc":{
"title":"华为手机"
}
}
在命令行中需要明确对谁进行修改,所以需要使用命令_update
,如果使用_doc
,可能会被认为是新增。
5)删除文档(字段)
使用DELETE
方法,http://127.0.0.1:9200/shopping/_doc/1001
2.2.3 映射操作
有的查询可以分词查询,有的不能,必须全部匹配。在MySQL中,一个表的字段、类型、长度信息都属于表的结构信息,在es中也有类似概念,称为映射。
创建映射PUT
:http://127.0.0.1:9200/user/_mapping
{
"properties" : {
"name" : {
"type" : "text", // 可以分词
"index" : "true" // 可以被索引
},
"sex" : {
"type" : "keyword", // 不能分词,必须完整匹配
"index" : "true"
},
"tel" : {
"type" : "keyword",
"index" : "false" // 不能被索引
},
}
}
2.2.4 高级查询
1)条件查询
请求路径:GET
:http://127.0.0.1:9200/shopping/_search?q=category:小米
请求体:GET
:http://127.0.0.1:9200/shopping/_search
【Body -> row -> JSON】
// 条件查询
{
"query" : {
"match" : {
"category" : "小米"
}
}
}
// 全量查询
{
"query" : {
"match_all" : {
}
}
}
// 分页查询
{
"query" : {
"match" : {
"category" : "小米"
},
"from" : 0,
"size" : 2 //每页查询的数据2条
}
}
// 指定查询字段
{
"query" : {
"match" : {
"category" : "小米"
},
"from" : 0,
"size" : 2,
"_source" : ["title"] // 指定title字段
}
}
// 查询排序
{
"query" : {
"match" : {
"category" : "小米"
},
"from" : 0,
"size" : 2,
"_source" : ["title"],
"sort" : {
"price" : {
"order" : "desc"
}
}
}
}
// 多条件查询 逻辑与&& = must,逻辑或|| = should
// 小米手机,且价格=1999
{
"query" : {
"bool" : {
"must" : [
{
"match" : {
"catatory" : "小米"
}
},
{
"match" : {
"price" : "1999.00"
}
}
]
}
}
// 小米或华为手机
{
"query" : {
"bool" : {
"should" : [
{
"match" : {
"catatory" : "小米"
}
},
{
"match" : {
"catatory" : "华为"
}
}
]
}
}
// 范围查询
{
"query" : {
"bool" : {
"should" : [
{
"match" : {
"catatory" : "小米"
}
},
{
"match" : {
"catatory" : "华为"
}
},
"filter" : {
"range" : {
"price" : {
"gt" : 5000 // 价格大于5000
}
}
}
]
}
}
// 模糊查询(全文检索)
// 文字即使不正确,也能查询出数据。保存文档数据时,es会对文档进行分词拆解,并将拆解后的数据保存到倒排索引中。所以即使使用文字的一部分,也能检索到数据。这种检索方式就称为全文检索。
{
"query" : {
"match" : {
"category" : "米"
}
}
}
// 完全匹配
{
"query" : {
"match_phrase" : {
"category" : "米"
}
}
}
// 高亮显示
{
"query" : {
"match_phrase" : {
"category" : "米"
}
},
"highliaht" : {
"fields" : {
"cayegory" : {}
}
}
}
2)聚合查询
// 聚合查询
{
"aggs" : {
"price_group" : { // 名称,随意起名
"terms" : { // 分组
"field" : "price" // 分组字段
}
}
},
"size" : 0 // 只返回聚合结果而不返回具体的文档数据
}
这里的terms
是一个聚合类型(aggregation type),用于对指定字段进行分组。比如价格为“1999”的商品数量为1,价格为“2999”的商品数量为3。
terms
可以换成其他聚合类型,如: “sum” (求和聚合):用于对指定字段的值进行求和操作。它会计算指定字段值的总和。
"avg"(平均值聚合)
:用于对指定字段的值进行平均值计算。它会计算指定字段值的平均值。
"max"(最大值聚合)
:用于查找指定字段的最大值。它会返回指定字段中的最大值。
"min"(最小值聚合)
:用于查找指定字段的最小值。它会返回指定字段中的最小值。
"count"(计数聚合)
:用于计算文档数量。它会返回匹配指定条件的文档数量。
"date_histogram"(日期直方图聚合)
:用于按时间范围对文档进行分组,并统计每个时间范围内的文档数量。
2.2.5 Java API操作
准备操作
pom依赖
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 依赖 2.x 的 log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<!-- junit 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
创建ES客户端
// 创建客户端对象
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
...
// 关闭客户端连接
client.close();
注意:9200 端口为 Elasticsearch 的 Web 通信端口,localhost 为启动 ES 服务的主机名
1)索引
1. 创建索引
// 创建索引 - 请求对象
CreateIndexRequest request = new CreateIndexRequest("user");
// 发送请求,获取响应
CreateIndexResponse response = client.indices().create(request,
RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();
// 响应状态
System.out.println("操作状态 = " + acknowledged);
2. 查询索引
// 查询索引 - 请求对象
GetIndexRequest request = new GetIndexRequest("user");
// 发送请求,获取响应
GetIndexResponse response = client.indices().get(request,
RequestOptions.DEFAULT);
System.out.println("aliases:"+response.getAliases());
System.out.println("mappings:"+response.getMappings()); // 查看索引结构
System.out.println("settings:"+response.getSettings()); // 查看配置
3. 删除索引
// 删除索引 - 请求对象
DeleteIndexRequest request = new DeleteIndexRequest("user");
// 发送请求,获取响应
AcknowledgedResponse response = client.indices().delete(request,
RequestOptions.DEFAULT);
// 操作结果
System.out.println("操作结果 : " + response.isAcknowledged());
2)文档
1. 新增 / 批量新增
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 新增文档 - 请求对象
IndexRequest request = new IndexRequest();
// 设置索引及唯一性标识
request.index("user").id("1001");
// 创建数据对象
User user = new User();
user.setName("zhangsan");
user.setAge(30);
user.setSex("男");
// 向ES插入数据,必须将数据转换成JSON格式
ObjectMapper mapper = new ObjectMapper();
String userJson = mapper.writeValueAsString(user);
// 添加文档数据,数据格式为 JSON 格式
request.source(userJson, XContentType.JSON);
// 客户端发送请求,获取响应对象
IndexResponse reponse = client.index(request, RequestOptions.DEFAULT);
// 打印结果信息
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
client.close();
批量新增
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 批量新增数据
BulkRequest request = new BulkRequest();
request.add(new IndexRequest().index("user").id("1001").source(XContentType.JSON, "name", "zhangsan"));
request.add(new IndexRequest().index("user").id("1002").source(XContentType.JSON, "name", "lisi"));
// 客户端发送请求,获取响应对象
BulkResponse responses = client.bult(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("took:" + responses.getTook());
System.out.println("items:" + responses.getItems());
client.close();
2. 修改
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 新增文档 - 请求对象
UpdateRequest request = new UpdatRequest();
// 设置索引及唯一性标识
request.index("user").id("1001");
request.dox(XContentType.JSON, "sex", "女");
// 客户端发骚那个请求,获取响应对象
UpdatResponse reponse = client.update(request, RequestOptions.DEFAULT);
// 打印结果信息
System.out.println("_result:" + response.getResult());
client.close();
3. 查询
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
//1.创建请求对象
GetRequest request = new GetRequest().index("user").id("1001");
//2.客户端发送请求,获取响应对象
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//3.打印结果信息
System.out.println("_index:" + response.getIndex());
System.out.println("_type:" + response.getType());
System.out.println("_id:" + response.getId());
System.out.println("source:" + response.getSourceAsString());
client.close();
4. 删除 / 批量删除
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
//1.创建请求对象
DeleteRequest request = new DeleteRequest().index("user").id("1001");
//2.客户端发送请求,获取响应对象
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
//3.打印结果信息
System.out.println("source:" + response.toString());
client.close();
批量删除
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 批量删除数据
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
// 客户端发送请求,获取响应对象
BulkResponse responses = client.bult(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("took:" + responses.getTook());
System.out.println("items:" + responses.getItems());
client.close();
5. 高级查询
全量查询
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 1. 全量查询:matchAllQuery
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(new SearchSourceBuilder.query(QueryBuilders.matchAllQuery()));
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 2. 条件查询:termQuery
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(new SearchSourceBuilder.query(QueryBuilders.termQuery("age", 30)));
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 3. 分页查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder.query(QueryBuilders.matchAllQuery());
builder.from(0);
builder.size(2);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 4. 查询排序
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder.query(QueryBuilders.matchAllQuery());
builder.sort("age", SortOrder.DESC);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 5. 过滤字段
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder.query(QueryBuilders.matchAllQuery());
String excludes = {"age"};
String indludes = {"name"}
builder.fetchSource(indludes, excludes );
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 6. 组合查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("age", 30));
boolQueryBuilder.must(QueryBuilders.matchQuery("sex", "男"));
builder.query(boolQueryBuilder);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 7. 范围查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("age");
rangeQueryBuilder.gte(30)
rangeQueryBuilder.lte(40);
builder.query(rangeQueryBuilder);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 8. 模糊查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.fuzzyQuery("name","wangwu").fuzziness(Fuzziness.ONE));
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 9. 高亮查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
TermsQueryBuilders termsQueryBuilder = QueryBuilders.termQuery("name", "zhangsan");
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");
highlightBuilder.postTags("<font color='red'>");
highlightBuilder.field("name");
// 设置高亮构建对象
builder.highlighter(highlightBuilder);
builder.query(termsQueryBuilder);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 10. 聚合查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
AggregationBuilder aggregationBuilder = AggregationBuilders.max("max_age").filed("age");
builder.aggregation(aggregationBuilder );
builder.query(termsQueryBuilder);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
// 11. 分组查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
AggregationBuilder aggregationBuilder = AggregationBuilders.terms("ageGroup").filed("age");
builder.aggregation(aggregationBuilder );
builder.query(termsQueryBuilder);
request.source(builder);
SearchResponse response= client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println("source:" + response.getHits());
for( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
client.close();
三、进阶
3.1 概念
1. 索引(Index)
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录就是索引的意思,目录可以提高查询速度。
Elasticsearch 索引的精髓:一切设计都是为了提高搜索的性能。
2. 类型(Type)
7.x 之后默认不再支持自定义索引类型(默认类型为:_doc)
3. 文档(Document)
一个文档是一个可被索引的基础信息单元,也就是一条数据。
比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以 JSON(Javascript Object Notation)格式来表示,而 JSON 是一个到处存在的互联网数据交互格式。
在一个 index/type 里面,可以存储任意多的文档。
4. 字段(Filed)
相当于是数据表的字段
,对文档数据根据不同属性进行的分类标识。
5. 映射(Mapping)
mapping 是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理 ES 里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。
6. 分片(Shards)
可以理解成MySQL数据库中的分表。
一个索引可以存储超出单个节点硬件限制的大量数据。比如,一个具有 10 亿文档数据的索引占据 1TB 的磁盘空间,而任一节点都可能没有这样大的磁盘空间。或者单个节点处理搜索请求,响应太慢。为了解决这个问题,Elasticsearch 提供了将索引划分成多份的能力,每一份就称之为分片。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点上。
分片很重要,主要有两方面的原因:
1)允许你水平分割 / 扩展你的内容容量。
2)允许你在分片之上进行分布式的、并行的操作,进而提高性能/吞吐量。
至于一个分片怎样分布,它的文档怎样聚合和搜索请求,是完全由 Elasticsearch 管理的,对于作为用户来说,这些都是透明的,无需过分关心。
7. 副本(Replicas)
在一个网络 / 云的环境里,失败随时都可能发生,在某个分片/节点不知怎么的就处于离线状态,或者由于任何原因消失了,这种情况下,有一个故障转移机制是非常有用并且是强烈推荐的。为此目的,Elasticsearch 允许你创建分片的一份或多份拷贝,这些拷贝叫做复制分片(副本)。
复制分片之所以重要,有两个主要原因:
在分片/节点失败的情况下,提供了高可用性。因为这个原因,注意到复制分片从不与原/主要(original/primary)分片置于同一节点上是非常重要的。
扩展你的搜索量/吞吐量,因为搜索可以在所有的副本上并行运行。
分配分片和副本
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}
8. 分配(Allocation)
将分片分配给某个节点的过程,包括分配主分片或者副本。如果是副本,还包含从主分片复制数据的过程。这个过程是由 master
节点完成的。
3.2 系统框架
一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。
作为用户,我们可以将请求发送到集群中的任何节点 ,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。我们的示例集群就只有一个节点,所以它同时也成为了主节点。对这一切的管理都是透明的。
写流程
新建、索引和删除 请求都是 写
操作, 必须在主分片上面完成之后才能被复制到相关的副本分片。
参数:consistency
含义:consistency,即一致性。在默认设置下,即使仅仅是在试图执行一个_写_操作之前,主分片都会要求 必须要有 规定数量(quorum)(或者换种说法,也即必须要有大多数)的分片副本处于活跃可用状态,才会去执行_写_操作(其中分片副本可以是主分片或者副本分片)。这是为了避免在发生网络分区故障(network partition)的时候进行_写_操作,进而导致数据不一致。_规定数量_即:
int( (primary + number_of_replicas) / 2 ) + 1
consistency 参数的值可以设为 one (只要主分片状态 ok 就允许执行_写_操作),all(必须要主分片和所有副本分片的状态没问题才允许执行_写_操作), 或quorum 。默认值为 quorum , 即大多数的分片副本状态没问题就允许执行_写_操作。
注意,规定数量 的计算公式中 number_of_replicas 指的是在索引设置中的设定副本分片数,而不是指当前处理活动状态的副本分片数。如果你的索引设置中指
定了当前索引拥有三个副本分片,那规定数量的计算结果即:
int( (primary + 3 replicas) / 2 ) + 1 = 3
如果此时你只启动两个节点,那么处于活跃状态的分片副本数量就达不到规定数量,也因此您将无法索引和删除任何文档。
参数:timeout
含义:如果没有足够的副本分片会发生什么? Elasticsearch 会等待,希望更多的分片出现。默认情况下,它最多等待 1 分钟。 如果你需要,你可以使用 timeout 参数使它更早终止: 100 100 毫秒,30s 是 30 秒。
读流程
更新流程
部分更新一个文档结合了先前说明的读取和写入流程,步骤如下:
- 客户端向 Node 1 发送更新请求。
- 它将请求转发到主分片所在的 Node 3 。
- Node 3 从主分片检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。如果文档已经被另一个进程修改,它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
- 如果 Node 3 成功地更新文档,它将新版本的文档并行转发到 Node 1 和 Node 2 上的副本分片,重新建立索引。一旦所有副本分片都返回成功, Node 3 向协调节点也返回成功,协调节点向客户端返回成功。
分片原理
传统的数据库每个字段存储单个值,但这对全文检索并不够。文本字段中的每个单词需
要被搜索,对数据库意味着需要单个字段有索引多值的能力。最好的支持是一个字段多个值
需求的数据结构是倒排索引
。
倒排索引
Elasticsearch 使用一种称为倒排索引
的结构,它适用于快速的全文搜索。
所谓的正向索引,就是搜索引擎会将待搜索的文件都对应一个文件 ID,搜索时将这个ID 和搜索关键字进行对应,形成 K-V 对,然后对关键字进行统计计数。
但是互联网上收录在搜索引擎中的文档的数目是个天文数字,这样的索引结构根本无法满足实时返回排名结果的要求。所以,搜索引擎会将正向索引重新构建为倒排索引,即把文件ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件,这些文件中都出现这个关键词。一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文档列表。
文档搜索
早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。
倒排索引被写入磁盘后是 不可改变
的:它永远不会修改。
动态更新索引
如何在保留不变性的前提下实现倒排索引的更新?答案是: 用更多的索引。通过增加新的补充索引来反映新近的修改,而不是直接重写整个倒排索引。每一个倒排索引都会被轮流查询到,从最早的开始查询完后再对结果进行合并。
文档分析
首先我们通过 Postman 发送 GET
请求查询分词效果
# GET http://localhost:9200/_analyze
{
"text":"测试单词"
}
ES 的默认分词器无法识别中文中测试、单词这样的词汇,而是简单的将每个字拆完分为一个词。这样的结果显然不符合我们的使用要求,所以我们需要下载 ES 对应版本的中文分词器。我们这里采用 IK 中文分词器,下载地址为:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.8.0
将解压后的后的文件夹放入 ES 根目录下的 plugins 目录下,重启 ES 即可使用。我们这次加入新的查询参数"analyzer":"ik_max_word
# GET http://localhost:9200/_analyze
{
"text":"测试单词",
"analyzer":"ik_max_word"
}
// ik_max_word:会将文本做最细粒度的拆分
// ik_smart:会将文本做最粗粒度的拆分
ES 中也可以进行扩展词汇,首先进入 ES 根目录中的 plugins 文件夹下的 ik 文件夹,进入 config
目录,创建 custom.dic
文件,写入 单词 。同时打开 IKAnalyzer.cfg.xml
文件,将新建的 custom.dic
配置其中,重启 ES 服务器。
自定义分析器
# PUT http://localhost:9200/my_index
{
"settings": {
"analysis": {
"char_filter": {
"&_to_and": {
"type": "mapping",
"mappings": [ "&=> and "]
}},
"filter": {
"my_stopwords": {
"type": "stop",
"stopwords": [ "the", "a" ]
}},
"analyzer": {
"my_analyzer": {
"type": "custom",
"char_filter": [ "html_strip", "&_to_and" ],
"tokenizer": "standard",
"filter": [ "lowercase", "my_stopwords" ]
}}
}}}
结果展示:
{
"tokens": [
{
"token": "quick",
"start_offset": 4,
"end_offset": 9,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "and",
"start_offset": 10,
"end_offset": 11,
"type": "<ALPHANUM>",
"position": 2
},
{
"token": "brown",
"start_offset": 12,
"end_offset": 17,
"type": "<ALPHANUM>",
"position": 3
},
{
"token": "fox",
"start_offset": 18,
"end_offset": 21,
"type": "<ALPHANUM>",
"position": 4
}
]
}
文档处理
当我们使用 index API 更新文档 ,可以一次性读取原始文档,做我们的修改,然后重新索引 整个文档 。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:悲观并发控制
、乐观并发控制
。
乐观并发控制:
当我们之前讨论 index ,GET 和 delete 请求时,我们指出每个文档都有一个_version(版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个 version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。我们可以利用 version 号来确保应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。
老的版本 es 使用 version,但是新版本不支持了,会报下面的错误,提示我们用if_seq_no
和if_primary_term
。
POST: http://127.0.0.1:9200/shopping/_doc/1001?if_seq_no=0&if_primary_term=0
外部系统版本控制:
如果你的主数据库已经有了版本号 — 或一个能作为版本号的字段值比如 timestamp —那么你就可以在 Elasticsearch 中通过增加 version_type=external 到查询字符串的方式重用这些相同的版本号, 版本号必须是大于零的整数, 且小于 9.2E+18 — 一个 Java 中 long类型的正值。
外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同,Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前_version 是否 小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。
POST: http://127.0.0.1:9200/shopping/_doc/1001?version=3&version_type=external
四、框架集成
Spring Data 框架基础
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.6.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.atguigu.es</groupId>
<artifactId>springdata-elasticsearch</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
</project>
增加配置文件
在 resources 目录中增加 application.properties 文件
# es 服务地址
elasticsearch.host=127.0.0.1
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.atguigu.es=debug
SpringBoot 主程序
package com.atguigu.es;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringDataElasticSearchMainApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataElasticSearchMainApplication.class,args);
}
}
数据实体类
package com.atguigu.es;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "product", shards = 3, replicas = 1)
public class Product {
@Id
private Long id;//商品唯一标识
@Filed(type = FieldType.Text)
private String title;//商品名称
@Filed(type = FieldType.Keyword)
private String category;//分类名称
@Filed(type = FieldType.Keyword)
private Double price;//商品价格
@Filed(type = FieldType.Keyword, index = false) // 不做索引关联
private String images;//图片地址
}
配置类
package com.atguigu.es;
import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
// 关联配置文件
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
private String host ;
private Integer port ;
//重写父类方法
@Override
public RestHighLevelClient elasticsearchClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
return restHighLevelClient;
}
}
DAO 数据访问对象
package com.atguigu.es;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductDao extends ElasticsearchRepository<Product,Long> {
}
索引操作
package com.atguigu.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {
//注入 ElasticsearchRestTemplate
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
//创建索引并增加映射配置
@Test
public void createIndex(){
//创建索引,系统初始化会自动创建索引
System.out.println("创建索引");
}
@Test
public void deleteIndex(){
//创建索引,系统初始化会自动创建索引
boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
System.out.println("删除索引 = " + flg);
}
}
文档操作
package com.atguigu.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESProductDaoTest {
@Autowired
private ProductDao productDao;
/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(2L);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("http://www.atguigu/hw.jpg");
productDao.save(product);
}
//修改
@Test
public void update(){
Product product = new Product();
product.setId(1L);
product.setTitle("小米 2 手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("http://www.atguigu/xm.jpg");
productDao.save(product);
}
//根据 id 查询
@Test
public void findById(){
Product product = productDao.findById(1L).get();
System.out.println(product);
}
//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}
//删除
@Test
public void delete(){
Product product = new Product();
product.setId(1L);
productDao.delete(product);
}
//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.atguigu/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从 0 开始,1 表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}
文档搜索
package com.atguigu.es;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {
@Autowired
private ProductDao productDao;
/**
* term 查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}
/**
* term 查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}
}
Spark Streaming - 集成
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.es</groupId>
<artifactId>sparkstreaming-elasticsearch</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 依赖 2.x 的 log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.fasterxml.jackson.core</groupId>-->
<!-- <artifactId>jackson-databind</artifactId>-->
<!-- <version>2.11.1</version>-->
<!-- </dependency>-->
<!-- <!– junit 单元测试 –>-->
<!-- <dependency>-->
<!-- <groupId>junit</groupId>-->
<!-- <artifactId>junit</artifactId>-->
<!-- <version>4.12</version>-->
<!-- </dependency>-->
</dependencies>
</project>
功能实现
package com.atguigu.es
import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import java.util.Date
object SparkStreamingESTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
ds.foreachRDD(
rdd => {
data => {
new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200))
)
// 设置索引及唯一性标识
val ss = data.split(" ")
// 新增文档 - 请求对象
val request = new IndexRequest()
request.index("product").id(ss(0))
val json =
s"""
| { "data" : "${ss(1)}"}
|""".stripMargin
// 添加文档数据,数据格式为 JSON 格式
request.source(json, XContentType.JSON)
// 客户端发送请求,获取响应对象
val response = client.index(request, RequestOptions.DEFAULT)
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
client.close()
}
}
)
ssc.start()
ssc.awaitTermination()
}
}
Flink - 集成
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.es</groupId>
<artifactId>flink-elasticsearch</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
</project>
功能实现
package com.atguigu.es;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlinkElasticsearchSinkTest {
public static void main(String[] args) throws Exception {
// 构建Flink环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source:数据的输入
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
// 使用ESBuilder构建输出
List<HttpHost> hosts = new ArrayList<>();
hosts.add(new HttpHost("localhost", 9200, "http"));
ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(hosts,
new ElasticsearchSinkFunction<String>() {
@Override
public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer){
Map<String, String> jsonMap = new HashMao<>();
jsonMap.put("data", s);
IndexRequest indexRequest = Requests.indexRequest();
indexRequest.index("flink-index");
indexRequest.id("9001");
indexRequest.source(jsonMap);
requestIndexer.add(indexRequest);
}
})
// Sink:数据的输出
esBuilder.setBuilFlushMaxActions(1);
source.addSink(esSinkBuilder.build());
env.execute("flink-es");
}
}
五、优化
5.1 硬件选择
ES重度使用磁盘,磁盘能处理的吞吐量越大,节点久越稳定。优化磁盘I/O的技巧:
使用SSD
。SSD(Solid State Drive 固态硬盘)相对于传统的机械硬盘(HDD),SSD采用闪存存储技术,没有机械部件,因此速度更快、耐用性更高。
使用RAID 0
。RAID(Redundant Array of Independent Disks)是一种数据存储技术,用于将多个物理硬盘组合成一个逻辑卷,以提高数据的性能、容错能力或同时兼具两者。RAID 0,也被称为条带化(striping),是RAID级别中的一种。它通过将数据分散存储在多个硬盘上,以提高读写性能。在RAID 0中,数据被分成多个固定大小的块,并按顺序分配到各个硬盘上。因此,当读取或写入数据时,可以同时从多个硬盘中读取或写入,大大提高了数据传输速度。
使用多块硬盘,并允许ES通过多个path.data目录配置把数据条带化分配到他们上面。
不要使用远程挂载的存储。远程挂载的存储是指通过网络连接将远程存储设备(通常是网络存储设备)连接到本地计算机,并将其作为本地计算机的一个目录或驱动器来使用。比如 NFS 或者 SMB/CIFS。
5.2 分片策略
分片和副本的设计为 ES 提供了支持分布式和故障转移的特性,但分片和副本不是可以无限分配的。分片在设置好之后是不能修改的,由于路由计算规则,数据的存储位置是可以通过计算得到的,一旦分片动态变化,可能会导致数据查询不到。所以分片一定是在创建索引时久设定好的。
合理设置分片数
设计原则:
每个分片占用的硬盘容量不超过 ES 的最大 JVM 的堆空间设置(一般设置不超过 32G
,参考下文
的 JVM 设置原则),因此,如果索引的总容量在 500G 左右,那分片大小在 16 个左右即可;当然,
最好同时考虑原则 2。
考虑一下 node 数量,一般一个节点有时候就是一台物理机,如果分片数过多,大大超过了节点数,
很可能会导致一个节点上存在多个分片,一旦该节点故障,即使保持了 1 个以上的副本,同样有可能
会导致数据丢失,集群无法恢复。所以, 一般都设置分片数不超过节点数的 3 倍。
主分片,副本和节点最大数之间数量,我们分配的时候可以参考以下关系:
节点数<=主分片数(副本数+1)*
推迟分片策略
对于节点瞬时中断的问题,默认情况,集群会等待一分钟来查看节点是否会重新加入,如果这个节点在此期间重新加入,重新加入的节点会保持其现有的分片数据,不会触发新的分片分配。这样就可以减少 ES 在自动再平衡可用分片时所带来的极大开销。
通过修改参数 delayed_timeout ,可以延长再均衡的时间,可以全局设置也可以在索引级别进行修改:
PUT /_all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
5.3 路由选择
当我们查询文档的时候,Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?它其实是通过下面这个公式来计算出来:
shard = hash(routing) % number_of_primary_shards
routing 默认值是文档的 id,也可以采用自定义值,比如用户 id。
5.4 写入速度优化
针对于搜索性能要求不高,但是对写入要求较高的场景,我们需要尽可能的选择恰当写优化策略。综合来说,可以考虑以下几个方面来提升写索引的性能:
加大 Translog Flush ,目的是降低 Iops、Writeblock。
增加 Index Refresh 间隔,目的是减少 Segment Merge 的次数。
调整 Bulk 线程池和队列。
优化节点间的任务分布。
优化 Lucene 层的索引建立,目的是降低 CPU 及 IO。
批量数据提交
ES 提供了 Bulk API 支持批量操作,当我们有大量的写任务时,可以使用 Bulk 来进行批量写入。
通用的策略如下:Bulk 默认设置批量提交的数据量不能超过 100M。数据条数一般是根据文档的大小和服务器性能而定的,但是单次批处理的数据大小应从 5MB~15MB 逐渐增加,当性能没有提升时,把这个数据量作为最大值。
优化存储设备
ES 是一种密集使用磁盘的应用,在段合并的时候会频繁操作磁盘,所以对磁盘要求较高,当磁盘速度提升之后,集群的整体性能会大幅度提高。
合理使用合并
Lucene 以段的形式存储数据。当有新的数据写入索引时,Lucene 就会自动创建一个新的段。
随着数据量的变化,段的数量会越来越多,消耗的多文件句柄数及 CPU 就越多,查询效率就会下降。
由于 Lucene 段合并的计算量庞大,会消耗大量的 I/O,所以 ES 默认采用较保守的策略,让后台定期进行段合并。
减少refresh的次数
Lucene 在新增数据时,采用了延迟写入的策略,默认情况下索引的 refresh_interval 为1 秒。
Lucene 将待写入的数据先写到内存中,超过 1 秒(默认)时就会触发一次 Refresh,然后 Refresh 会把内存中的的数据刷新到操作系统的文件缓存系统中。
如果我们对搜索的实效性要求不高,可以将 Refresh 周期延长,例如 30 秒。
这样还可以有效地减少段刷新次数,但这同时意味着需要消耗更多的 Heap 内存。
加大 Flush 设置
Flush 的主要目的是把文件缓存系统中的段持久化到硬盘,当 Translog 的数据量达到512MB 或者 30 分钟时,会触发一次 Flush。
index.translog.flush_threshold_size
参数的默认值是 512MB,我们进行修改。
增加参数值意味着文件缓存系统中可能需要存储更多的数据,所以我们需要为操作系统的文件缓存系统留下足够的空间。
减少副本的数量
ES 为了保证集群的可用性,提供了 Replicas(副本)支持,然而每个副本也会执行分析、索引及可能的合并过程,所以 Replicas 的数量会严重影响写索引的效率。
当写索引时,需要把写入的数据都同步到副本节点,副本节点越多,写索引的效率就越慢。
如果我们需要大批量进行写入操作,可以先禁止Replica复制,设置index.number_of_replicas: 0
关闭副本。在写入完成后,Replica 修改回正常的状态。
5.5 内存设置
如果有一个 64G 内存的机器, ES 堆内存的分配需要满足以下两个原则:
1. 不要超过物理内存的 50%:Lucene 的设计目的是把底层 OS 里的数据缓存到内存中。
Lucene 的段是分别存储到单个文件中的,这些文件都是不会变化的,所以很利于缓存,同时操作系统也会把这些段文件缓存起来,以便更快的访问。
如果我们设置的堆内存过大,Lucene 可用的内存将会减少,就会严重影响降低 Lucene 的全文本查询性能。
2.堆内存的大小最好不要超过 32GB:在 Java 中,所有对象都分配在堆上,然后有一个 Klass Pointer 指针指向它的类元数据。
这个指针在 64 位的操作系统上为 64 位,64 位的操作系统可以使用更多的内存(2^ 64)。在 32 位的系统上为 32 位,32 位的操作系统的最大寻址空间为 4GB(2^32)。
但是 64 位的指针意味着更大的浪费,因为你的指针本身大了。浪费内存不算,更糟糕的是,更大的指针在主内存和缓存器(例如 LLC, L1 等)之间移动数据的时候,会占用更多的带宽。
最终我们都会采用 31 G 设置:-Xms 31g,-Xmx 31g
5.6 重要配置
参数名 | 参数值 | 说明 |
---|---|---|
cluster.name | elasticsearch | 配置 ES 的集群名称,默认值是 ES,建议改成与所存数据相关的名称,ES 会自动发现在同一网段下的集群名称相同的节点 |
node.name | node-1 | 集群中的节点名,在同一个集群中不能重复。节点的名称一旦设置,就不能再改变了。当然,也可以设 置 成 服 务 器 的 主 机 名 称 , 例 如node.name:${HOSTNAME}。 |
node.master | true | 指定该节点是否有资格被选举成为 Master 节点,默认是 True,如果被设置为 True,则只是有资格成为Master 节点,具体能否成为 Master 节点,需要通过选举产生。 |
node.data | true | 指定该节点是否存储索引数据,默认为 True。数据的增、删、改、查都是在 Data 节点完成的。 |
index.number_of_shards | 1 | 设置都索引分片个数,默认是 1 片。也可以在创建索引时设置该值,具体设置为多大都值要根据数据量的大小来定。如果数据量不大,则设置成 1 时效率最高 |
index.number_of_replicas | 1 | 设置默认的索引副本个数,默认为 1 个。副本数越多,集群的可用性越好,但是写索引时需要同步的数据越多。 |
transport.tcp.compress | 1 | 设置在节点间传输数据时是否压缩,默认为 False,不压缩 |
discovery.zen.minimum_master_nodes | true | 设置在选举 Master 节点时需要参与的最少的候选主节点数,默认为 1。如果使用默认值,则当网络不稳定时有可能会出现脑裂。合理的数值为 (master_eligible_nodes/2)+1 ,其中 |
master_eligible_nodes 表示集群中的候选主节点数 | ||
discovery.zen.ping.timeout | 1 | 设置在集群中自动发现其他节点时 Ping 连接的超时时间,默认为 3 秒。在较差的网络环境下需要设置得大一点,防止因误判该节点的存活状态而导致分片的转移 |