flink1.16+连接Elasticsearch7官方例子报错解决方案

flink输出至es,连接es7时使用的sink类是Elasticsearch7SinkBuilder,与es6链接方式不同,官网地址:fink连接es官方地址。

flink连接es7官方示例

依赖jar:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>3.0.1-1.16</version>
</dependency>

官方连接es7代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

本地运行报错

java.lang.IllegalStateException: The elasticsearch emitter must be serializable.
错误提示很明确:emitter没有序列化,点击查看ElasticsearchEmitter的源码其实最终是有继承序列化接口的,那就猜测是使用lambda方式不能自己实现序列化,那就自己写个实现类ElasticsearchEmitter接口然后显示序列化一次。

修改后代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

//自己新增类继承ElasticsearchEmitter,实现序列化接口
    static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {

        @Override
        public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }

    }

启动后验证成功。

flink连接es7抽出完成工具类


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.io.Serializable;

@Slf4j
public class Es7SinkFactory {

    public static Sink createSink(String host, Integer port, String schema, String user, String pwd, String index) {
        try {

            ElasticsearchSink<String> elasticsearchSink = new Elasticsearch7SinkBuilder<String>()
                    .setHosts(new HttpHost(host, port, schema))
                    .setConnectionUsername(user)
                    .setConnectionPassword(pwd)
                    .setEmitter(new MyElasticsearchEmitter(index))
                    .setBulkFlushMaxSizeMb(5)//最大缓冲大小
                    .setBulkFlushInterval(5000)//刷新间隔
                    .setBulkFlushMaxActions(1000)//批量插入最大条数
                    .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 5, 100)
                    .build();

            return elasticsearchSink;
        } catch (Exception e) {
            log.error("Es7SinkFactory.createSink error e=", e);
        }

        return null;
    }

    private static IndexRequest createIndexRequest(String element, String index) {
        return Requests.indexRequest()
                .index(index)
                .source(JSON.parseObject(element));
    }


    //匿名内部类/lamda表达式未集成序列化接口,需要实现序列化接口
    static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {

        private String index;

        MyElasticsearchEmitter(String index) {
            this.index = index;
        }

        @Override
        public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element, index));
        }

    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/922143.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端常用内容

Style 1. 文本左对齐 style"text-align: left;" 2. 文本居中 style"text-align: center;" 3. 文本右对齐 style"text-align: right;"margin 属性可以设置以下四种类型的外边距&#xff1a; 1. 单一值&#xff1a;为所有四个方向&#xff08;上、…

免费微调自己的大模型(llama-factory微调llama3.1-8b)

目录 1. 名词/工具解释2. 微调过程3. 总结 本文主要介绍通过llama-factory框架&#xff0c;使用Lora微调方法&#xff0c;微调meta开源的llama3.1-8b模型&#xff0c;平台使用的是趋动云GPU算力资源。 微调已经经过预训练的大模型目的是&#xff0c;通过调整模型参数和不断优化…

pytest日志总结

pytest日志分为两类&#xff1a; 一、终端&#xff08;控制台&#xff09;打印的日志 1、指定-s&#xff0c;脚本中print打印出的信息会显示在终端&#xff1b; 2、pytest打印的summary信息&#xff0c;这部分是pytest 的默认输出&#xff08;例如测试结果PASSED, FAILED, S…

labview关于文件路径的问题

在调用文件或拆分文件的时候经常会用到拆分路径函数和创建路径函数&#xff0c;最常用的也是当前应用程序目录或者是当前VI目录。 这里我们看到应用程序目录和VI目录在同一项目中&#xff0c;应用程序目录更像是根目录&#xff0c;往下拆分成了各个VI的子目录。 接下来我们来拆…

【MySQL课程学习】:MySQL安装,MySQL如何登录和退出?MySQL的简单配置

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;MySQL课程学习 &#x1f337;追光的人&#xff0c;终会万丈光芒 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 MySQL在Centos 7环境下的安装&#xff1a; 卸载…

第二十一周机器学习笔记:动手深度学习之——数据操作、数据预处理

第二十周周报 摘要Abstract一、动手深度学习1. 数据操作1.1 数据基本操作1.2 数据运算1.2.1 广播机制 1.3 索引和切片 2. 数据预处理 二、复习RNN与LSTM1. Recurrent Neural Network&#xff08;RNN&#xff0c;循环神经网络&#xff09;1.1 词汇vector的编码方式1.2 RNN的变形…

SSM全家桶 1.Maven

或许总要彻彻底底地绝望一次 才能重新再活一次 —— 24.11.20 maven在如今的idea中已经实现自动配置&#xff0c;不需要我们手动下载 一、Maven的简介和快速入门 Maven 是一款为 Java 项目构建管理、依赖管理的工具(软件)&#xff0c;使用 Maven 可以自动化构建测试、打包和发…

《Python 股票交易分析:开启智能投资新时代》(二)

Python 进行股票交易分析的优势 简洁易读&#xff1a;Python 的语法简洁明了&#xff0c;即使是编程新手也能较快上手&#xff0c;降低了股票交易分析的门槛。 Python 的简洁易读是其在股票交易分析中受欢迎的重要原因之一。Python 的语法简洁明了&#xff0c;与其他编程语言相…

cangjie (仓颉) vscode环境搭建

sdk下载 下载中心-仓颉编程语言官网 可选择半年更新版&#xff0c;不用申请。目前版本&#xff1a;0.53.13 &#xff0c;选择不同平台压缩包下载解压到任意位置即可 补充下载&#xff0c;vscode插件解压后&#xff0c;在vscode扩展中选择从vsix安装&#xff0c;安装后新增名为…

HarmonyOS Next原创项目

学友市集 HarmonyOS毕设,项目完整,代码原创,可接毕设 项目展示 项目简介 学友集市是一款基于HarmonyOS Next开发的二手交易平台,适配HarmonyOS5.0&#xff0c;采用前后端分离架构&#xff0c;致力于为用户提供安全、便捷、高品质的二手商品交易服务。平台整合了华为云认证服…

从〇开始深度学习(0)——背景知识与环境配置

从〇开始深度学习(0)——背景知识与环境配置 文章目录 从〇开始深度学习(0)——背景知识与环境配置写在前面1.背景知识1.1.Pytorch1.2.Anaconda1.3.Pycharm1.4.CPU与GPU1.5.整体关系 2.环境配置2.1.准备工作2.1.1.判断有无英伟达显卡2.1.2.清理电脑里的旧环境 2.1.安装Anaconda…

Gate学习(6) 指令学习3

一、/particle/ 目录及其子目录下的命令 在 `/particle/` 命令目录及其子目录下,可以控制和管理粒子相关的属性和过程。以下是每个命令目录和命令的简要解释: ### `/particle/` 这是粒子控制命令的主目录,包括选择粒子、列出粒子名称、查找粒子编码、创建所有离子和同位旋等…

【Git】:Git基本操作

目录 创建、配置本地仓库 创建本地仓库 配置本地仓库 认识工作区、暂存区、版本库 修改文件 版本回退 撤销修改 删除文件 创建、配置本地仓库 创建本地仓库 我们通常可以通过以下两种方式之一获取 Git 存储库&#xff1a; 自己在本地目录创建一个本地仓库 从其它服务…

android 性能分析工具(03)Android Studio Profiler及常见性能图表解读

说明&#xff1a;主要解读Android Studio Profiler 和 常见性能图表。 Android Studio的Profiler工具是一套功能强大的性能分析工具集&#xff0c;它可以帮助开发者实时监控和分析应用的性能&#xff0c;包括CPU使用率、内存使用、网络活动和能耗等多个方面。以下是对Android …

LabVIEW配电网谐波在线监测与分析系统

统利用LabVIEW与NI数据采集卡&#xff0c;结合高精度谐波分析算法&#xff0c;实现了配电网谐波的实时监测与分析。通过虚拟仪器技术的灵活性和扩展性&#xff0c;显著提高电网运行的可靠性与电能质量&#xff0c;提供了一套有效的技术解决方案。 项目背景 随着非线性负载&am…

git使用(二)

git使用&#xff08;二&#xff09; git常用基本操作命令git clonegit loggit remotegit statusgit addgit commitgit pushgit branchgit pull git常用基本操作命令 git clone 项目开发中项目负责人会在github上创建一个远程仓库&#xff0c;我们需要使用git clone将远程仓库…

Excel求和如何过滤错误值

一、问题的提出 平时&#xff0c;我们在使用Excel时&#xff0c;最常用的功能就是求和了&#xff0c;一说到求和你可能想到用sum函数&#xff0c;但是如果sum的求和区域有#value #Div等错误值怎么办&#xff1f;如下图&#xff0c;记算C列中工资的总和。 直接用肯定会报错&…

【数据分享】2024年我国省市县三级的住宿服务设施数量(8类住宿设施/Excel/Shp格式)

宾馆酒店、旅馆招待所等住宿服务设施的配置情况是一个城市公共基础设施完善程度的重要体现&#xff0c;一个城市住宿服务设施种类越丰富&#xff0c;数量越多&#xff0c;通常能表示这个城市的公共服务水平越高&#xff01; 本次我们为大家带来的是我国各省份、各地级市、各区…

自制Windows系统(十)

上图 &#xff08;真的不是Windows破解版&#xff09; 开源地址&#xff1a;仿Windows

Ubuntu20.04下安装向日葵

向日葵远程控制app官方下载 - 贝锐向日葵官网 下载Ununtu版的图形版本的安装deb包SunloginClient_15.2.0.63064_amd64.deb 直接执行 sudo dpkg -i SunloginClient_15.2.0.63064_amd64.deb 的话会报错: 如果在Ubuntu20.04里直接执行sudo apt install libgconf-2-4安装libgco…