小编最近需要用到消息中间件,有需要要复习一下以前的东西,有需要的自取,强调一点,如果真的想了解透彻,一定要动手,脑袋会了不代表就会写了
Kafka是由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息
Kafka 的特性
高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
高并发: 支持数千个客户端同时读写
Kafka 的使用场景
活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。
消息:
Kafka中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率,消息会分批次写入Kafka,批次就代指的是一组消息。
主题:
消息的种类称为主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,
由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序
生产者:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
消费者群组:
生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,
消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。
偏移量:
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
broker:
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:
broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
安装
下载地址
librdkafka获取地址:https://github.com/edenhill/librdkafka
kafka获取地址:https://github.com/arnaud-lb/php-rdkafka
安转java环境
下载地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
下载解压完成之后,设置系统变量:path(路径为:安装目录/bin)
设置环境变量:JAVA_HOME(路径为:安装目录\bin)
查看是否安装成功:java -version
Zookeeper安装
下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
设置环境变量:path(路径为:安装目录\bin)
新建data文件夹,新建logs文件夹
config文件夹:zoo_sample.cfg 新复制一个:zoo.cfg
编辑zoo.cfg文件:
新增(配置路径【一定要配置\\,要不然不识别】:安装路径\\zookeeper\\apache-zookeeper-3.6.4-bin\\):
dataDir= 安装路径\zookeeper\apache-zookeeper-3.6.4-bin\data
dataLogDir=安装路径\zookeeper\apache-zookeeper-3.6.4-bin\log
audit.enable=truezookeeper/conf/zoo.cfg 参数详解
tickTime=2000:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,单位是毫秒
initLimit=10:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒,10秒内要启动集群并出现leader和floower。
syncLimit=5:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒,超出时间认为是死机。
dataDir:
快照日志的存储路径
dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort=12181:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
启动zkServer
启动目录:\bin\zkServer.sh
启动命令:zkServer.sh start
查找看到:binding to port .0.0.0.0/0.0.0.2181能表示成功
安装Scala
下载地址:https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi
安装:一键安装(一直next,直到完成)
配置环境变量(这个需要配置):安装目录/bin
判断是否安装完成:scala -version
Kafka安装
下载地址:https://kafka.apache.org/downloads
Kafka安装目录下新建目录logs
编辑config\server.properties文件
log.dirs=安装目录\\logs(注意双斜线,如果是cmd命令出现命令行太长,那就把Kafka安装安装在磁盘的最外面,D盘的最外层)
新增参数:listeners=PLAINTEXT://localhost:9092
启动
一定要先启动zookeeper(命令:zkServer)
然后启动kafka(命令(cmd进入Kafka安装目录):.\bin\windows\kafka-server-start.bat .\config\server.properties)
查找看到:from now on will use node localhost:9092
能表示成功(如果启动不了,删除logs文件夹下的文件)
操作:
创建topics(主题):
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看主题:
kafka-topics.bat --bootstrap-server localhost:9092 --list
生产者:
cmd进入:安装目录\bin\windows
打开生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic test
消费者:
cmd进入:安装目录\bin\windows
打开消费者:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
测试:
生产者发送消息,消费者订阅收到消息
注意:
1.一共打开四个窗口,1.zookeeper 2.kafka 3.生产者 4.消费者 (注意,这四个窗口不能关闭,要一直开着)
2.一定是生产者生产消息,消费者才会收到消息(注意,生产者和消费者的topics一定要是一样的,要不然收不到消息)
方法
getOutQlen方法
使用方法:$producer->getOutQLen();
作用:
1.用于获取生产者(Producer)内部队列中等待发送到Kafka broker的消息数量。
2.getOutQLen() 方法允许你查询这个内部队列中当前待发送的消息数量。通常用于监控和调试目的,帮助了解生产者的发送速率和队列积压情况
3.getOutQLen() 返回的是近似值,它可能在调用之间发生变化
输出数据:
int(0)
poll方法
使用方法:
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
作用:
用于从Kafka集群中拉取消息,当消费者调用poll()方法时,如果在规定的时间内没有收到任何消息,它会立即返回,并且没有任何消息被拉取到(轮询一次就相当于拉取一定时间段broker中可消费的数据)
flush方法
使用方法:$producer->flush(10000);
作用:将生产者内部缓冲区中的消息强制发送到Kafka broker的过程。
consumerstart方法
使用方法:$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
作用:在Kafka中,consumerstart方法是用于启动消费者线程并开始从Kafka集群中拉取消息的方法。
Consume方法
使用方法:$topic->consume(0, 120*10000);
作用:是指Kafka消费者从Kafka集群中读取消息的过程。首先需要从集群中先拉取数据
Purge方法
使用方法:$producer->purge(RD_KAFKA_PURGE_F_QUEUE);
作用:是指清除已完成或已过期的请求,以释放缓存资源。
initTransactions方法
作用:用于初始化一个事务。Kafka从0.11.0.0版本开始支持事务性生产者API,允许生产者将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,保证了消息的一致性
beginTransaction方法
作用:用于开始一个新的生产者事务。Kafka从0.11.0.0版本开始支持事务性生产者API,它允许你将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,这保证了消息的一致性
commitTransaction方法
作用:它用于提交一个事务。当你使用 Kafka 的事务性生产者 API 时,你可以将一系列的消息发送操作组合成一个原子性的事务。这意味着这些操作要么全部成功,要么全部失败,从而确保数据的一致性和顺序性
abortTransaction方法
作用:中止事务,当发送消息或提交事务过程中发生错误时使用
getMetadata方法
使用方法:$producer->getMetadata(false, $topic, 10*1000);
作用:
用于获取Kafka集群的元数据
获取数据包括:1.主题(topics)2.分区(partitions)3.副本(replicas)4.ISR(In-Sync Replicas)等信息。
通常,客户端库(如PHP的php-kafka)会在初始化时或需要时自动执行此操作,以便了解集群的状态和可用主题。
代码:
class Kafka extends CI_Controller {
//定义变量(分区)
private $borker_list = "";
//定义变量(配置)
private $conf = "";
//定义变量(主题)
private $topics = "";
//定义变量(分组)
private $topics_group = "";
//构造
public function __construct(){
parent::__construct();
//初始化数据
$this->borker_list = "localhost:9092";
$this->topics = "test";
$this->topics_group = "test-group";
}
//消息生产者
public function producter(){
//初始化
$conf= new RdKafka\Conf();
//设置分区
$conf->set('metadata.broker.list', $this->borker_list);
//初始化生产者
$producer = new RdKafka\Producer($conf);
//设置主题
$topic = $producer->newTopic($this->topics);
//产生信息
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
//消息刷新
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
//刷新结果
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
//消息订阅者
public function consumer(){
set_time_limit(0);
//初始化
$conf = new RdKafka\Conf();
//设置分区
$conf->set('metadata.broker.list', $this->borker_list);
$conf->set('group.id',$this->topics_group);
//初始化消费者
$rk = new RdKafka\Consumer($conf);
//主题配置
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic($this->topics, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
//没有错误打印信息
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "等待接收信息\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "超时\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
}
//获取元数据(包括主题(topics)、分区(partitions)、副本(replicas)和ISR(In-Sync Replicas)等信息)
public function gettest(){
//初始化
$conf= new RdKafka\Conf();
//设置分区
$conf->set('metadata.broker.list', $this->borker_list);
//初始化生产者
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic($this->topics);
// $result = $producer->getMetadata(false, $topic, 10*1000);
$result = $producer->getOutQLen();
var_dump($result);die;
}
//获取元数据
public function metadata(){
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$allInfo = $rk->getMetadata(true, NULL, 60e3);
$topics = $allInfo->getTopics();
//循环输出
foreach ($topics as $topic) {
$topicName = $topic->getTopic();
if ($topicName == "__consumer_offsets") {
continue ;
}
$partitions = $topic->getPartitions();
foreach ($partitions as $partition) {
$topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
echo "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
echo "offset:" . ($topPartition->getOffset()) . PHP_EOL;
}
}
}
}