在Elasticsearch中,Ingest Pipeline 的 `processors` 属性是一个数组,包含一个或多个处理器(processors)。每个处理器定义了一个数据处理步骤,可以在数据索引之前对数据进行预处理或富化。以下是对 `processors` 属性中常见处理器的详细说明:
### 常见处理器
1. **`set`**:
- **描述**: 设置或更新文档中的字段值。
- **参数**:
- `field`: 要设置的字段名称。
- `value`: 要设置的值。
- `if` (可选): 条件表达式,只有当条件为真时才执行处理器。
- **示例**:
```json
{
"set": {
"field": "status",
"value": "processed"
}
}
```
2. **`remove`**:
- **描述**: 删除文档中的字段。
- **参数**:
- `field`: 要删除的字段名称。
- `if` (可选): 条件表达式,只有当条件为真时才执行处理器。
- **示例**:
```json
{
"remove": {
"field": "temp_field"
}
}
```
3. **`rename`**:
- **描述**: 重命名文档中的字段。
- **参数**:
- `field`: 当前字段名称。
- `target_field`: 新字段名称。
- `ignore_missing` (可选): 如果为 `true`,则在字段不存在时不抛出错误。
- **示例**:
```json
{
"rename": {
"field": "old_field",
"target_field": "new_field"
}
}
```
4. **`script`**:
- **描述**: 使用Painless脚本对文档进行处理。
- **参数**:
- `source`: Painless脚本代码。
- `lang` (可选): 脚本语言,默认为 `painless`。
- `params` (可选): 脚本参数。
- **示例**:
```json
{
"script": {
"source": "ctx._source.count++",
"params": {
"increment": 1
}
}
}
```
5. **`inference`**:
- **描述**: 使用预训练的机器学习模型对文档进行推理。
- **参数**:
- `model_id`: 预训练模型的ID。
- `target_field`: 存储推理结果的字段名称。
- `inference_config` (可选): 推理配置。
- **示例**:
```json
{
"inference": {
"model_id": "my_word_embedding_model",
"target_field": "embedding",
"inference_config": {
"natural_language_inference": {
"results_field": "embedding"
}
}
}
}
```
6. **`query_vector_builder`**:
- **描述**: 生成查询向量。
- **参数**:
- `field`: 输入字段名称。
- `target_field`: 存储生成的查询向量的字段名称。
- **示例**:
```json
{
"query_vector_builder": {
"field": "text",
"target_field": "query_vector"
}
}
```
7. **`date`**:
- **描述**: 将字符串转换为日期。
- **参数**:
- `field`: 要转换的字段名称。
- `target_field` (可选): 存储转换后的日期的字段名称。
- `formats`: 日期格式列表。
- **示例**:
```json
{
"date": {
"field": "timestamp",
"target_field": "date",
"formats": ["yyyy-MM-dd'T'HH:mm:ssZ", "epoch_millis"]
}
}
```
8. **`grok`**:
- **描述**: 使用Grok模式解析文本字段。
- **参数**:
- `field`: 要解析的字段名称。
- `patterns`: Grok模式列表。
- `target_field` (可选): 存储解析结果的字段名称。
- **示例**:
```json
{
"grok": {
"field": "log_message",
"patterns": ["%{COMBINEDAPACHELOG}"]
}
}
```
9. **`append`**:
- **描述**: 将值追加到数组字段中。
- **参数**:
- `field`: 要追加值的字段名称。
- `value`: 要追加的值。
- **示例**:
```json
{
"append": {
"field": "tags",
"value": "new_tag"
}
}
```
10. **`convert`**:
- **描述**: 将字段值转换为指定的数据类型。
- **参数**:
- `field`: 要转换的字段名称。
- `target_field` (可选): 存储转换后值的字段名称。
- `type`: 目标数据类型(如 `string`, `integer`, `float`, `boolean`)。
- **示例**:
```json
{
"convert": {
"field": "age",
"type": "integer"
}
}
```
### 示例 Ingest Pipeline
以下是一个示例 Ingest Pipeline,展示了如何使用多个处理器:
```json
PUT _ingest/pipeline/example_pipeline
{
"description": "Example pipeline with multiple processors",
"processors": [
{
"set": {
"field": "status",
"value": "processed"
}
},
{
"remove": {
"field": "temp_field"
}
},
{
"rename": {
"field": "old_field",
"target_field": "new_field"
}
},
{
"script": {
"source": "ctx._source.count++"
}
},
{
"inference": {
"model_id": "my_word_embedding_model",
"target_field": "embedding"
}
},
{
"query_vector_builder": {
"field": "text",
"target_field": "query_vector"
}
},
{
"date": {
"field": "timestamp",
"target_field": "date",
"formats": ["yyyy-MM-dd'T'HH:mm:ssZ", "epoch_millis"]
}
},
{
"grok": {
"field": "log_message",
"patterns": ["%{COMBINEDAPACHELOG}"]
}
},
{
"append": {
"field": "tags",
"value": "new_tag"
}
},
{
"convert": {
"field": "age",
"type": "integer"
}
}
]
}
```
### 使用 Ingest Pipeline
在索引数据时,指定使用创建的 Ingest Pipeline:
```json
POST my_index/_doc?pipeline=example_pipeline
{
"text": "example text",
"old_field": "some value",
"temp_field": "temporary value",
"timestamp": "2023-10-01T12:34:56Z",
"log_message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326",
"age": "25"
}
```
### 验证结果
你可以通过查询索引来验证数据是否正确处理:
```json
GET my_index/_search
{
"query": {