PHP小白搭建Kafka环境以及初步使用rdkafka

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、安装java(Kafka必须安装java,因为kafka依赖java核心)
  • 二、安装以及配置Kafka、zookeeper
    • 1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)
    • 2.配置topid
    • 3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的


前言

提示:windows环境安装失败,Linux环境安装成功(以下并没有windows安装示例)

一、安装java(Kafka必须安装java,因为kafka依赖java核心)

下载地址:链接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux
在这里插入图片描述
将文件放在Linux目录中后进行解压:

假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20 ./jdk
3、vim /etc/profile 
 JAVA_HOME=/root/src/uap/web/third/jdk
 PATH=/root/src/uap/web/third/jdk/bin:$PATH
 export JAVA_HOME
4、source /ect/profile
5、java -version (出现下图极为成功)

在这里插入图片描述

二、安装以及配置Kafka、zookeeper

1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)

下载地址:https://kafka.apache.org/downloads
提示:不要下载带src的那个,具体我也不知道,因为我也是个小白
在这里插入图片描述

假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12 ./kafka
3、创建kafka日志文件
 mkdir -p ./kafka_data/log/kafka
 mkdir -p ./kafka_data/log/zookeeper
 mkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.properties
 listeners=PLAINTEXT://localhost:9092 (34行左右,添加对应的host、port)
 broker.id=0
 port=9092
 host.name=192.168.1.241
 log.dirs=/root/src/uap/web/third/kafka_data/log/kafka
 zookeeper.connect=localhost:2181
wd
vim zookeeper.properties
 dataDir=/root/src/uap/web/third/kafka_data/zookeeper
 dataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeper
 clientPort=2181
 maxClientCnxns=100
 tickTimes=2000
 initLimit=10
 syncLimit=5
wd
5、cd ../ 进入kafka目录下
#启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
//如果其中报错,大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有
./bin/kafka-server-start.sh -daemon ./config/server.properties &

2.配置topid

代码如下(示例):

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值:Created topic myt.  创建成功/否则失败

3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的

例如:阿里云开发者社区,php安装rdkafka教程
剩下逻辑就直接贴代码了

生产者:
public function producer(){
        $conf = new RdKafka\Conf();
        $conf->set('metadata.broker.list', 'localhost:9092');
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic("mytest");
        //获取数据库数据,存入kafka中
        $wanchk = $this->db->query("SELECT * FROM hf_alarm_wanchk");
        foreach ($wanchk as $k => $v){
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));
            $producer->poll(0);
        }
        
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
        $producer->purge(RD_KAFKA_PURGE_F_QUEUE);
        $producer->flush(10000);
    }
消费者:
//这个代码需要使用终端运行:
// /bin/php -c /etc/php.ini  -f  /入口文件目录/index.php (类)consumer (方法)consumer
 public function consumer()
    {
        $conf = new \RdKafka\Conf();

        $conf->set('group.id', 'mytest');

        $rk = new \RdKafka\Consumer($conf);

        $rk->addBrokers("127.0.0.1");

        $topicConf = new \RdKafka\TopicConf();

        $topicConf->set('auto.commit.interval.ms', 100);

        $topicConf->set('offset.store.method', 'broker');

        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic('mytest', $topicConf);

        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $message = $topic->consume(0, 120 * 10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    } 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/90056.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

优维产品最佳实践第5期:什么是持续集成?

谈到到DevOps,持续交付流水线是绕不开的一个话题,相对于其他实践,通过流水线来实现快速高质量的交付价值是相对能快速见效的,特别对于开发测试人员,能够获得实实在在的收益。 本期EasyOps产品使用最佳实践&#xff0c…

Java的锁大全

Java的锁 各种锁的类型 乐观锁 VS 悲观锁 乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在Java和数据库中都有此概念对应的实际应用。 先说概念。对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数…

陪诊系统源码开发:实现个性化医疗陪护的创新之路

陪诊系统的源码开发在现代医疗中具有重要意义。本文将通过代码示例介绍陪诊系统的源码开发,展示如何实现个性化医疗陪护的创新方案。 1. 安装和环境设置: 首先,确保你的开发环境中已经安装了合适的编程语言和框架,比如Python和…

c++ qt--页面布局(第五部分)

c qt–页面布局(第五部分) 一.页面布局 在设计页面的左侧一栏的组件中我们可以看到进行页面布局的一些组件 布局组件的使用 1.水平布局 使用:将别的组件拖到水平布局的组件中即可,可以选择是在哪个位置 2.垂直布局 使用&…

玩转 PI 系列-看起来像服务器的 ARM 开发板矩阵-Firefly Cluster Server

前言 基于我个人的工作内容和兴趣,想要在家里搞一套服务器集群,用于容器/K8s 等方案的测试验证。 考虑过使用二手服务器,比如 Dell R730, 还搞了一套配置清单,如下: Dell R7303.5 尺寸规格硬盘CPU: 2686v4*2 内存&a…

初阶c语言:趣味扫雷游戏

目录 前言 制作菜单 构建游戏选择框架 实现游戏功能 模块化编程:查看前节三子棋的内容 初始化雷区 ​编辑 优化棋盘 随机埋入地雷 点击后的决策 实现此功能代码 game();的安排 前言 《扫雷》是一款大众类的益智小游戏&…

从0到1学会Git(第一部分):Git的下载和初始化配置

1.Git是什么: 首先我们看一下百度百科的介绍:Git(读音为/gɪt/)是一个开源的分布式版本控制系统,可以有效、高速地处理从很小到非常大的项目版本管理。 也是Linus Torvalds为了帮助管理Linux内核开发而开发的一个开放源码的版本控制软件。 …

配置Linux内核支持make menuconfig

新环境从0配置Linux内核支持make menuconfig hudahuahudahua-virtual-machine:~/workspace/tools/linux-5.15.13$ make menuconfigCommand ‘make’ not found, but can be installed with sudo apt install make # version 4.2.1-1.2, or sudo apt install make-guile # vers…

系统上线安全测评需要做哪些内容?

电力信息系统、航空航天、交通运输、银行金融、地图绘画、政府官网等系统再正式上线前需要做安全测试。避免造成数据泄露从而引起的各种严重问题。 那么系统上线前需要做哪些测试内容呢?下面由我给大家介绍 1、安全机制检测-应用安全 身份鉴别 登录控制模块 应提供…

【2023钉钉杯复赛】A题 智能手机用户监测数据分析 Python代码分析

【2023钉钉杯复赛】A题 智能手机用户监测数据分析 Python代码分析 1 题目 一、问题背景 近年来,随着智能手机的产生,发展到爆炸式的普及增长,不仅推动了中 国智能手机市场的发展和扩大,还快速的促进手机软件的开发。近年中国智能…

操作员管理 微人事 项目 SpringBooot + Vue 前后端分离

操作员管理接口设计 HrController RestController RequestMapping("/system/hr") public class HrController {AutowiredHrService hrService;GetMapping("/")public List<Hr> getAllHr(){return hrService.getAllHr();}}HrService public List<…

【深度学习_TensorFlow】过拟合

写在前面 过拟合与欠拟合 欠拟合&#xff1a; 是指在模型学习能力较弱&#xff0c;而数据复杂度较高的情况下&#xff0c;模型无法学习到数据集中的“一般规律”&#xff0c;因而导致泛化能力弱。此时&#xff0c;算法在训练集上表现一般&#xff0c;但在测试集上表现较差&…

FPGA应用于图像处理

FPGA应用于图像处理 FPGA&#xff08;Field-Programmable Gate Array&#xff09;直译过来就是现场可编程门阵列。是一种可以编程的逻辑器件&#xff0c;具有高度的灵活性&#xff0c;可以根据具体需求就像编程来实现不同的功能。 FPGA器件属于专用的集成电流中的一种半定制电…

TinyVue - 华为云 OpenTiny 出品的企业级前端 UI 组件库,免费开源,同时支持 Vue2 / Vue3,自带 TinyPro 中后台管理系统

华为最新发布的前端 UI 组件库&#xff0c;支持 PC 和移动端&#xff0c;自带了 admin 后台系统&#xff0c;完成度很高&#xff0c;web 项目开发又多一个选择。 关于 OpenTiny 和 TinyVue 在上个月结束的华为开发者大会2023上&#xff0c;官方正式进行发布了 OpenTiny&#…

使用VSCode SSH实现公网远程连接本地服务器开发的详细教程

文章目录 前言1、安装OpenSSH2、vscode配置ssh3. 局域网测试连接远程服务器4. 公网远程连接4.1 ubuntu安装cpolar内网穿透4.2 创建隧道映射4.3 测试公网远程连接 5. 配置固定TCP端口地址5.1 保留一个固定TCP端口地址5.2 配置固定TCP端口地址5.3 测试固定公网地址远程 前言 远程…

【二分】搜索旋转数组

文章目录 不重复数组找最小值&#xff0c;返回下标重复数组找最小值&#xff0c;返回下标不重复数组找target&#xff0c;返回下标重复数组找target&#xff0c;返回bool重复数组找target&#xff0c;返回下标 不重复数组找最小值&#xff0c;返回下标 class Solution {public …

Windows下 MySql通过拷贝data目录迁移数据库的方法

MySQL数据库的文件目录下图所示&#xff0c; 现举例说明通过COPY文件夹data下数据库文件&#xff0c;进行数据拷贝的步骤&#xff1b;源数据库运行在A服务器上&#xff0c;拷贝到B服务器&#xff0c;假定B服务器上MySQL数据库已经安装完成&#xff0c;为空数据库。 首先进入A服…

华为云渲染实践

// 编者按&#xff1a;云计算与网络基础设施发展为云端渲染提供了更好的发展机会&#xff0c;华为云随之长期在自研图形渲染引擎、工业领域渲染和AI加速渲染三大方向进行云渲染方面的探索与研究。本次LiveVideoStackCon 2023上海站邀请了来自华为云的陈普&#xff0c;为大家分…

百度“AI智障”到AI智能体验之旅

目录 前言一、百度PLATO1.抬杠第一名2.听Ta瞎扯淡3.TA当场去世了4.智障与网友的高光时刻 二、文心一言1.设计测试用例2.随意发问3.手机端约会神器 三、体验总结&#xff1a;四、千帆大模型 前言 最近收到了文心一言3.5大模型的内测资格&#xff0c;正巧之前也体验过它的前身&q…