Kafka篇:Kafka搭建、使用、及Flink整合Kafka文档

一、Kafka搭建

1、上传并解压改名

tar -xvf kafka_2.11-1.0.0.tgz

mv kafka_2.11-1.0.0 kafka-1.0.0

2、配置环境变量

      vim /etc/profile

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin  

     source /etc/profile (使环境变量生效)

3、修改配置文件

      vim config/server.properties

#将从节点的broker.id修改为1,2

broker.id=0       

#指定数据存放的位置,包含了Kafka集群中所有topic的分区数据 
log.dirs=/usr/local/soft/kafka-1.0.0/data         

#指定Kafka如何连接到其依赖的ZooKeeper集群
zookeeper.connect=master:2181,node1:2181,node2:2181/kafka       

4、将kafka文件同步到node1,node2

4.1  同步kafka文件

scp -r kafka-1.0.0/ node1:`pwd`
scp -r kafka-1.0.0/ node2:`pwd`

4.2  因为kafka不是主从架构的分布式组件,而是去中心化的架构,没有主从节点之分,所以也要将master中的而环境变量同步到node1和node2中

scp /etc/profile node1:/etc/
scp /etc/profile node2:/etc/

4.3  在ndoe1和node2中执行source

source /etc/profile

4.4  修改node1和node2中的broker.id为1,2

# node1
broker.id=1
# node2
broker.id=2

5、启动kafka

前提:已经安装了zookeeper,没安装可以参考以前的写的一个zookeeper的搭建文档

链接:http://t.csdnimg.cn/qpuKV

启动步骤:

5.1 因为kafka使用zookeeper保存元数据需要,所以需要先在三个节点上启动zookeeper(也是去中心化架构)

zkServer.sh start

5.2 查看zookeeper启动的状态

zkServer.sh status       #一个节点Mode:leader,剩余节点Mode: follower说明启动成功

5.3  启动kafka,每个节点中都要启动(去中心化的架构)

# -daemon后台启动

kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

5.4  测试是否成功

#生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic 

二、Kafka的使用

1、创建topic

注意:在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

--replication-factor     #每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量

--partition          #分区数, 根据数据量设置

--zookeeper      #zk的地址,将topic的元数据保存在zookeeper中 ​

kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata

删除topic
kafka-topics.sh --delete --topic  bigdata--zookeeper master:2181,node1:2181,node2:2181/kafka

 

2、查看topic描述信息

kafka-topics.sh --describe  --zookeeper master:2181,node1:2181,node2:2181/kafka --topic kfkuse

3、获取所有topic

kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka

__consumer_offsetsL是kafka自带的用于保存消费偏移量的topic

 

4、创建控制台生产者

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic kfkuse

5、创建控制台消费者

--from-beginning   从头消费,, 如果不在执行消费的新的数据

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

6、kafka数据保存的方式及相关事项

# 1、保存的文件
/usr/local/soft/kafka_2.11-1.0.0/data

# 2、每一个分区每一个副本对应一个目录

# 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
00000000000000000000.log
00000000000000000001.log
00000000000000000002.log

# 4、滚动生成文件的策略
log.segment.bytes=1073741824           #达到1GB左右滚动生成一个文件
log.retention.check.interval.ms=300000      #每隔5分钟检查一次文件数据是否被清理

# 5、文件删除的策略,默认为7天,以文件为单位删除
log.retention.hours=168

三、Flink整合kafka

1、IDEA中整合

  1.1 根据自己的flink版本添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.2</version>
</dependency>

  1.2 案例

    案例1:编写flink代码,使用flink从kafka中读数据

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 使用flink从kafka中读数据
 */
public class Demo1KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         *内置的位点初始化器包括:
         *     // 从消费组提交的位点开始消费,不指定位点重置策略
         *     .setStartingOffsets(OffsetsInitializer.committedOffsets())
         *     // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
         *     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
         *     // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
         *     .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
         *     // 从最早位点开始消费
         *     .setStartingOffsets(OffsetsInitializer.earliest())
         *     // 从最末尾位点开始消费
         *     .setStartingOffsets(OffsetsInitializer.latest());
         */
        //创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表
                .setTopics("hash_students")     //指定消费的topic
                .setStartingOffsets(OffsetsInitializer.earliest())     //从最早点开始消费
                .setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8
                .build();

        //使用kafka source
        DataStreamSource<String> studentsDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        studentsDS.print();

        env.execute();

    }
}

 

案例2:使用flink从kafka中读json数据,解析json格式的数据

使用阿里的fastjson工具,要先添加fastjson依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3Json {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表
                .setTopics("cars")     //指定消费的topic
                .setGroupId("flink_group1")     //指定消费者组
                .setStartingOffsets(OffsetsInitializer.earliest())     //从最早尾位点开始消费
                .setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8
                .build();

        //使用kafka source
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //解析json格式的数据
        DataStream<car> cars = kafkaSource.map(line -> JSON.parseObject(line, car.class));

        cars.print();
        env.execute();
    }
}


@Data
@AllArgsConstructor
@NoArgsConstructor
class car{
    private String car;
    private String city_code;
    private String county_code;
    private String card;
    private String camera_id;
    private String orientation;
    private Long road_id;
    private Long time;
    private Double speed;
}

 

案例3:将本地文件的数据生产(写入)到kafka中

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FIleToKafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> carsDS = env.readTextFile("flink/data/cars_sample.json");

        //创建kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("cars")//指定topic
                        .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
                        .build()
                )
                //指定数据处理的语义
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        //使用kafka sink
        carsDS.sinkTo(sink);

        env.execute();
    }
}

 

2、集群中整合

将flink-sql-connector-kafka-1.15.2.jar包上传到Flink的lib目录下就行

注意:一些外部的依赖,比如说上述案例中使用的fastjson依赖也要上传,否则会报错或者会运行不成功。

至于在什么集群中运行,以及集群中运行的详细步骤,我在Flink系列三中以及详细写过,在此不再赘述了。

文章链接:http://t.csdnimg.cn/W1yZL

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/667208.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

05.k8s弹性伸缩

5.k8s弹性伸缩 k8s弹性伸缩,需要附加插件heapster监控 弹性伸缩&#xff1a;随着业务访问量的大小&#xff0c;k8s系统中的pod比较弹性&#xff0c;会自动增加或者减少pod数量&#xff1b; 5.1 安装heapster监控 1:上传并导入镜像,打标签 ls *.tar.gz for n in ls *.tar.gz…

Linux下的Git应用

1、卸载 2、安装 3、创建并初始化 4、配置 &#xff08;附加删除语句&#xff09; 5、查看&#xff08;tree .git/&#xff09; 6、增加和提交 7、打印日志 8、验证已操作工作

7、css3实现边框不停地跑动效果

效果例图&#xff1a; 1、上html代码&#xff1a; <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><meta …

unity打包的WebGL部署到IIS问题

部署之后会出错&#xff0c;我遇到的有以下几种&#xff1b; 进度条卡住不动 明明已经部署到了IIS上&#xff0c;为什么浏览网页的时候还是过不去或者直接报错。 进度条卡住不动的问题其实就是wasm和data的错误。 此时在浏览器上按F12进入开发者模式查看错误&#xff08;下图…

USB主机模式——Android

理论 摘自&#xff1a;USB 主机和配件概览 | Connectivity | Android Developers (google.cn) Android 通过 USB 配件和 USB 主机两种模式支持各种 USB 外围设备和 Android USB 配件&#xff08;实现 Android 配件协议的硬件&#xff09;。 在 USB 主机模式下&#xff0…

香橙派 AIpro上手体验并验证车道线识别算法

香橙派 AIpro上手体验并验证车道线识别算法 1.前言 最近入手了一块香橙派AIpro&#xff0c;体验了一下&#xff0c;感觉还不错&#xff0c;在这里分享给大家&#xff0c;大家可以做个参考。 2.开箱 整套产品包含一块主板、一个电源插头和一条双端Type-C的数据线&#xff0c;…

jenkins本地打包远程部署项目

1、 Jenkins简介 Jenkins是一款开源的持续集成工具&#xff0c;用于自动化构建、测试和部署软件项目。它可以通过插件扩展来支持各种不同的开发语言和工具。Jenkins提供了一个简单易用的Web界面&#xff0c;可以通过界面配置和管理构建任务&#xff0c;也可以通过命令行工具进…

实物资产的市场主线将逐步回归

民生证券认为&#xff0c;投资者逐渐意识到长期趋势并没有发生变化&#xff0c;这或许正是本周最大的变化。在预期博弈重回冷静期后&#xff0c;去金融化背景下实物资源占优的市场主线也将逐步回归。 1 高低切换后的冷静期 从4月下旬至上周&#xff0c;A股市场呈现出由高位资产…

【iOS】UI学习(二)

UI学习&#xff08;二&#xff09; 进度条和滑动条步进器与分栏控件警告对话框和提示等待器UITextFieldUITextField控件UITextFieldDelegate协议 UIScrollView布局子视图手动布局子视图自动布局子视图 进度条和滑动条 下面通过一个程序来讲解该内容&#xff1a; #import <…

c++函数基础总结

在给出的代码片段中&#xff0c;我们看到两部分内容&#xff1a;一个类定义和一个全局函数声明。让我们逐一分析它们&#xff1a; 类定义&#xff1a; cpp复制代码 class { public: void a(); }; 这个类定义是不完整的&#xff0c;因为它没有类名。但为了说明&#xff0c;我…

变量的作用域

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 变量的作用域是指程序代码能够访问该变量的区域&#xff0c;如果超出该区域&#xff0c;再访问时就会出现错误。在程序中&#xff0c;一般会根据变量…

HarmonyOS鸿蒙学习笔记(28)@entry和@Component的生命周期

entry和Component的生命周期 entry和Component的关系Component生命周期Entry生命周期 生命周期流程图生命周期展示示例代码参考资料 HarmonyOS的生命周期可以分为Compnent的生命周期和Entry的生命周期&#xff0c;也就是自定义组件的生命周期和页面的生命周期。 entry和Compone…

MySQL 索引使用(二)

本篇继续介绍有关索引的使用。 目录 一、SQL提示 二、单列索引和联合索引 三、覆盖索引 四、前缀索引 五、索引的使用原则 一、SQL提示 我们在使用索引来进行查询时&#xff0c;很有可能会出现一个字段中包含多个索引的情况&#xff0c;例如这里有一个name字段&#xff0c…

曾巩,散文的艺术与哲思

曾巩&#xff0c;字子固&#xff0c;世称南丰先生&#xff0c;南丰&#xff08;今江西&#xff09;人&#xff0c;生于北宋真宗天禧三年&#xff08;公元1019年&#xff09;&#xff0c;卒于北宋元丰六年&#xff08;公元1083年&#xff09;&#xff0c;享年64岁。他是中国北宋…

Unity开发——编辑器打包、3种方式加载AssetBundle资源

一、创建ab资源 &#xff08;一&#xff09;Unity资源设置ab格式 1、选中要打包成assetbundle的资源&#xff1b; 可以是图片&#xff0c;材质球&#xff0c;预制体等&#xff0c;这里方便展示用预制体打包设置展示&#xff1b; 2、AssetBundle面板说明 &#xff08;1&…

【React篇】组件错误边界处理(组件错误引起的页面白屏)

我们知道在生产环境react错误会导致整个页面崩溃&#xff0c;显示为空白页面。 比如下图的错误&#xff0c;导致了左侧页面直接白屏&#xff1a; 由于某一个组件报错导致整个页面崩溃是很严重的问题&#xff0c;那么我们应该如何去降低代码报错带来的影响呢&#xff1f; 我们…

JavaScript 动态网页实例 —— 窗口控制

除了打开和关闭窗口之外,还有很多其他控制窗口的方法。例如,可以使用 window.focus()方法使窗口获得焦点,也可以利用与其相对的window.blur 方法使窗口失去焦点。本节介绍移动窗口、改变窗口大小、窗口滚动、窗口超时操作、常用窗口事件、常用窗口扩展等窗口控制的方法和手段。…

mac电脑鼠标键盘共享软件:ShareMouse for Mac 激活版

ShareMouse 是一款跨平台的键盘和鼠标共享软件&#xff0c;它允许用户在多台计算机之间共享同一组键盘和鼠标&#xff0c;实现无缝的操作和控制。该软件适用于 Windows 和 macOS 系统&#xff0c;并且支持多种连接方式&#xff0c;包括局域网连接和无线连接。 使用 ShareMouse&…

从openstack环境中将服务器镜像导出的简单办法

1 登录openstack的页面&#xff0c;找到计划导出的主机信息。 通过实例名称&#xff0c; IP地址&#xff0c;找到对应的记录。点击实例名称&#xff0c;进入详情页。 在这里主要可以知道&#xff0c;当前主机在服务器上的文件ID&#xff0c;可以按这个ID去找对应的目录。 还可…

java读取文件内容(正则表达式匹配)

已知文件score.txt内容如下&#xff1a; 语文85分&#xff0c;数学89分&#xff0c;英语75分&#xff0c;马列95分。 要求解析出其中的成绩数据&#xff0c;并计算总成绩 import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import jav…