深入探索生产者拦截器的使用以及源码设计

文章目录

  • 一、介绍
  • 二、使用
    • 1. ProducerInterceptor接口
    • 2. 实现之统计
    • 3. 实现之二次处理
    • 4. 小结
  • 三、实现原理
    • 1. 初始化流程
    • 2. 生效流程
  • 四、总结

一、介绍

在软件设计中,为了方便能够应对不同的场景,一般在一些容易有差异的环节会考虑允许用户自定义逻辑,拦截器就是其中的一种实现方式,像Spring、Kafka、Pulsar等都支持这种方式。流程简化起来就如下图,客户端跟服务端的写消息请求和接收请求都要先通过一遍拦截器,因此用户都过自定义拦截器逻辑就能以一种无侵入、规范化的方式来改动消息发送以及处理响应的行为。
在这里插入图片描述

二、使用

1. ProducerInterceptor接口

ProducerInterceptor是Pulsar提供的接口,通过实现该接口用户可以在消息发送和发送成功阶段注入自定义的逻辑来扩展Pulsar客户端的能力,进而优雅的解决某些场景的问题。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ProducerInterceptor extends AutoCloseable {

    void close();

    boolean eligible(Message message);

    Message beforeSend(Producer producer, Message message);

    void onSendAcknowledgement(
            Producer producer, Message message, MessageId msgId, Throwable exception);
  
    default void onPartitionsChange(String topicName, int partitions) {
    }
}

这里针对这五个方法大概介绍下

  • close:由于该接口实现了AutoCloseable,因此也要定义生产者关闭时要释放的资源,如果没有就空着
  • eligible:判断拦截器针对那些消息生效,默认false不生效。这个相当于Java8 Stream里的filter,属于职责分离的设计
  • beforeSend:在每条消息要发送时会调用此方法,因此如果在发送前想做点什么可以考虑在这里实现
  • onSendAcknowledgement:在每条消息消息发送服务端响应后(无论成功失败)会调用此方法
  • onPartitionsChange:在分区数有变动的时候会调用这里的逻辑。这是3.2版本新加的逻辑,2.8以及之前的版本没有此接口

2. 实现之统计

这里对生产者累计发送的消息条数进行统计,实现逻辑如下

public class SherlockCountProducerInterceptor implements ProducerInterceptor {

    private AtomicLong count = new AtomicLong(1);

    @Override
    public void close() {

    }

    @Override
    public boolean eligible(Message message) {
        return true;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        System.out.println("累计发送消息条数:"+count.getAndIncrement());
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {

    }
}

逻辑比较简单,其实就是通过一个计数器在每次发送时进行加1即可,并且在eligible中返回true也就是对所有发送的消息都生效,每条消息在发送前都会调用一次beforeSend方法进行自增操作并打印出来。

现在拦截器的逻辑已经定义好了,接下来怎么使用呢,请继续往下看

public static void customInterceptorProducer() throws Exception {
        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
                .intercept(new SherlockCountProducerInterceptor())	//拦截器生效逻辑
                .create();

        for (int i = 0; i < 200; i++) {
            producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
        }

        producer.close();
        pulsarClient.close();
    }

上述就是使用拦截器的case,通过这种方式就能轻松的定义所需要注入的逻辑。上述代码执行后输出如下
在这里插入图片描述

可以看到我们通过拦截器完成了消息发送的统计功能,可以发散设想一想,像根据不同key进行分组统计、统计某个时间段消息发送失败的条数等功能也同样可以通过拦截器实现。

3. 实现之二次处理

实现统计感觉还不得劲,再折腾一个。假设咱们的生产者中会发送很多地区的消息,这些消息有些是中国的,有些是新加坡的,有些是巴西的,这个时候它们的时间就有歧义了,因为不同时区的时间是有差异的,那咱们尝试用拦截器来实现一下

public class SherlockAdapterTimeProducerInterceptor implements ProducerInterceptor {
    @Override
    public void close() {

    }

    @Override
    public boolean eligible(Message message) {
//        if ("V3".equals(String.valueOf(message.getSchemaVersion()))) {
//            return true;
//        }
        if ("Singapore".equals(message.getKey())) {
            System.out.println("这条消息是新加坡地区的,进行处理!");
            return true;
        }
        System.out.println("这条消息是中国地区的,不进行处理!");
        return false;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        System.out.println("拦截到一条新加坡地区的消息,现在进行处理,消息内容为:"+message.getValue());
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {

    }
}

上面就是demo,继续将这个拦截器应用于生产者

 public static void customInterceptorProducer() throws Exception {
        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
                .intercept(new SherlockAdapterTimeProducerInterceptor())
                .create();

        producer.newMessage().key("China").value("下单动作").send();
        producer.newMessage().key("Singapore").value("收藏动作").send();
        producer.newMessage().key("China").value("取消动作").send();
        producer.newMessage().key("Singapore").value("订阅动作").send();

        producer.close();
        pulsarClient.close();
    }

执行可以看到下面的输出
在这里插入图片描述

通过输出的结果可以分析出来eligible的逻辑是生效的,针对新加坡地区的消息会进行处理,而中国的消息保持不变,所有地区的时间通过此拦截器来统一成东八区的时间。

4. 小结

通过上述例子可以看到我们可以通过拦截器实现任意的逻辑,但是这里需要注意的是,拦截器里面尽量不要放过多的逻辑,因为这可能会影响生产者发送消息的速度,并且也容易造成处理逻辑的分散。拦截器最好是做一些校验、适配、状态记录等一些需要前置完成并且轻量级的操作。

三、实现原理

1. 初始化流程

通过使用我们可以看到在创建生产者对象是只要通过.intercept方法传入拦截器对象即可生效,那么我们就先通过这个方法来看看实现逻辑

ProducerBuilder<T> intercept(org.apache.pulsar.client.api.interceptor.ProducerInterceptor... interceptors);

public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
  if (this.interceptorList == null) {
    this.interceptorList = new ArrayList();
  }

  this.interceptorList.addAll(Arrays.asList(interceptors));
  return this;
}

通过代码跟踪可以看到ProducerBuilderImpl方法中会先将拦截器对象集合赋值给自己的成员变量,也就是它先保存一份在后面使用。在最终调用create方法来创建Producer时,最终会走到该类的createAsync方法,核心逻辑如下

    public CompletableFuture<Producer<T>> createAsync() {
						....
            return this.interceptorList != null && this.interceptorList.size() != 0 ? this.client.createProducerAsync(this.conf, this.schema, new ProducerInterceptors(this.interceptorList)) : this.client.createProducerAsync(this.conf, this.schema, (ProducerInterceptors)null);
        }
    }

如果用户通过.intercept方法传入了自定义的拦截器,则会调用PulsarClientImpl带有拦截器对象的构造方法

 public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
   			....
        //这个方法的核心逻辑就这一行,继续往下跟踪
				return this.createProducerAsync(topic, conf, schema, interceptors);
    }


    private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic, ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
      	....
        //同理,核心逻辑就这一行
        producer = this.newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture);
    }

    protected <T> ProducerImpl<T> newProducerImpl(....) {
        return new ProducerImpl(this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors);
    }


    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema, ProducerInterceptors interceptors) {
      	//只有这里有用到,继续跟踪
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
      	....
    }

protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
        ....
        //对父类的成员变量进行赋值
        this.interceptors = interceptors;
    }

通过上面的代码跟踪我们可以知道,当我们通过拦截器创建的Producer对象,它是有在内部维护一个ProducerInterceptors对象来存储我们所指定的拦截器集合的逻辑
在这里插入图片描述

那么我们来看看ProducerInterceptors的实现

public class ProducerInterceptors implements Closeable {
		....
    private final List<ProducerInterceptor> interceptors;	//存储拦截器集合逻辑
		....
    //在消息发送前进行触发
    public Message beforeSend(Producer producer, Message message) {
        Message interceptorMessage = message;
        for (ProducerInterceptor interceptor : interceptors) {
          	//调用拦截器的eligible方法来判断是否要对当前这条消息进行拦截处理,这个就是咱们上面实现的eligible接口
            if (!interceptor.eligible(message)) {
                continue;
            }
            try {
              	//循环调用拦截器集合里的每个拦截器对这条消息进行处理
                interceptorMessage = interceptor.beforeSend(producer, interceptorMessage);
            } catch (Throwable e) {
                ....
            }
        }
        return interceptorMessage;
    }

  	//逻辑跟beforeSend基本一致
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        for (ProducerInterceptor interceptor : interceptors) {
            if (!interceptor.eligible(message)) {
                continue;
            }
            try {
                interceptor.onSendAcknowledgement(producer, message, msgId, exception);
            } catch (Throwable e) {
                log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
            }
        }
    }

    public void onPartitionsChange(String topicName, int partitions) {
        for (ProducerInterceptor interceptor : interceptors) {
            try {
                interceptor.onPartitionsChange(topicName, partitions);
            } catch (Throwable e) {
                log.warn("Error executing interceptor onPartitionsChange callback ", e);
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (ProducerInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (Throwable e) {
                log.error("Fail to close producer interceptor ", e);
            }
        }
    }
}

通过上述逻辑可以看到ProducerInterceptors本质上就是个批量管理对象,符合高内聚低耦合的设计,解耦了业务逻辑循环处理的逻辑,将这些循环处理的逻辑都封装在ProducerInterceptors类里面,然后ProducerInterceptors仅对外提供触发某几个动作的api,业务只需要在哪个阶段调用这些api即可。

2. 生效流程

在生产者消息发送阶段,最终都会走到ProducerImpl类的internalSendAsync方法,可以看到这里会调用拦截器进行处理

CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
  //核心方法,跟踪进去
	MessageImpl<?> interceptorMessage = (MessageImpl) beforeSend(message);
	....
}

protected Message<?> beforeSend(Message<?> message) {
  if (interceptors != null) {
    //如果配置了拦截器则调用ProducerInterceptors类的beforeSend方法
    return interceptors.beforeSend(this, message);
  } else {
    //如果没有配置拦截器则直接返回原消息
    return message;
  }
}

这是消息发送的处理逻辑,那如果是再消息发送结束后触发呢?一起来跟踪看下吧,首先还是从ProducerImp类的internalSendAsync方法开始看

 @Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
    sendAsync(interceptorMessage, new SendCallback() {
						....
            @Override
            public void sendComplete(Exception e) {
                try {
                    if (e != null) {
                        stats.incrementSendFailed();
                      	//从这里跟踪进去看看
                        onSendAcknowledgement(interceptorMessage, null, e);
                        future.completeExceptionally(e);
                    } else {
                        onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
                        future.complete(interceptorMessage.getMessageId());
                        stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
                    }
                } finally {
                    interceptorMessage.getDataBuffer().release();
                }
  
}
      

protected void onSendAcknowledgement(Message<?> message, MessageId msgId, Throwable exception) {
    if (interceptors != null) {
      	//可以看到最终也是调用的ProducerInterceptors类的onSendAcknowledgement方法
       	interceptors.onSendAcknowledgement(this, message, msgId, exception);
    }
}

这里的设计是异步回调的方式,将调用拦截器处理逻辑封装成参数传给下一层,在消息发送完成后再调用参数里指定的回调逻辑。那么什么时候触发呢,由于Pulsar客户端跟服务端是通过Netty的TCP通信的,因此直接看看PulsarDecoder的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		....
    switch (cmd.getType()) {
        
       	....
        case PRODUCER_SUCCESS:
        	//写入消息被Broker处理后会忘生产者客户端通过TCP发送一条PRODUCER_SUCCESS类型的消息也就是这里,跟踪进去看看处理逻辑
          checkArgument(cmd.hasProducerSuccess());
          handleProducerSuccess(cmd.getProducerSuccess());
          break;
    }
}


protected void handleProducerSuccess(CommandProducerSuccess success) {
        ....
        //生产者会在队列维护每条未被ack的写入请求消息,在Broker ack时会从这个队列中移除并获取回调处理逻辑
        CompletableFuture<ProducerResponse> requestFuture =
                (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
        if (requestFuture != null) {
            ProducerResponse pr = new ProducerResponse(success.getProducerName(),
                    success.getLastSequenceId(),
                    success.getSchemaVersion(),
                    success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty());
          	//调用回调逻辑
            requestFuture.complete(pr);
        } else {
            ....
        }
    }

四、总结

通过使用和跟踪原理,我们对Pulsar生产者拦截器有了进一步的认识,除了生产者拦截器,Pulsar还支持Broker侧以及Bookkeeper侧的拦截器,这些放到后面再跟大家一起学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/556922.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

相机1:如何系相机肩带

开始解锁新领域&#xff0c;多看几个相关视频&#xff0c;大概也就可以掌握一两种系相机肩带的方法&#xff0c;本质就是新知识的学习过程&#xff0c;不可能等着或者期待出来一个完整的教程&#xff0c;一步一步自己去探索&#xff0c;自己去查资料。 目录 总述 第一步&#…

仓库管理系统 warehouse

系统概要 仓库管理系统总共分为两个大的模块&#xff0c;分别是系统模块和业务模块。其中系统模块和业务模块底下又有其子模块。 功能模块 一、业务模块 1、客户管理 客户列表 客户分页和模糊查询 客户添加、修改、删除、批量删除 2、供应商管理 供应商列表 供应商分页和模糊查…

XiaodiSec day017 Learn Note 小迪渗透学习笔记

XiaodiSec day017 Learn Note 小迪渗透学习笔记 记录得比较凌乱&#xff0c;不尽详细 day 17 主要内容&#xff1a; php 框架 thinkPHPyiilaravel 使用 fofa 搜索 thinkphp 市面上 thinkphp5 版本较多 url 结构 域名/.php(文件名)/index(目录)/index(函数名)模块名-控…

Pyjion:一个解决Python性能瓶颈的利器!

在Python编程领域&#xff0c;性能一直是一个备受关注的话题。随着项目规模的增长和需求的提升&#xff0c;对代码性能的要求也越来越高。 为了解决这一问题&#xff0c;Python社区不断努力优化解释器和引入新的工具来提升代码执行效率。 其中&#xff0c;Pyjion模块作为一个…

Redis key(BigKey、MoreKey)的存储策略

1. MoreKey 案例 1.1 大批量往 redis 里面 插入2000w 测试数据key (1) Linux Bash 下面执行&#xff0c;插入 100w rootspray:~# for((i1;i<100*10000;i)); do echo "set k$i v$i" >> /tmp/redisTest.txt; done; 查看 rootspray:~# more /tmp/redisTest.…

SpringBoot多数据源(五)

SpringBoot多数据源-集成多个mybatis框架 1.基本框架2.使用2.1项目结构2.2 依赖导入2.3 application.yml配置2.4 创建读与写的SqlSessionFactoryBean 3.总结 1.基本框架 通过启动多个SqlSessionFactoryBean&#xff0c;每个SqlSessionFactoryBean对应一个datasource和指定位置的…

【计算机毕业设计】点餐平台网站——后附源码

&#x1f389;**欢迎来到琛哥的技术世界&#xff01;**&#x1f389; &#x1f4d8; 博主小档案&#xff1a; 琛哥&#xff0c;一名来自世界500强的资深程序猿&#xff0c;毕业于国内知名985高校。 &#x1f527; 技术专长&#xff1a; 琛哥在深度学习任务中展现出卓越的能力&a…

吴恩达2022机器学习专项课程(一) 6.1 动机第三周课后实验:Lab1使用逻辑回归进行分类

问题预览/关键词 回归和分类的区别&#xff1f;逻辑回归的作用是&#xff1f;什么是二分类问题&#xff1f;二分类问题案例如何表达二分类的结果&#xff1f;逻辑回归通常用哪种表达形式&#xff1f;什么是正样本和负样本&#xff1f;什么是阈值&#xff1f;可视化线性回归解决…

什么台灯对眼睛好?揭秘央视315推荐的护眼灯

目前很多家长都纠结这个问题&#xff0c;那就是孩子上学以后要怎么保护眼睛&#xff0c;晚上写作业用什么台灯对比较好一点&#xff1f;我建议最好选择一款合格、专业的护眼台灯&#xff0c;因为市面上大多数台灯都是没有专业光源技术&#xff0c;甚至部分廉价台灯还会使用低成…

Linux 网络基本命令

一、查看网络信息 ifconfig 二、关闭网络 ifdown ens33 (有的电脑不一定是ens33&#xff0c;具体看上图画线的地方) 三、开启网络 ifup ens33

【Canvas与艺术】绘制绿圈三红五星Premium Quality标志

【说明】 构图简约但美观。 【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>使用HTML5/Canvas绘制绿圈三红五星Prem…

JavaWeb--前端工程化

目录 1. 前端工程化 1.1. 概述 1.2. 前端工程化实现技术栈 2. ECMA6Script 2.1. es6的介绍 2.2. es6 变量 / 模版字符串 2.2.1. let 与 var 的差别 2.2.2. const 与 var 的差异 2.2.3. 模板字符串 2.3. 解构表达式 / 赋值 2.3.1. 数组解构赋值 2.3.2. 对象解构赋值 …

2024年分享酷我音乐如何下载mp3的方法

这里教大家用酷我音乐小程序的下载方法,小程序下载资源的方法有3种 1.利用专业的抓包工具(Fiddler/Charles)进行获取,然后分析数据包,最后直接用下载器下载分析出来的链接。强烈不推荐,因为大部分人并非程序员出身 2.录屏,录屏效率太慢,所以也不推荐 3. 利用专门的下载资源的…

微纤维眼镜清洁布的革命性进化

在日常生活中&#xff0c;眼镜是许多人不可或缺的日常用品&#xff0c;无论是视力矫正还是防护眼睛免受阳光的伤害。然而&#xff0c;眼镜的清洁常常是一个令人头疼的问题&#xff0c;特别是在面对指纹、灰尘和其他污垢时。传统的清洁方法往往需要化学清洁剂&#xff0c;不仅繁…

【css】select实现placeholder效果

场景&#xff1a;使用select下拉选择框的时候&#xff0c;需要像其他控件一样提示默认信息。 问题&#xff1a;表单控件select没有placeholder属性。 解决方案&#xff1a;通过css实现&#xff0c;不需要js <style>select > option[disabled]{ color:#999;cursor: n…

如何用Python构建一个生产级别的电影推荐系统 - 机器学习手册

构建项目是彻底学习概念并发展必要技能的最有效方式之一。 项目使您沉浸在现实世界的问题解决中&#xff0c;巩固您的知识&#xff0c;并培养批判性思维、适应能力和项目管理专业知识。 本指南将带您逐步构建一个根据用户喜好量身定制的电影推荐系统。我们将利用一个庞大的包…

Homebrew安装与卸载

卸载 /bin/bash -c "$(curl -fsSL https://gitee.com/ineo6/homebrew-install/raw/master/uninstall.sh)"安装 /bin/bash -c "$(curl -fsSL https://gitee.com/ineo6/homebrew-install/raw/master/install.sh)"1、复制命令到命令行执行&#xff0c;输入1…

多模态中的视觉编码器clip以及输入分辨率

在多模态的视觉编码主干中&#xff0c;若采用分类的backbone效果很差&#xff0c;经过语义对齐的backbone&#xff0c;比如clip的vit&#xff0c;效果则好很多。 1.Cogvlm中的EVA2-CLIP-E&#xff0c;VIT中最后一层被移除&#xff0c;4.4B&#xff0c;支持分辨率为334/490. 2.…

[源码分享]基于Unity的Live2D虚拟人物——结合了GPT、Azure、情绪识别和口型同步,也可以集合苹果Vision Pro做成3D的形象

# 技术文档 ## 1 项目简介 ### 项目目录 ``` Assets ├─ Animator // 动画 ├─ Code // 代码 │ ├─ AI // AI 模块 │ │ ├─ LM // 语言模型模块 │…

Python爬虫数据可视化分析

Python爬虫用于从网络上获取数据&#xff0c;数据可视化分析则是将获取的数据进行可视化展示和分析&#xff0c;帮助我们更好地理解数据、发现规律、做出决策。下面是一个基本的Python爬虫数据可视化分析的流程&#xff1a; 步骤一&#xff1a;数据爬取 1.选择合适的爬虫工具&a…