RocketMQ源码 Broker-BrokerStatsManager Broker统计管理组件源码分析

前言

BrokerStatsManager 主要负责对broker端的系统指标进行统计,如QUEUE_GET_NUMS队列获取数量、QUEUE_GET_SIZE队列获取大小指标的 分钟、小时、天级别的统计数据。它针对的所有指标都是使用后台定时调度线程,对统计条目中的数据进行后台统计计算,存储在统计条目中的对应集合里,以便使用。


源码版本:4.9.3

源码架构图

核心数据结构

最核心的是维护了一个数据统计table,key为统计名称,value为统计条目Set。

// Broker统计信息管理
public class BrokerStatsManager {

    // Broker统计调度线程池
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerStatsThread"));

    // 商业化统计调度线程池
    private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "CommercialStatsThread"));

    // 核心数据结构,数据统计table,key为统计名称,value为统计条目Set
    private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
    // 集群名称
    private final String clusterName;
    // 是否开启队列统计
    private final boolean enableQueueStat;

    // 统计条目跌落大小
    private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
    // 统计条目跌落时间
    private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log);

}

统计条目集合内部结构,主要维护一个统计条目表, key为统计指标细分key, value为统计指标对象。

// 统计指标集合
public class StatsItemSet {
    // 统计指标表, key为统计指标细分key, value为统计指标对象
    private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
        new ConcurrentHashMap<String, StatsItem>(128);

    // 统计名称
    private final String statsName;
    private final ScheduledExecutorService scheduledExecutorService;
    private final InternalLogger log;
}

在深入看下统计条目内部结构,维护记录指标值的value、times和记录统计快照的csListMinute、csListHour、csListDay。记录统计快照集合是通过后台线程定时加工出来的,可以在数据行为小结的源码里找到。

// 统计项
public class StatsItem {

    // 统计值
    private final LongAdder value = new LongAdder();
    // 统计次数
    private final LongAdder times = new LongAdder();
    // 统计快照时间-单位分钟
    private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
    // 统计快照时间-单位小时
    private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
    // 统计快照时间-单位天
    private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();

    // 统计名称
    private final String statsName;
    // 统计项名称
    private final String statsKey;
    private final ScheduledExecutorService scheduledExecutorService;
    private final InternalLogger log;
}

核心行为

以下是都有维护和使用上边的内存数据结构的行为的源码 + 注释。


// Broker统计信息管理
public class BrokerStatsManager {


    public static final String QUEUE_PUT_NUMS = "QUEUE_PUT_NUMS";
    public static final String QUEUE_PUT_SIZE = "QUEUE_PUT_SIZE";
    public static final String QUEUE_GET_NUMS = "QUEUE_GET_NUMS";
    public static final String QUEUE_GET_SIZE = "QUEUE_GET_SIZE";
    public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
    public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
    public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
    public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE";
    public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS";
    public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS";
    public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS";
    public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS";
    public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE";
    public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS";
    public static final String BROKER_GET_FROM_DISK_SIZE = "BROKER_GET_FROM_DISK_SIZE";
    // For commercial
    public static final String COMMERCIAL_SEND_TIMES = "COMMERCIAL_SEND_TIMES";
    public static final String COMMERCIAL_SNDBCK_TIMES = "COMMERCIAL_SNDBCK_TIMES";
    public static final String COMMERCIAL_RCV_TIMES = "COMMERCIAL_RCV_TIMES";
    public static final String COMMERCIAL_RCV_EPOLLS = "COMMERCIAL_RCV_EPOLLS";
    public static final String COMMERCIAL_SEND_SIZE = "COMMERCIAL_SEND_SIZE";
    public static final String COMMERCIAL_RCV_SIZE = "COMMERCIAL_RCV_SIZE";
    public static final String COMMERCIAL_PERM_FAILURES = "COMMERCIAL_PERM_FAILURES";
    public static final String COMMERCIAL_OWNER = "Owner";
    // Message Size limit for one api-calling count.
    public static final double SIZE_PER_COUNT = 64 * 1024;

    public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
    public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
    // Pull Message Latency
    public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";

    /**
     * read disk follow stats
     */
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
    private static final InternalLogger COMMERCIAL_LOG = InternalLoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);

    // Broker统计调度线程池
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerStatsThread"));

    // 商业化统计调度线程池
    private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "CommercialStatsThread"));

    // 核心数据结构,数据统计table,key为统计名称,value为统计条目Set
    private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
    // 集群名称
    private final String clusterName;
    // 是否开启队列统计
    private final boolean enableQueueStat;

    // 统计条目跌落大小
    private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
    // 统计条目跌落时间
    private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log);

    public BrokerStatsManager(String clusterName, boolean enableQueueStat) {
        this.clusterName = clusterName;
        this.enableQueueStat = enableQueueStat;
        // 初始化统计条目集合
        if (enableQueueStat) {
            this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log));
            this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log));
            this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log));
            this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log));
        }
        this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
        this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(GROUP_GET_SIZE, new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, log));
        this.statsTable.put(GROUP_GET_LATENCY, new StatsItemSet(GROUP_GET_LATENCY, this.scheduledExecutorService, log));
        this.statsTable.put(SNDBCK_PUT_NUMS, new StatsItemSet(SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(BROKER_PUT_NUMS, new StatsItemSet(BROKER_PUT_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(BROKER_GET_NUMS, new StatsItemSet(BROKER_GET_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(GROUP_GET_FROM_DISK_NUMS, new StatsItemSet(GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(GROUP_GET_FROM_DISK_SIZE, new StatsItemSet(GROUP_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
        this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
        this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));

        this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_RCV_SIZE, new StatsItemSet(COMMERCIAL_RCV_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_RCV_EPOLLS, new StatsItemSet(COMMERCIAL_RCV_EPOLLS, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_SNDBCK_TIMES, new StatsItemSet(COMMERCIAL_SNDBCK_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
        this.statsTable.put(COMMERCIAL_PERM_FAILURES, new StatsItemSet(COMMERCIAL_PERM_FAILURES, this.commercialExecutor, COMMERCIAL_LOG));
    }

    public MomentStatsItemSet getMomentStatsItemSetFallSize() {
        return momentStatsItemSetFallSize;
    }

    public MomentStatsItemSet getMomentStatsItemSetFallTime() {
        return momentStatsItemSetFallTime;
    }

    public void start() {
    }

    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.commercialExecutor.shutdown();
    }

    // 获取指定统计名称下的统计key的统计条目
    public StatsItem getStatsItem(final String statsName, final String statsKey) {
        try {
            return this.statsTable.get(statsName).getStatsItem(statsKey);
        } catch (Exception e) {
        }

        return null;
    }

    // 删除指定topic的统计数据
    public void onTopicDeleted(final String topic) {
        this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
        this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
        if (enableQueueStat) {
            this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@");
            this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@");
        }
        this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
        this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
        this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
        this.statsTable.get(QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
        this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
        this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
        this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
        this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
    }

    // 根据group删除统计数据
    public void onGroupDeleted(final String group) {
        this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
        this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
        if (enableQueueStat) {
            this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
            this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
        }
        this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
        this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
        this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
        this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
    }

    // 增加队列的put统计数据1次
    public void incQueuePutNums(final String topic, final Integer queueId) {
        if (enableQueueStat) {
            this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), 1, 1);
        }
    }

    // 增加队列的put统计数据n次
    public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) {
        if (enableQueueStat) {
            this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), num, times);
        }
    }
    // 增加队列的put统计数据大小
    public void incQueuePutSize(final String topic, final Integer queueId, final int size) {
        if (enableQueueStat) {
            this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, queueId), size, 1);
        }
    }

    // 增加队列的get统计数据1次
    public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) {
        if (enableQueueStat) {
            final String statsKey = buildStatsKey(topic, queueId, group);
            this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1);
        }
    }
    // 增加队列的get统计数据大小
    public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) {
        if (enableQueueStat) {
            final String statsKey = buildStatsKey(topic, queueId, group);
            this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1);
        }
    }

    // 增加topic的put统计数据1次
    public void incTopicPutNums(final String topic) {
        this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
    }

    public void incTopicPutNums(final String topic, int num, int times) {
        this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
    }

    public void incTopicPutSize(final String topic, final int size) {
        this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
    }

    // 增加group的get统计数据1次
    public void incGroupGetNums(final String group, final String topic, final int incValue) {
        final String statsKey = buildStatsKey(topic, group);
        this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
    }

    public String buildStatsKey(String topic, String group) {
        StringBuilder strBuilder;
        if (topic != null && group != null) {
            strBuilder = new StringBuilder(topic.length() + group.length() + 1);
        } else {
            strBuilder = new StringBuilder();
        }
        strBuilder.append(topic).append("@").append(group);
        return strBuilder.toString();
    }

    public String buildStatsKey(String topic, int queueId) {
        StringBuilder strBuilder;
        if (topic != null) {
            strBuilder = new StringBuilder(topic.length() + 5);
        } else {
            strBuilder = new StringBuilder();
        }
        strBuilder.append(topic).append("@").append(queueId);
        return strBuilder.toString();
    }

    public String buildStatsKey(String topic, int queueId, String group) {
        StringBuilder strBuilder;
        if (topic != null && group != null) {
            strBuilder = new StringBuilder(topic.length() + group.length() + 6);
        } else {
            strBuilder = new StringBuilder();
        }
        strBuilder.append(topic).append("@").append(queueId).append("@").append(group);
        return strBuilder.toString();
    }

    public String buildStatsKey(int queueId, String topic, String group) {
        StringBuilder strBuilder;
        if (topic != null && group != null) {
            strBuilder = new StringBuilder(topic.length() + group.length() + 6);
        } else {
            strBuilder = new StringBuilder();
        }
        strBuilder.append(queueId).append("@").append(topic).append("@").append(group);
        return strBuilder.toString();
    }

    public void incGroupGetSize(final String group, final String topic, final int incValue) {
        final String statsKey = buildStatsKey(topic, group);
        this.statsTable.get(GROUP_GET_SIZE).addValue(statsKey, incValue, 1);
    }

    // 增加group的get延迟统计数据1次
    public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) {
        String statsKey;
        if (enableQueueStat) {
            statsKey = buildStatsKey(queueId, topic, group);
        } else {
            statsKey = buildStatsKey(topic, group);
        }
        this.statsTable.get(GROUP_GET_LATENCY).addRTValue(statsKey, incValue, 1);
    }

    public void incBrokerPutNums() {
        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
    }

    public void incBrokerPutNums(final int incValue) {
        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
    }

    public void incBrokerGetNums(final int incValue) {
        this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
    }

    public void incSendBackNums(final String group, final String topic) {
        final String statsKey = buildStatsKey(topic, group);
        this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
    }

    public double tpsGroupGetNums(final String group, final String topic) {
        final String statsKey = buildStatsKey(topic, group);
        return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
    }

    public void recordDiskFallBehindTime(final String group, final String topic, final int queueId,
        final long fallBehind) {
        final String statsKey = buildStatsKey(queueId, topic, group);
        this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
    }

    public void recordDiskFallBehindSize(final String group, final String topic, final int queueId,
        final long fallBehind) {
        final String statsKey = buildStatsKey(queueId, topic, group);
        this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
    }

    public void incCommercialValue(final String key, final String owner, final String group,
        final String topic, final String type, final int incValue) {
        final String statsKey = buildCommercialStatsKey(owner, topic, group, type);
        this.statsTable.get(key).addValue(statsKey, incValue, 1);
    }

    public String buildCommercialStatsKey(String owner, String topic, String group, String type) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(owner);
        strBuilder.append("@");
        strBuilder.append(topic);
        strBuilder.append("@");
        strBuilder.append(group);
        strBuilder.append("@");
        strBuilder.append(type);
        return strBuilder.toString();
    }

    public enum StatsType {
        SEND_SUCCESS,
        SEND_FAILURE,
        SEND_BACK,
        SEND_TIMER,
        SEND_TRANSACTION,
        RCV_SUCCESS,
        RCV_EPOLLS,
        PERM_FAILURE
    }
}

// 统计条目数据集合
public class MomentStatsItemSet {
    // 核心数据结构:Map<统计条目key名称,统计条目>
    private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
        new ConcurrentHashMap<String, MomentStatsItem>(128);

    // 统计名称
    private final String statsName;
    // 定时调度线程池
    private final ScheduledExecutorService scheduledExecutorService;
    private final InternalLogger log;

    public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
        this.statsName = statsName;
        this.scheduledExecutorService = scheduledExecutorService;
        this.log = log;
        this.init();
    }

    public ConcurrentMap<String, MomentStatsItem> getStatsItemTable() {
        return statsItemTable;
    }

    public String getStatsName() {
        return statsName;
    }

    public void init() {

        // 每隔5分钟打印一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    printAtMinutes();
                } catch (Throwable ignored) {
                }
            }
        }, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
    }

    private void printAtMinutes() {
        // 迭代遍历,打印当前统计指标所有key对应的数据
        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MomentStatsItem> next = it.next();
            next.getValue().printAtMinutes();
        }
    }

    // 对某一个统计指标对应的统计key,设置值
    public void setValue(final String statsKey, final int value) {
        MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
        statsItem.getValue().set(value);
    }

    // 删除统计指标对应的统计key
    public void delValueByInfixKey(final String statsKey, String separator) {
        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MomentStatsItem> next = it.next();
            if (next.getKey().contains(separator + statsKey + separator)) {
                it.remove();
            }
        }
    }

    // 删除统计指标对应的统计key(指定了后缀)
    public void delValueBySuffixKey(final String statsKey, String separator) {
        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MomentStatsItem> next = it.next();
            if (next.getKey().endsWith(separator + statsKey)) {
                it.remove();
            }
        }
    }

    // 获取统计指标对应的统计条目key对应的统计条目
    public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
        MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
        if (null == statsItem) {
            statsItem =
                new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
            MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);

            if (null != prev) {
                statsItem = prev;
                // statsItem.init();
            }
        }

        return statsItem;
    }
}

// 统计条目
public class MomentStatsItem {

    // 指标值
    private final AtomicLong value = new AtomicLong(0);
    // 统计指标名称
    private final String statsName;
    // 统计条目key
    private final String statsKey;
    private final ScheduledExecutorService scheduledExecutorService;
    private final InternalLogger log;

    public MomentStatsItem(String statsName, String statsKey,
        ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
        this.statsName = statsName;
        this.statsKey = statsKey;
        this.scheduledExecutorService = scheduledExecutorService;
        this.log = log;
    }

    public void init() {
        // 每5分钟打印一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    printAtMinutes();

                    MomentStatsItem.this.value.set(0);
                } catch (Throwable e) {
                }
            }
        }, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
    }

    public void printAtMinutes() {
        log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
            this.statsName,
            this.statsKey,
            this.value.get()));
    }

    public AtomicLong getValue() {
        return value;
    }

    public String getStatsKey() {
        return statsKey;
    }

    public String getStatsName() {
        return statsName;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/246306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv5改进 | 2023卷积篇 | AKConv轻量级架构下的高效检测(既轻量又提点)

一、本文介绍 本文给大家带来的改进内容是AKConv是一种创新的变核卷积&#xff0c;它旨在解决标准卷积操作中的固有缺陷&#xff08;采样形状是固定的&#xff09;&#xff0c;AKConv的核心思想在于它为卷积核提供了任意数量的参数和任意采样形状&#xff0c;能够使用任意数量…

从池化的角度看GNN(包含PR-GNN,EdgePool等7篇论文)下篇

从池化的角度看GNN&#xff08;包含PR-GNN&#xff0c;EdgePool等7篇论文&#xff09;下篇 前言一些总结一些早期论文的简要介绍5️⃣论文StructPool&#xff1a;《StructPool: Structured Graph Pooling via Conditional Random Fields》6️⃣论文ASAP&#xff1a;《ASAP: Ada…

docker 安装keepalived

docker 安装keepalived 1.Keepalived 简介 Keepalived 是 Linux 下一个轻量级别的高可用解决方案。高可用(High Avalilability,HA)&#xff0c;其实两种不同的含义&#xff1a;广义来讲&#xff0c;是指整个系统的高可用行&#xff0c;狭义的来讲就是之主机的冗余和接管&…

记录 | mac打开终端时报错:login: /opt/homebrew/bin/zsh: No such file or directory [进程已完成]

mac打开终端时报错&#xff1a;login: /opt/homebrew/bin/zsh: No such file or directory [进程已完成]&#xff0c;导致终端没有办法使用的情况 说明 zsh 没有安装或者是安装路径不对 可以看看 /bin 下有没有 zsh&#xff0c;若没有&#xff0c;肯定是有 bash 那就把终端默…

[算法基础 ~排序] Golang 实现

文章目录 排序什么是排序排序的分类1. 冒泡1.1 冒泡排序1.2. 快速排序 2. 选择2.1 简单选择排序2.2 堆排序 3. 插入3.1 直接插入3.2 折半插入3.3 希尔排序 4. 归并排序代码实现 5. 基数排序 排序图片就不贴了吧 排序 什么是排序 以下部分动图来自CSDN ::: tip 稳定性的概念 …

后端接口开发-web前台请求接口对后台数据库增删改查-实例

一、后端接口开发的逻辑是&#xff1a; 1.Application项目启动 2.前台接口Url请求后台 3.Controller控制拿到前台请求参数&#xff0c;传递给中间组件Service 4.Service调用Mapper.java 5. mapper.java映射到mapper.xml中的mybatis语句&#xff0c;类似Sql语句操作数据库 6.其…

【C语言(十二)】

数据在内存中的存储 一、整数在内存中的存储 整数的2进制表示方法有三种&#xff0c;即 原码、反码和补码 有符号的整数&#xff0c;三种表示方法均有符号位和数值位两部分&#xff0c;符号位都是用0表示“正”&#xff0c;用1表示“负”&#xff0c;最高位的⼀位是被当做符号…

利用svm进行模型训练

一、步骤 1、将文本数据转换为特征向量 &#xff1a; tf-idf 2、使用这些特征向量训练SVM模型 二、代码 from sklearn.model_selection import train_test_split from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.svm import SVC from sklearn.m…

Android : SensorManager 传感器入门 简单应用

功能介绍&#xff1a;转动手机 图片跟着旋转 界面&#xff1a; activity_main.xml <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/andr…

SpringSecurity 手机号登录

一、工作流程 1.向手机发送验证码&#xff0c;第三方短信发送平台&#xff0c;如阿里云短信。 2.手机获取验证码后&#xff0c;在表单中输入验证码。 3.使用自定义过滤器​SmsCodeValidateFilter​。 4.短信校验通过后&#xff0c;使用自定义手机认证过滤器​SmsCodeAuthentic…

ROS2 Control分析讲解

ROS2 Control 文章目录 前言简述组成安装 框架Controller ManagerResource ManagerControllersUser Interfaces Hardware ComponentsURDF中的硬件描述机器人运行框架 总结 前言 ros2_control是一个使用&#xff08;ROS 2&#xff09;进行机器人&#xff08;实时&#xff09;控…

如何用开关电源测试系统测试电源峰值电流?

一、用万用表、示波器测量峰值电流 首先将待测电路输入信号线分别连接到测试电路的输入端和地端。待测电路的电源端连接电源。然后将示波器设置为AC耦合模式&#xff0c;通道1连接待测电路输入端&#xff0c;通道2连接待测电路地端。调整数字万用表为电流测量模式。打开电源&am…

使用VeryFL【02】python环境安装

新建虚拟环境 conda create --name vfl python3.7激活新建的虚拟环境 conda activate vfl安装pytorch 安装Brownie pip install eth-brownie -i https://pypi.tuna.tsinghua.edu.cn/simple

一款计算机顶会爬取解析系统 paper info

一款计算机顶会爬取解析系统 paper info 背景项目实现的功能 技术方案架构设计项目使用的技术选型 使用方法本地项目部署使用ChatGPT等大模型创建一个ChatGPT助手使用阿里云 顶会数据量 百度网盘pfd文件json文件 Q&A github链接 &#xff1a;https://github.com/codebricki…

Nginx+Tomcat实现负载均衡和动静分离

目录 前瞻 动静分离和负载均衡原理 实现方法 实验&#xff08;七层代理&#xff09; 部署Nginx负载均衡服务器(192.168.75.50:80) 部署第一台Tomcat应用服务器&#xff08;192.168.75.60:8080&#xff09; 多实例部署第二台Tomcat应用服务器&#xff08;192.168.75.70:80…

LOF基金跟股票一样吗?

LOF基金&#xff0c;全称为"上市型开放式基金"&#xff0c;是一种可以在上海证券交易所认购、申购、赎回及交易的开放式证券投资基金。投资者可以通过上海证券交易所场内证券经营机构或场外基金销售机构进行认购、申购和赎回基金份额。 LOF基金的特点是既可以像股票…

DataGrip连接Hive以及MySQL

如果连接失败&#xff0c;是因为useSSL ,改成NO或者False;

Spring Cloud + Vue前后端分离-第5章 单表管理功能前后端开发

Spring Cloud Vue前后端分离-第5章 单表管理功能前后端开发 完成单表的增删改查 控台单表增删改查的前后端开发&#xff0c;重点学习前后端数据交互&#xff0c;vue ajax库axios的使用等 通用组件开发:分页、确认框、提示框、等待框等 常用的公共组件:确认框、提示框、等待…

时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解

时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解 目录 时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.利用蜣螂优化算法优化VMD中的参数k、a&…

【PostgreSQL】从零开始:(四)使用PgAdmin4连接数据库,及工具使用

登陆pgAdmin4 连接数据库 填写连接名称 填写连接信息 错误信息如下 解决办法 1.登陆数据库服务器切换到postgres用户 [rootpostgre-sql ~]# su - postgres 上一次登录&#xff1a;三 12月 13 18:10:00 CST 2023pts/0 上 [postgrespostgre-sql ~]$ 2.查看数据库进程 [postgre…