【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_1.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_1.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_1.12</artifactId>
    <version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/**
 * 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo
 * 实现实时数据流如何无缝地流向Elasticsearch
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-13 22:25:58
 */
public class ElasticsearchSinkStreamJobQuickDemo {

    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);

    public static void main(String[] args) throws Exception {

        // 创建elasticsearch集群的httpHost连接
        HttpHost httpHost = new HttpHost("localhost", 9200, "http");

        List<HttpHost> httpHosts = new ArrayList<>();

        httpHosts.add(httpHost);

        // 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引
        ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {
            @Override
            public void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {
                String transId = element.getString("transId");
                String tradeTime = element.getString("tradeTime");
                String index = "flink_" + tradeTime;
                logger.info("交易流水={},数据写入索引{}成功", transId, index);
                IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);
                indexer.add(indexRequest);
            }
        };

        // 构建elasticsearchSink算子Builder
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);

        // 每个请求最多发送的文档数量
        esSinkBuilder.setBulkFlushMaxActions(1);

        // 每次发送请求的时间间隔
        esSinkBuilder.setBulkFlushInterval(1000);

        //构建elasticsearchSink算子
        ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();

        // 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据
        SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {
            @Override
            public void run(SourceContext sourceContext) throws Exception {
                while (true) {
                    //交易流水号
                    String tradeId = UUID.randomUUID().toString();
                    //交易发生时间戳
                    long timeStamp = System.currentTimeMillis();
                    //交易发生金额
                    long tradeAmount = new Random().nextInt(1000);
                    //交易名称
                    String tradeName = "支付宝转账";

                    JSONObject dataObj = new JSONObject();
                    dataObj.put("transId", tradeId);
                    dataObj.put("timeStamp", timeStamp);
                    dataObj.put("tradeTime", dateUtil(timeStamp));
                    dataObj.put("tradeAmount", tradeAmount);
                    dataObj.put("tradeName", tradeName);

                    //模拟生产,每隔1秒生成一笔交易
                    Thread.sleep(1000);
                    logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());
                    sourceContext.collect(dataObj);
                }
            }

            @Override
            public void cancel() {

            }
        };

        // 创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建数据源
        DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);

        // 数据源写入数据算子,进行输出到elasticsearch
        dataStreamSource.addSink(sink);

        // 执行任务
        env.execute();
    }

    /**
     * 描述:时间格式化工具类
     *
     * @param timestamp 时间戳
     * @return {@code String }
     */
    private static String dateUtil(long timestamp) {
        //时间戳加工
        timestamp = timestamp / 1000;
        // 将时间戳转换为 LocalDateTime 对象
        LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
        // 定义日期时间格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
        // 格式化日期时间对象为指定格式的字符串
        String dateTimeFormat = formatter.format(dateTime);
        return dateTimeFormat;
    }
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_

#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000

#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3

#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3

#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aurora</groupId>
    <artifactId>aurora_elasticsearch_connector</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>1.8</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.14.4</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <!--依赖管理-->
    <dependencies>

        <!-- fastJson工具类依赖 start -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- fastJson工具类依赖 end -->

        <!-- log4j日志框架依赖 start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- log4j日志框架依赖 end -->

        <!-- Flink基础依赖 start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink基础依赖 end -->

        <!-- Flink Elasticsearch 连接器依赖 start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Elasticsearch 连接器依赖 end -->

    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;


import java.io.Serializable;

/**
 * 描述:elasticsearch实体类
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-10 20:08:20
 */
public class ElasticsearchEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 集群地址
     * */
    private String hosts;

    /**
     * 集群端口
     * */
    private Integer port;

    /**
     *执行计划
     * */
    private String scheme;

    /**
     * 文档类型,es7一般都是_doc
     * */
    private String type;

    /**
     * 索引前缀
     * */
    private String indexPrefix;

    /**
     * 内部批量处理器,刷新前最大缓存的操作数
     * */
    private Integer bulkFlushMaxActions=1;

    /**
     * 刷新前最大缓存的数据量(以兆字节为单位)
     * */
    private Integer bulkFlushMaxSizeMb=10;

    /**
     * 刷新的时间间隔(不论缓存操作的数量或大小如何)
     * */
    private Integer bulkFlushInterval=10000;

    /**
     * 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。
     * 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
     * */
    private Boolean bulkFlushBackoff=false;

    /**
     * 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
     * */
    private Integer bulkFlushBackoffDelay=10000;

    /**
     * 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
     * */
    private Integer bulkFlushBackoffRetries=3;

    /**
     * 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
     * */
    private Integer connectTimeout=10000;

    /**
     * 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
     * */
    private Integer socketTimeout=10000;

    /**
     * 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
     * */
    private Integer connectionRequestTimeout=10000;

    /**
     * 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
     * */
    private Boolean redirectsEnabled=false;

    /**
     * 客户端允许的最大重定向次数
     * */
    private Integer maxRedirects=3;

    /**
     * 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
     * */
    private Boolean authenticationEnabled=true;

    /**
     * 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
     * */
    private Boolean circularRedirectsAllowed=false;

    /**
     * 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
     * */
    private Boolean contentCompressionEnabled=false;

    /**
     * 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。
     * 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
     * */
    private Boolean expectContinueEnabled=false;

    /**
     * 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
     * */
    private Boolean normalizeUri=false;

    /**
     * 用于设置 HTTP 请求的路径前缀。
     * 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接
     * */
    private String pathPrefix;

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getScheme() {
        return scheme;
    }

    public void setScheme(String scheme) {
        this.scheme = scheme;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getIndexPrefix() {
        return indexPrefix;
    }

    public void setIndexPrefix(String indexPrefix) {
        this.indexPrefix = indexPrefix;
    }

    public Integer getBulkFlushMaxActions() {
        return bulkFlushMaxActions;
    }

    public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {
        this.bulkFlushMaxActions = bulkFlushMaxActions;
    }

    public Integer getBulkFlushMaxSizeMb() {
        return bulkFlushMaxSizeMb;
    }

    public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {
        this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;
    }

    public Integer getBulkFlushInterval() {
        return bulkFlushInterval;
    }

    public void setBulkFlushInterval(Integer bulkFlushInterval) {
        this.bulkFlushInterval = bulkFlushInterval;
    }

    public Boolean getBulkFlushBackoff() {
        return bulkFlushBackoff;
    }

    public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {
        this.bulkFlushBackoff = bulkFlushBackoff;
    }

    public Integer getBulkFlushBackoffDelay() {
        return bulkFlushBackoffDelay;
    }

    public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {
        this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;
    }

    public Integer getBulkFlushBackoffRetries() {
        return bulkFlushBackoffRetries;
    }

    public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {
        this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;
    }

    public Integer getConnectTimeout() {
        return connectTimeout;
    }

    public void setConnectTimeout(Integer connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    public Integer getSocketTimeout() {
        return socketTimeout;
    }

    public void setSocketTimeout(Integer socketTimeout) {
        this.socketTimeout = socketTimeout;
    }

    public Integer getConnectionRequestTimeout() {
        return connectionRequestTimeout;
    }

    public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
        this.connectionRequestTimeout = connectionRequestTimeout;
    }

    public Boolean getRedirectsEnabled() {
        return redirectsEnabled;
    }

    public void setRedirectsEnabled(Boolean redirectsEnabled) {
        this.redirectsEnabled = redirectsEnabled;
    }

    public Integer getMaxRedirects() {
        return maxRedirects;
    }

    public void setMaxRedirects(Integer maxRedirects) {
        this.maxRedirects = maxRedirects;
    }

    public Boolean getAuthenticationEnabled() {
        return authenticationEnabled;
    }

    public void setAuthenticationEnabled(Boolean authenticationEnabled) {
        this.authenticationEnabled = authenticationEnabled;
    }

    public Boolean getCircularRedirectsAllowed() {
        return circularRedirectsAllowed;
    }

    public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {
        this.circularRedirectsAllowed = circularRedirectsAllowed;
    }

    public Boolean getContentCompressionEnabled() {
        return contentCompressionEnabled;
    }

    public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {
        this.contentCompressionEnabled = contentCompressionEnabled;
    }

    public Boolean getExpectContinueEnabled() {
        return expectContinueEnabled;
    }

    public void setExpectContinueEnabled(Boolean expectContinueEnabled) {
        this.expectContinueEnabled = expectContinueEnabled;
    }

    public Boolean getNormalizeUri() {
        return normalizeUri;
    }

    public void setNormalizeUri(Boolean normalizeUri) {
        this.normalizeUri = normalizeUri;
    }

    public String getPathPrefix() {
        return pathPrefix;
    }

    public void setPathPrefix(String pathPrefix) {
        this.pathPrefix = pathPrefix;
    }
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

/**
 * 描述:设置用于创建 Elasticsearch REST 客户端的工厂
 * 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-13 00:12:15
 */
public class CustomRestClientFactory implements RestClientFactory {

    private ElasticsearchEntity elasticsearchEntity;

    public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {
        this.elasticsearchEntity = elasticsearchEntity;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

        //设置默认的 HTTP 头部信息,这些信息将在每个请求中包含
        Header contentType = new BasicHeader("Content-Type", "application/json");
        Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");
        Header[] headers = {contentType, authorization};
        restClientBuilder.setDefaultHeaders(headers);


        //设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作
        restClientBuilder.setFailureListener(new RestClient.FailureListener());

        //配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。
        restClientBuilder.setNodeSelector(NodeSelector.ANY);

        //为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。
        if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){
            restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());
        }

        //允许在创建每个请求的时候进行额外的请求配置。
        restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));
        //允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。
        restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));
        //设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。
        restClientBuilder.setStrictDeprecationMode(false);
    }
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;

/**
 * 描述:
 * 允许在创建每个请求的时候进行额外的请求配置
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-13 23:24:42
 */
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {

    private ElasticsearchEntity elasticsearchEntity;

    public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {
        this.elasticsearchEntity = elasticsearchEntity;
    }

    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {
        // 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
        custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());
        // 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。
        custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());
        // 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
        custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());

        // 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
        custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());
        // 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。
        custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());

        // 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
        custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());
        // 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
        custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());
        // 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。
        //  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
        custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());

        // 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
        custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());
        // 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范
        custom.setCookieSpec(new DefaultCookieSpec().toString());
        return custom;
    }
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;

/**
 * 描述:客户端配置
 * 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-13 23:28:15
 */
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {

    private ElasticsearchEntity elasticsearchEntity;

    CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {
        this.elasticsearchEntity = elasticsearchEntity;
    }

    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {


        RequestConfig.Builder custom = RequestConfig.custom();
        // 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
        custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());
        // 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。
        custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());
        // 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
        custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());

        // 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
        custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());
        // 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。
        custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());

        // 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
        custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());
        // 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
        custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());
        // 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
        custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());
        // 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。
        //  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
        custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());

        // 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
        custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());
        // 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范
        custom.setCookieSpec(new DefaultCookieSpec().toString());

        return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());
    }
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 描述:自定义elasticsearch sink 算子函数
 * ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。
 * 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-12 23:49:22
 */
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {

    private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);

    private ElasticsearchEntity elasticsearchEntity;

    public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {
        this.elasticsearchEntity = elasticsearchEntity;
    }

    @Override
    public void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {
        String transId = element.getString("transId");
        String tradeTime = element.getString("tradeTime");
        String index = elasticsearchEntity.getIndexPrefix() + tradeTime;
        logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);
        IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);
        indexer.add(indexRequest);
    }
}


7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 描述:es写入异常处理
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-13 00:04:24
 */
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {

    private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);

    @Override
    public void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {
        // 处理不同类型的异常
        if (throwable instanceof EsRejectedExecutionException) {
            // 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施
            logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");
            // 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储
            // 例如: indexer.add(createAnotherRequest(action));
        } else {
            // 对于其他类型的异常,默认返回放弃策略
            logger.error("Unhandled failure, abandoning request: {}", action.toString());
        }
    }
}


7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;


/**
 * 描述:Flink集成Elasticsearch Connector连接器进阶Demo
 * 实现实时数据流如何无缝地流向Elasticsearch
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-11 22:06:45
 */
public class ElasticsearchSinkStreamJobAdvancedDemo {

    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);

    public static void main(String[] args) {

        try {

            // 读取配置参数
            ElasticsearchEntity elasticsearchEntity = paramsInit();

            // 设置elasticsearch节点
            List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);

            // 创建esSinkFunction函数
            ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);

            // 构建ElasticsearchSink算子builder
            ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);

            // es参数配置
            esBuilderHandler(esSinkBuilder, elasticsearchEntity);

            // 构建sink算子
            ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();

            // 自定义数据源,模拟生产环境交易接入,json格式数据
            SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {
                @Override
                public void run(SourceContext sourceContext) throws Exception {
                    while (true) {
                        //交易流水号
                        String tradeId = UUID.randomUUID().toString();
                        //交易发生时间戳
                        long timeStamp = System.currentTimeMillis();
                        //交易发生金额
                        long tradeAmount = new Random().nextInt(100);
                        //交易名称
                        String tradeName = "支付宝转账";

                        JSONObject dataObj = new JSONObject();
                        dataObj.put("transId", tradeId);
                        dataObj.put("timeStamp", timeStamp);
                        dataObj.put("tradeTime", dateUtil(timeStamp));
                        dataObj.put("tradeAmount", tradeAmount);
                        dataObj.put("tradeName", tradeName);

                        //模拟生产,每隔1秒生成一笔交易
                        Thread.sleep(1000);
                        logger.info("交易接入,原始报文={}", dataObj.toJSONString());
                        sourceContext.collect(dataObj);
                    }
                }

                @Override
                public void cancel() {

                }
            };

            // 创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 构建数据源
            DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);
            // 构建sink算子
            dataStreamSource.addSink(esSink);
            // 运行作业
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 描述:Flink参数配置读取
     *
     * @return {@code ElasticsearchEntity }
     * @throws IOException
     */
    private static ElasticsearchEntity paramsInit() throws IOException {
        // 通过flink内置工具类获取命令行参数
        String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";
        ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);
        ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();
        String hosts = paramsMap.get("es.cluster.hosts");
        int port = paramsMap.getInt("es.cluster.port");
        String scheme = paramsMap.get("es.cluster.scheme");
        String type = paramsMap.get("es.cluster.type");
        String indexPrefix = paramsMap.get("es.cluster.indexPrefix");
        int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");
        int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");
        int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");
        boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");
        int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");
        int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");
        int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");
        int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");
        int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");
        boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");
        int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");
        boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");
        boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");
        boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");
        boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");
        boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");

        elasticsearchEntity.setHosts(hosts);
        elasticsearchEntity.setPort(port);
        elasticsearchEntity.setScheme(scheme);
        elasticsearchEntity.setType(type);
        elasticsearchEntity.setIndexPrefix(indexPrefix);
        elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);
        elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);
        elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);
        elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);
        elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);
        elasticsearchEntity.setConnectTimeout(connectTimeout);
        elasticsearchEntity.setSocketTimeout(socketTimeout);
        elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);
        elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);
        elasticsearchEntity.setMaxRedirects(maxRedirects);
        elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);
        elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);
        elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);
        elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);
        elasticsearchEntity.setNormalizeUri(normalizeUri);

        return elasticsearchEntity;
    }

    /**
     * 描述:时间格式化工具类
     *
     * @param timestamp 时间戳
     * @return {@code String }
     */
    private static String dateUtil(long timestamp) {
        timestamp = timestamp / 1000;
        // 将时间戳转换为 LocalDateTime 对象
        LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
        // 定义日期时间格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
        // 格式化日期时间对象为指定格式的字符串
        String dateTimeFormat = formatter.format(dateTime);
        return dateTimeFormat;
    }

    /**
     * 描述:es参数配置
     *
     * @param esSinkBuilder       esSinkBuilder建造器
     * @param elasticsearchEntity es实体类
     */
    private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {
        // 设置触发批量写入的最大动作数,
        // 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1
        esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());

        // 设置触发批量写入的最大数据量
        // 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MB
        esSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());

        // 设置批量写入的时间间隔
        // 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入
        esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());

        // 启用批量写入的退避策略
        // 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
        esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());

        // 设置批量写入的退避延迟时间
        // 解释:在发生写入失败后,等待指定的延迟时间后再进行重试
        esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());

        // 设置批量写入的最大重试次数
        // 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
        esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());

        // 设置写入失败时的处理策略
        // 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口
        esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());

        // 设置用于创建 Elasticsearch REST 客户端的工厂
        // 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口
        esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));
    }

    /**
     * 描述:
     * elasticsearch 节点配置
     *
     * @param elasticsearchEntity es实体类
     * @return {@code List<HttpHost> }
     */
    private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {
        List<HttpHost> httpHosts = new ArrayList<>();
        String[] clusterArray = elasticsearchEntity.getHosts().split(",");
        for (String node : clusterArray) {
            httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));
        }
        return httpHosts;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/390952.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM-JVM中对象的结构

对象内存布局 对象里的三个区&#xff1a; 对象头&#xff08;Header&#xff09;&#xff1a;Java对象头占8byte。如果是数组则占12byte。因为JVM里数组size需要使用4byte存储。 标记字段MarkWord&#xff1a; 用于存储对象自身的运行时数据&#xff0c;它是synchronized实现轻…

AliOS编译三方库

文章目录 1、官网教程2、编译NDK2.1 下载ndk2.2 编译环境准备2.3 安装ndk 3 cmake交叉编译3.1 编译工具链3.2 编译三方库 4 自带编译配置文件的交叉编译 1、官网教程 AliOS开发官网链接&#xff1a;AliOS开发者官网 应用开发下NDK开发有相关NDK开发介绍 2、编译NDK 2.1 下载…

037-安全开发-JavaEE应用JNDI注入RMI服务LDAP服务JDK绕过调用链类

037-安全开发-JavaEE应用&JNDI注入&RMI服务&LDAP服务&JDK绕过&调用链类 #知识点&#xff1a; 1、JavaEE-JNDI注入-RMI&LDAP 2、JavaEE-漏洞结合-FastJson链 3、JavaEE-漏洞条件-JDK版本绕过 演示案例&#xff1a; ➢JNDI注入-RMI&LDAP服务 ➢JNDI注…

C语言scanf函数详解..

1.前言 前面说过了printf函数 他是将二进制表示的整数、浮点数、字符、字符串根据转换规范转换成字符或者字符串 并且打印到了控制台上 那么既然有了输出函数 那么肯定也有输入函数咯 的确如此 他就是scanf函数 他是将字符或者字符串根据转换规范转换成二进制表示的整数、浮点…

数学实验第三版(主编:李继成 赵小艳)课后练习答案(十二)(3)

实验十二&#xff1a;微分方程模型 练习三 1.分别用数值解命令ode23t和ode45 计算示例3中微分方程的数值解,同用命令ode23 算得的数值解以及解析解比较,哪种方法精度较高?你用什么方法比较它们之间的精度? clc;clear; f(x,y)2*yx2; figure(1) [x,y]ode23t(f,[1,2],1); plo…

三、Maven项目搭建及Destination(队列、主题)

Maven项目搭建及Destination&#xff08;队列、主题&#xff09; 一、Idea中Maven项目准备1.创建Module2.创建java包3.配置pom.xml 二、队列&#xff08;Queue&#xff09;1.JMS编程架构2.代码实现生产者3.代码实现消费者4.队列消费者三大情况 三、消费者类型1.同步式消费者1.1…

【MATLAB】鲸鱼算法优化混合核极限学习机(WOA-HKELM)回归预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 鲸鱼算法优化混合核极限学习机&#xff08;WOA-HKELM&#xff09;回归预测算法是一种结合鲸鱼优化算法和混合核极限学习机的混合算法。其原理主要包含以下几个步骤&#xff1a; 初始化&am…

VMware Tools安装教程(适用windows虚拟机)

一、资源 VMware-tools安装包已绑定在资源中 二、步骤 1、点击已经开启的虚拟机中的此图标&#xff0c;点击设置 2、将镜像文件选中&#xff0c;点击确定 3、之后会自动进入安装过程&#xff0c;点击下一步 4、选择典型安装&#xff0c;下一步直到完成&#xff0c;完成后重启…

Swift Combine 合并多个管道以更新 UI 元素 从入门到精通十七

Combine 系列 Swift Combine 从入门到精通一Swift Combine 发布者订阅者操作者 从入门到精通二Swift Combine 管道 从入门到精通三Swift Combine 发布者publisher的生命周期 从入门到精通四Swift Combine 操作符operations和Subjects发布者的生命周期 从入门到精通五Swift Com…

【Deep Learning 3】CNN卷积神经网络

&#x1f31e;欢迎来到机器学习的世界 &#x1f308;博客主页&#xff1a;卿云阁 &#x1f48c;欢迎关注&#x1f389;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f31f;本文由卿云阁原创&#xff01; &#x1f4c6;首发时间&#xff1a;&#x1f339;2024年2月17日&…

Netty中的内置通信模式、Bootstrap和ChannelInitializer

内置通信传输模式 NIO:io.netty.channel.socket.nio 使用java.nio.channels包作为基础–基于选择器的方式Epoll:io.netty.channel.epoll由JNI驱动的epoll()和非阻塞IO.这个传输支持只有在Linux上可用的多种特性&#xff0c;如果SO_REUSEPORT&#xff0c;比NIO传输更快&#xf…

【深度学习】Pytorch 系列教程(三):PyTorch数据结构:2、张量的数学运算(1):向量运算(加减乘除、数乘、内积、外积、范数、广播机制)

文章目录 一、前言二、实验环境三、PyTorch数据结构0、分类1、Tensor&#xff08;张量&#xff09;1. 维度&#xff08;Dimensions&#xff09;2. 数据类型&#xff08;Data Types&#xff09;3. GPU加速&#xff08;GPU Acceleration&#xff09; 2、张量的数学运算1. 向量运算…

数字孪生与智慧城市:共筑未来城市的科技基石

一、引言 随着科技的飞速发展&#xff0c;数字孪生与智慧城市已成为未来城市建设的两大关键技术。数字孪生为城市提供了一个虚拟的数字镜像&#xff0c;使我们能全面、深入地了解城市的运行状态。而智慧城市则借助先进的信息通信技术&#xff0c;提升城市的智能化水平&#xf…

算法刷题:复写零

复写零 .习题链接题目描述算法原理初始值步骤1步骤2我的答案: . 习题链接 复写零 题目描述 给你一个长度固定的整数数组 arr &#xff0c;请你将该数组中出现的每个零都复写一遍&#xff0c;并将其余的元素向右平移。 注意&#xff1a;请不要在超过该数组长度的位置写入元素…

【OpenAI Sora】开启未来:视频生成模型作为终极世界模拟器的突破之旅

这份技术报告主要关注两个方面&#xff1a;&#xff08;1&#xff09;我们的方法将各种类型的视觉数据转化为统一的表示形式&#xff0c;从而实现了大规模生成模型的训练&#xff1b;&#xff08;2&#xff09;对Sora的能力和局限性进行了定性评估。报告中不包含模型和实现细节…

CCF编程能力等级认证GESP—C++6级—20231209

CCF编程能力等级认证GESP—C6级—20231209 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)闯关游戏工作沟通 答案及解析单选题判断题编程题1编程题2 单选题…

二叉树入门算法题详解

二叉树入门题目详解 首先知道二叉树是什么&#xff1a; 代码随想录 (programmercarl.com) 了解后知道其实二叉树就是特殊的链表&#xff0c;只是每个根节点节点都与两个子节点相连而其实图也是特殊的链表&#xff0c;是很多节点互相连接&#xff1b;这样说只是便于理解和定义…

安卓TextView 拖动命名

需求&#xff1a;该布局文件使用线性布局来排列三个文本视图和一个按钮&#xff0c;分别用于显示两个动物名称以及占位文本视图。在占位文本视图中&#xff0c;我们为其设置了背景和居中显示样式&#xff0c;并用其作为接收拖放操作的目标 效果图&#xff1b; 实现代码 第一布…

大数据02-数据仓库

零、文章目录 大数据02-数据仓库 1、数据仓库介绍 &#xff08;1&#xff09;基本概念 数据仓库&#xff0c;英文名称为Data Warehouse&#xff0c;可简写为DW或DWH。数据仓库的目的是构建面向分析的集成化数据环境&#xff0c;为企业提供决策支持&#xff08;Decision Sup…

牛客网SQL进阶123:高难度试卷的得分的截断平均值

官网链接&#xff1a; SQL类别高难度试卷得分的截断平均值_牛客题霸_牛客网牛客的运营同学想要查看大家在SQL类别中高难度试卷的得分情况。 请你帮她从exam_。题目来自【牛客题霸】https://www.nowcoder.com/practice/a690f76a718242fd80757115d305be45?tpId240&tqId2180…