【myXdb.stop()关闭时保存数据流程分析】xdb关服时数据落地源码

1)关闭xdb

myXdb.stop();

2)xdb.Xdb#close

 private final synchronized void close() {
        Trace.warn("xdb stop begin");
        this.openFlag = false;
        mbeans().unregisterAll();
        UniqName.stop();
        Engine.getInstance().close();
        Executor.stop();
        if (null != this.angelThread) {
            this.angelThread.shutdown();
            this.angelThread = null;
        }

        if (null != this.checkpoint) {
            this.checkpoint.shutdown();
            this.checkpoint = null;
        }

    
        // 重点: 设置关闭缓存. 这个其实是ThreadHelper的关闭
        // 也就是CheckpointCache继承于ThreadHelper
        if (null != this.checkpointcache) {
            this.checkpointcache.shutdown();
            this.checkpointcache = null;
        }

        if (null != this.tables) {
            this.tables.close();
            this.tables = null;
        }

        MySqlMgr.getInstance().exit();
        Trace.warn("xdb stop end");
    }

3)xdb.CheckpointCache#run // 既然是一个线程,那就看执行体

public void run() {
        while(super.isRunning()) {
            long now = System.currentTimeMillis();
            this.save(PROCESS_MAX_COUNT);
            long useTime = System.currentTimeMillis() - now;
            if (useTime >= (long)PROCESS_TIME) {
                super.sleepIdle(100L);
                Trace.info("checkpoint cache process time is " + useTime + "ms!");
            } else {
                super.sleepIdle((long)PROCESS_TIME - useTime);
            }
        }

        Trace.warn("final CheckPointCache begin");

        // 重点: 因此这个才是关闭的本质
        this.save(2147483647);
        Trace.warn("final CheckPointCache end");
        int totalCacheCount = this.totalCacheCount();
        if (totalCacheCount > 0) {
            Trace.error("final CheckPointCache left " + totalCacheCount);

            for(int i = 0; i < 60; ++i) {
                this.save(2147483647);
                totalCacheCount = this.totalCacheCount();
                if (totalCacheCount == 0) {
                    break;
                }

                Trace.error("final CheckPointCache i=" + i + " left=" + totalCacheCount);
                super.sleepIdle(10000L);
            }
        }

    }

4)xdb.CheckpointCache#save // 保存

 private void save(int maxCount) {
        // 拿到所有的表
        Iterator iter = this.cacheTables.values().iterator();
        
        while(iter.hasNext()) {
            // 挨个执行save
            ((CheckpointCacheInfo)iter.next()).save(maxCount);
        }

 }

5)xdb.CheckpointCacheInfo#save

public void save(int maxCount) {
        if (this.record.size() != 0) {
            int initRecordCount = this.record.size();
            HashSet<String> failedRecordKey = new HashSet();
            Iterator<Entry<String, Treble<State, OctetsStream, Long>>> iter = this.record.entrySet().iterator();
            int count = 0;
            String key;
            String var10000;
            if (initRecordCount >= CheckpointCache.PROCESS_BATCH || maxCount == 2147483647) {
                long now = System.currentTimeMillis();
                Connection conn = null;

                try {
                    conn = MySqlMgr.getInstance().getWriteConn();
                    PreparedStatement pstReplace = conn.prepareStatement("REPLACE INTO " + this.tableName + " VALUES(?, ?)");
                    pstReplace.setQueryTimeout(10);
                    PreparedStatement pstRemove = conn.prepareStatement("DELETE FROM " + this.tableName + " WHERE k=?");
                    pstRemove.setQueryTimeout(10);

                    label436:
                    while(true) {
                        while(true) {
                            Entry entry;
                            do {
                                if (!iter.hasNext()) {
                                    break label436;
                                }

                                entry = (Entry)iter.next();
                            } while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);

                            if (this.isZero((String)entry.getKey())) {
                                var10000 = (String)entry.getKey();
                                Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
                                failedRecordKey.add((String)entry.getKey());
                            } else {
                                if (((Treble)entry.getValue()).first != State.INDB_GET && ((Treble)entry.getValue()).first != State.INDB_ADD && ((Treble)entry.getValue()).first != State.ADD) {
                                    if (((Treble)entry.getValue()).first != State.INDB_REMOVE) {
                                        continue;
                                    }

                                    failedRecordKey.add((String)entry.getKey());
                                    pstRemove.setString(1, (String)entry.getKey());
                                    pstRemove.addBatch();
                                } else {
                                    failedRecordKey.add((String)entry.getKey());
                                    pstReplace.setString(1, (String)entry.getKey());
                                    if (Xdb.getInstance().isJsonFlag()) {
                                        pstReplace.setString(2, ((OctetsStream)((Treble)entry.getValue()).second).unmarshalJson());
                                    } else {
                                        pstReplace.setBinaryStream(2, new ByteArrayInputStream(((OctetsStream)((Treble)entry.getValue()).second).array()), ((OctetsStream)((Treble)entry.getValue()).second).size());
                                    }

                                    pstReplace.addBatch();
                                }

                                ++count;
                                if (count >= maxCount) {
                                    break label436;
                                }
                            }
                        }
                    }

                    if (count > 0) {
                        pstReplace.executeBatch();
                        pstRemove.executeBatch();
                        conn.commit();
                    }

                    pstReplace.close();
                    pstRemove.close();
                    Iterator var34 = failedRecordKey.iterator();

                    label410:
                    while(true) {
                        do {
                            if (!var34.hasNext()) {
                                failedRecordKey.clear();
                                long useTime = System.currentTimeMillis() - now;
                                if (useTime > (long)MySqlMgr.MS_DEBUG && Trace.isDebugEnabled()) {
                                    Trace.debug("Execute Batch Use " + useTime + " tableName is " + this.tableName);
                                }

                                if (useTime > (long)MySqlMgr.MS_INFO * 2L) {
                                    Trace.info("Execute Batch Use " + useTime + " tableName is " + this.tableName);
                                }
                                break label410;
                            }

                            key = (String)var34.next();
                        } while((Long)((Treble)this.record.get(key)).third >= now && maxCount < 2147483647);

                        this.record.remove(key);
                    }
                } catch (Exception var25) {
                    if (conn != null) {
                        try {
                            conn.rollback();
                            Trace.error("save exec Batch Rollback Success " + this.tableName);
                        } catch (Exception var24) {
                            Trace.error("save exec Batch Rollback Exception " + this.tableName, var24);
                        }
                    }

                    Trace.error("save exec Batch ReplaceSQL Exception " + this.tableName, var25);
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (SQLException var22) {
                            Trace.error("write conn close", var22);
                        }
                    }

                }
            }

            if (initRecordCount < CheckpointCache.PROCESS_BATCH || failedRecordKey.size() > 0 || maxCount == 2147483647) {
                int countInit = count;
                long now = System.currentTimeMillis();
                iter = this.record.entrySet().iterator();

                label389:
                while(true) {
                    while(true) {
                        boolean bSuccess;
                        Entry entry;
                        do {
                            if (!iter.hasNext()) {
                                break label389;
                            }

                            bSuccess = true;
                            entry = (Entry)iter.next();
                        } while((Long)((Treble)entry.getValue()).third >= now && maxCount < 2147483647);

                        if (this.isZero((String)entry.getKey())) {
                            var10000 = (String)entry.getKey();
                            Trace.error("Key is Zero! Key=" + var10000 + " tableName=" + this.tableName);
                            failedRecordKey.add((String)entry.getKey());
                        } else {
                            switch((State)((Treble)entry.getValue()).first) {
                            case INDB_REMOVE:
                                var10000 = this.tableName;
                                key = "DELETE FROM " + var10000 + " WHERE k='" + (String)entry.getKey() + "'";
                                if (MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0 && MySqlMgr.getInstance().execUpdate(key, (OctetsStream)null) < 0) {
                                    failedRecordKey.add((String)entry.getKey());
                                    bSuccess = false;
                                }
                            case REMOVE:
                            default:
                                break;
                            case INDB_GET:
                            case INDB_ADD:
                            case ADD:
                                var10000 = this.tableName;
                                String replaceSql = "REPLACE INTO " + var10000 + " VALUES('" + (String)entry.getKey() + "', ?)";
                                if (MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0 && MySqlMgr.getInstance().execUpdate(replaceSql, (OctetsStream)((Treble)entry.getValue()).second) < 0) {
                                    failedRecordKey.add((String)entry.getKey());
                                    bSuccess = false;
                                }
                            }

                            if (bSuccess && ((Long)((Treble)this.record.get(entry.getKey())).third < now || maxCount == 2147483647)) {
                                iter.remove();
                            }

                            if (bSuccess) {
                                ++count;
                                if (count - countInit >= CheckpointCache.PROCESS_SINGLE && maxCount < CheckpointCache.PROCESS_MAX_COUNT) {
                                    break label389;
                                }
                            }
                        }
                    }
                }
            }

            if (failedRecordKey.size() > 0) {
                var10000 = this.tableName;
                Trace.error(var10000 + " success:" + count + " failed:" + failedRecordKey.size() + " left:" + this.record.size());
                Iterator var28 = failedRecordKey.iterator();

                while(var28.hasNext()) {
                    String key = (String)var28.next();
                    Treble<State, OctetsStream, Long> info = (Treble)this.record.get(key);
                    if (info != null && info.second != null) {
                        try {
                            String jsonStr = ((OctetsStream)info.second).unmarshalJson();
                            if (jsonStr.length() > 1000) {
                                Trace.error(this.tableName + " key=" + key + " json size=" + jsonStr.length());
                            } else {
                                Trace.error(this.tableName + " key=" + key + " json=" + jsonStr);
                            }
                        } catch (MarshalException var23) {
                            Trace.error(this.tableName + " key=" + key, var23);
                        }
                    }

                    if (MySqlMgr.failedClearMap.get(this.tableName + key) != null) {
                        this.record.remove(key);
                        Trace.error(this.tableName + " key=" + key + " is removed!");
                    }
                }
            } else {
                Trace.info(this.tableName + " success:" + count + " left:" + this.record.size());
            }

        }
    }

理解:

其实就是找到key和value, value是json。

而且是批量处理。

字段是删除还是别的,如果是删除会执行delete操作删除数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939320.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

智慧公交指挥中枢,数据可视化 BI 驾驶舱

随着智慧城市的蓬勃发展&#xff0c;公共交通作为城市运营的核心枢纽&#xff0c;正朝着智能化和数据驱动的方向演进。通过整合 CAN 总线技术(Controller Area Network&#xff0c;控制器局域网总线)、车载智能终端、大数据分析及处理等尖端技术&#xff0c;构建的公交“大脑”…

页面无滚动条,里面div各自有滚动条

一、双滚动条左右布局 实现效果 实现代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>Doc…

美畅物联丨分布式锁实战:Spring Boot项目中的Redis应用

在分布式系统里&#xff0c;多个节点或许会同时对共享资源进行访问与操作。为防止出现数据不一致、资源竞争等状况&#xff0c;就需要一种机制来对这些并发访问加以协调&#xff0c;于是分布式锁就出现了。它如同一把全局的钥匙&#xff0c;在同一时刻仅有一个节点能够获取该钥…

[计算机网络]ARP协议的故事:小明找小红的奇妙旅程

1.ARP小故事 在一个繁忙的网络世界中&#xff0c;每个设备都有自己的身份标识——MAC地址&#xff0c;就像每个人的身份证号码一样。在这个故事里&#xff0c;我们的主角小明&#xff08;主机&#xff09;需要找到小红&#xff08;目标主机&#xff09;的MAC地址&#xff0c;才…

从RNN到Transformer:生成式AI自回归模型的全面剖析

个人主页&#xff1a;chian-ocean 文章专栏 生成式AI中的自回归模型详解 在生成式AI的飞速发展中&#xff0c;自回归模型作为核心技术之一&#xff0c;成为文本生成、语音合成、图像生成等领域的重要支柱。本文将全面探讨自回归模型的原理、架构、实际应用&#xff0c;并结合…

「Mac畅玩鸿蒙与硬件47」UI互动应用篇24 - 虚拟音乐控制台

本篇将带你实现一个虚拟音乐控制台。用户可以通过界面控制音乐的播放、暂停、切换歌曲&#xff0c;并查看当前播放的歌曲信息。页面还支持调整音量和动态显示播放进度&#xff0c;是音乐播放器界面开发的基础功能示例。 关键词 UI互动应用音乐控制播放控制动态展示状态管理按钮…

用QT制作的倒计时软件

一、pro代码 RC_ICONS countdown.ico 二、mainwindow.cpp代码 #include "mainwindow.h" #include "ui_mainwindow.h"#include <QDateTime> #include <QMessageBox> #include <QSettings>MainWindow::MainWindow(QWidget *parent): QM…

Unbuntu下怎么生成SSL自签证书?

环境&#xff1a; WSL2 Unbuntu 22.04 问题描述&#xff1a; Unbuntu下怎么生成SSL自签证书&#xff1f; 解决方案&#xff1a; 生成自签名SSL证书可以使用OpenSSL工具&#xff0c;这是一个广泛使用的命令行工具&#xff0c;用于创建和管理SSL/TLS证书。以下是生成自签名…

springboot446数字化农家乐管理平台的设计与实现(论文+源码)_kaic

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱&#xff0c;出错率高&#xff0c;信息安全性差&#x…

laya游戏引擎中打包之后图片模糊

如下图正常运行没问题&#xff0c;打包之后却模糊 纹理类型中的默认类型都是精灵纹理&#xff0c;改为默认值即可。注意&#xff1a;要点击“应用”才可有效。精灵纹理类型会对图片进行渲染处理&#xff0c;而默认值 平面类型不会处理图片。

[SZ901]FPGA程序固化工具使用方法

工具为脚本形式&#xff0c;前期需进行vivado版本&#xff0c;下载器端口配置 1&#xff0c;编辑 【SZ901程序固化工具.bat】&#xff0c;设置软件版本 修改软件版本和安装路径 2&#xff0c;设置下载器端口&#xff08;SZ901->USER_TCL->FlashBurn_Config.tcl&#x…

基于微信小程序的小区疫情防控ssm+论文源码调试讲解

第2章 程序开发技术 2.1 Mysql数据库 为了更容易理解Mysql数据库&#xff0c;接下来就对其具备的主要特征进行描述。 &#xff08;1&#xff09;首选Mysql数据库也是为了节省开发资金&#xff0c;因为网络上对Mysql的源码都已进行了公开展示&#xff0c;开发者根据程序开发需…

Arduino ADC模数转换

1.Arduino UNO ADC的配置及原理 1.1ADC配置 1.1.1分辨率 Arduino Uno支持6个adc模数转换,其ADC只有10位分辨率,也就是说我们只能将输入电平分成2^101024份(0~1023),4.88mV的测量精度. 1.1.2输入电压范围 Arduino Uno的引脚输出是5V,同样引脚输入也最多支持5V,我们可以5V电压分…

论文笔记:是什么让多模态学习变得困难?

整理了What Makes Training Multi-modal Classification Networks Hard? 论文的阅读笔记 背景方法OGR基于最小化OGR的多监督信号混合在实践中的应用 实验 背景 直观上&#xff0c;多模态网络接收更多的信息&#xff0c;因此它应该匹配或优于其单峰网络。然而&#xff0c;最好的…

唯品会Android面试题及参考答案

HTTP 和 HTTPS 的区别是什么&#xff1f;你的项目使用的是 HTTP 还是 HTTPS&#xff1f; HTTP 和 HTTPS 主要有以下区别。 首先是安全性。HTTP 是超文本传输协议&#xff0c;数据传输是明文的&#xff0c;这意味着在数据传输过程中&#xff0c;信息很容易被窃取或者篡改。比如&…

LWIP协议:三次握手和四次挥手、TCP/IP模型

一、三次握手&#xff1a;是客户端与服务器建立连接的方式&#xff1b; 1、客户端发送建立TCP连接的请求。seq序列号是由发送端随机生成的&#xff0c;SYN字段置为1表示需要建立TCP连接。&#xff08;SYN1&#xff0c;seqx&#xff0c;x为随机生成数值&#xff09;&#xff1b;…

Kafka Streams 在监控场景的应用与实践

作者&#xff1a;来自 vivo 互联网服务器团队- Pang Haiyun 介绍 Kafka Streams 的原理架构&#xff0c;常见配置以及在监控场景的应用。 一、背景 在当今大数据时代&#xff0c;实时数据处理变得越来越重要&#xff0c;而监控数据的实时性和可靠性是监控能力建设最重要的一环…

Medium是什么,Medium能干嘛,如何用开通medium会员

1.背景介绍 1.1 什么是medium medium是国外一个内容创作和分享平台。 主要用户来自美国&#xff0c;每月有26万的访问量。 网址&#xff1a; Medium官网 平台注重优质、专业的内容。 这个平台有2点比较吸引人&#xff1a; ① 内容优质、专业 ② 在上面写作&#xff0c;能…

【实验17】不同优化算法的比较分析

目录 1 不同优化算法比较分析-2D可视化实验 1.1 优化算法的实验设定(以函数为例) 1.2 学习率调整优化策略 1.1.2 AdaGrad算法 1.1.2 RMSprop算法 1.3 梯度估计修正优化策略 1.3.1 动量法 1.3.2 Adam算法 1.4 完整代码 1.5 函数 的优化算法比较 2 不同优化算法比较分…

复习打卡大数据篇——Hadoop HDFS 01

目录 1. HDFS简介 2. HDFS基本操作 3. HDFS原理 1. HDFS简介 HDFS概念&#xff1a; HDFS是一个分布式的文件系统。分布式意味着多台机器存储&#xff0c;文件系统&#xff0c;就是用来存储文件、存储数据。是大数据最底层一个服务。 HDFS设计目标&#xff1a; 故障的检测…