kafka 发送文件二进制流及使用header发送附属信息

文章目录

  • 背景
  • 案例
    • 发送方
    • 接收方

背景

需要使用kafka发送文件二进制以及附属信息

案例

发送方

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class SendFileToKafka {

    public static void main(String[] args) {

        String filePath = "com/example/kafka/file/ConsumerFileByteArrayFromKafka.java";

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.56.112:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);
        InputStream in = SendFileToKafka.class.getResourceAsStream("/com/example/kafka/file/ConsumerFileByteArrayFromKafka.java");
        try {
            byte[] buffer = new byte[in.available()];
            // 读到buffer字节数组中
            in.read(buffer);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dataTopic", buffer);
            String header = "aaa";
            record.headers().add("test_header", header.getBytes(StandardCharsets.UTF_8));
            producer.send(record);
            in.close();
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接收方

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerFileByteArrayFromKafka {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.112:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("dataTopic"));
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(100);
                for (ConsumerRecord<String, byte[]> record : records) {
                    Headers headers = record.headers();
                    Iterable<Header> testHeader = headers.headers("test_header");
                    for (Header header : testHeader) {
                        String recordHeader = new String(header.value(), "UTF-8");
                        System.out.println("recordHeader => " + recordHeader);
                    }
                    byte[] message = record.value();
                    System.out.println(new String(message));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/671143.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Part 4.1 线性动态规划

线性动态规划&#xff0c;即具有线性阶段划分的动态规划。 [USACO1.5] [IOI1994]数字三角形 Number Triangles 题目描述 观察下面的数字金字塔。 写一个程序来查找从最高点到底部任意处结束的路径&#xff0c;使路径经过数字的和最大。每一步可以走到左下方的点也可以到达右…

统计信号处理基础 习题解答10-6

题目 在例10.1中&#xff0c;把数据模型修正为&#xff1a; 其中是WGN&#xff0c;如果&#xff0c;那么方差&#xff0c;如果&#xff0c;那么方差。求PDF 。把它与经典情况PDF 进行比较&#xff0c;在经典的情况下A是确定性的&#xff0c;是WGN&#xff0c;它的方差为&#…

数据库(17)——DCL数据控制语言

DCL DCL是Data Control Language数据控制语言&#xff0c;用来管理数据库用户、控制数据库的访问权限。 DCL-管理用户 语法 1.查询用户 USE mysql; SELECT * FROM user; 也可以直接在datagrip找到user表 我们要操作用户要通过User和Host同时定位。Host表示当前用户只能在哪个…

【深度学习实战—9】:基于MediaPipe的坐姿检测

✨博客主页&#xff1a;王乐予&#x1f388; ✨年轻人要&#xff1a;Living for the moment&#xff08;活在当下&#xff09;&#xff01;&#x1f4aa; &#x1f3c6;推荐专栏&#xff1a;【图像处理】【千锤百炼Python】【深度学习】【排序算法】 目录 &#x1f63a;一、Med…

前端Vue自定义滚动卡片组件设计与实现

摘要 随着技术的日新月异&#xff0c;前端开发的复杂度不断提升。传统的整块应用开发方式在面对小的改动或功能增加时&#xff0c;常常需要修改大量的整体逻辑&#xff0c;造成开发效率低下和维护成本高昂。为了应对这一挑战&#xff0c;组件化开发应运而生。本文将以Vue框架下…

Day46 动态规划part06

完全背包问题 完全背包和01背包问题唯一不同的地方就是&#xff0c;每种物品有无限件。先遍历物品还是先遍历背包以及遍历顺序 根据递推公式可知&#xff1a;每一个dp需要根据上方和左方的数据推出&#xff0c;只要保证数据左上方数据是递推出来的这种两个for循环的顺序就是可…

使用Gradio构建大模型应用:Building Generative AI Applications with Gradio

Building Generative AI Applications with Gradio 本文是学习 https://www.deeplearning.ai/short-courses/building-generative-ai-applications-with-gradio/ 这门课的学习笔记。 What you’ll learn in this course Join our new short course, Building Generative AI A…

C++与Java数据结构设计的区别(持续更新中)

链表 c代码示例 #include <list> int main() {std::list<int> list;list.push_front(1);list.push_back(2);std::list<int>::iterator it list.begin();while (it ! list.end()){std::cout << *it << std::endl;}return 0; } java代码示例 …

C#WPF数字大屏项目实战06--报警信息

1、ItemsControl 简介 ItemsControl 是用来表示一些条目集合的控件&#xff0c;所以它叫条目控件&#xff0c;它的成员是一些其它控件的集合&#xff0c;其继承关系如下&#xff1a; 其常用的派生控件为&#xff1a;ListBox、ListView、ComboBox&#xff0c;为ItemsCo…

Spring Boot 官方不再支持 Spring Boot 的 2.x 版本!新idea如何创建java8项目

idea现在只能创建最少jdk17 使用 IDEA 内置的 Spring Initializr 创建 Spring Boot 新项目时&#xff0c;没有 Java 8 的选项了&#xff0c;只剩下了 > 17 的版本 是因为 Spring Boot 官方不再支持 Spring Boot 的 2.x 版本了&#xff0c;之后全力维护 3.x&#xff1b;而 …

Mac专用投屏工具:AirServer 7 for Mac 激活版下载

AirServer 7 是一款在 Windows 和 macOS 平台上运行的强大的屏幕镜像和屏幕录制软件。它能够将 iOS 设备、Mac 以及其他 AirPlay、Google Cast 和 Miracast 兼容设备的屏幕镜像到电脑上&#xff0c;并支持高质量的录制功能。总的来说&#xff0c;AirServer 7 是一款功能全面的屏…

配置 HTTP 代理 (HTTP proxy)

配置 HTTP 代理 [HTTP proxy] 1. Proxies2. curl2.1. Environment2.2. Proxy protocol prefixes 3. Use an HTTP proxy (使用 HTTP 代理)3.1. Using the examples (使用示例)3.1.1. Linux or macOS3.1.2. Windows Command Prompt 3.2. Authenticating to a proxy (向代理进行身…

Java开发分析工具:JProfiler 14 for Mac/win 激活版下载

JProfiler是一款功能强大的Java应用程序性能分析工具&#xff0c;适用于Java开发人员和企业用户&#xff0c;可帮助他们识别和解决Java应用程序中的性能问题&#xff0c;提高应用程序的性能和稳定性。使用JProfiler&#xff0c;开发人员可以实时查看Java应用程序的性能数据&…

如何实现对亚马逊网站产品搜索结果网页中自动上下页的翻页

需要在亚马逊网站中通过关键词搜索产品时&#xff0c;实现对网页自动进行点击上下页链接翻页&#xff0c;以便进行数据提取。经观察发现&#xff0c;包含上下页翻页链接的HTML内容及代码如下所示&#xff1a; <a href"/s?kiphonecharger&amp;crid1EE3FWFFFIWJEOF&…

mediasoup基础概览

提示&#xff1a;本文为之前mediasoup基础介绍的优化 mediasoup基础概览 架构&#xff1a;2.特性&#xff1a;优点缺点 3.mediasoup常见类介绍js部分c 4.mediasoup类图5.业务类图 Mediasoup 是一个构建在现代 Web 技术之上的实时通信&#xff08;RTC&#xff09;解决方案&#…

Mac硬件设备系统环境的升级/更新 macOS

Mac硬件设备上进行系统环境的升级/更新macOS 1.大版本(升级)判断(比如&#xff1a;我买的这台电脑设备最高支持Monterey) 点击进入对应的大版本描述说明页查看相关的兼容性描述&#xff0c;根据描述确定当前的电脑设备最高可采用哪个大版本系统(Sonoma/Ventura/Monterey/Big Su…

vue中使用WebSocket心跳机制与Linux中的心跳机制

WebSocket心跳机制 一、WebSocket简介 WebSocket是HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术&#xff0c;属于应用层协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单&#xff0c;允许服务端主动向客户端推送数据。 二、WebSocket事件与方法 …

为什么总是卡在验证真人这里无法通过验证?

最近总是在浏览某些网站的时候卡在这个“确认你是真人”的验证页面&#xff0c;无法通过真人验证&#xff0c;这是怎么回事儿&#xff1f;如何解决呢&#xff1f; 首先&#xff0c;出现这个“确认您是真人”的验证一般都是这个网站使用了 CloudFlare 的安全防护 WAF 规则才会出…

【每日刷题】Day54

【每日刷题】Day54 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 575. 分糖果 - 力扣&#xff08;LeetCode&#xff09; 2. 147. 对链表进行插入排序 - 力扣&#xf…

Ubuntu 24.04 LTS 安装Docker

1 更新软件包索引&#xff1a; sudo apt-get update 2 安装必要的软件包&#xff0c;以允许apt通过HTTPS使用仓库&#xff1a; sudo apt-get install apt-transport-https ca-certificates curl software-properties-common 3 添加Docker的官方GPG密钥&#xff1a; curl -fs…