Mongodb 开启oplog,java监听oplog并写入关系型数据库

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:
  replSetName: local
  oplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{
	"ok" : 1,
	"operationTime" : Timestamp(1627503341, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1627503341, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}

监听Oplog日志

pom

 	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/>
    </parent>
    
		<dependency>
        	<groupId>org.springframework.boot</groupId>
       	 	<artifactId>spring-boot-starter-data-mongodb</artifactId>
   	 	</dependency>
 		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.7</version>
        </dependency>
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-spatial</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-java8</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>com.bedatadriven</groupId>
            <artifactId>jackson-datatype-jts</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;

@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private MongoTemplate mongoTemplate;
    @Resource
    private EntityManager entityManager;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");
        MongoCollection<Document> oplog = db.getCollection("oplog.rs");

        BsonTimestamp startTS = getStartTimestamp();
        BsonTimestamp endTS = getEndTimestamp();

        Bson filter = Filters.and(Filters.gt("ts", startTS));

        MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();

        while (true) {
            if (cursor.hasNext()) {
                Document doc = cursor.next();
                String operation = doc.getString("op");

                if (!"n".equals(operation)) {
                    String namespace = doc.getString("ns");
                    String[] nsParts = StringUtils.split(namespace, ".");
                    String collectionName = nsParts[1];
                    String databaseName = nsParts[0];
                    Document object = (Document) doc.get("o");
                    log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);
                    if ("i".equals(operation)) {
                        insert((Document) doc.get("o"), databaseName, collectionName);
                    } else if ("u".equals(operation)) {
                        update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);
                    } else if ("d".equals(operation)) {
                        delete((Document) doc.get("o"), databaseName, collectionName);
                    }
                }
            }
        }
    }

    private BsonTimestamp getStartTimestamp() {
        long currentSeconds = System.currentTimeMillis() / 1000;
        return new BsonTimestamp((int) currentSeconds, 1);
    }

    private BsonTimestamp getEndTimestamp() {
        return new BsonTimestamp(0, 1);
    }

    private void insert(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void update(Document object, Document update, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            String updateJson = JSON.serialize(update);
            Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");
            query.setParameter("json", json);
            query.setParameter("updateJson", updateJson);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void delete(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/209377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

人工智能|机器学习——感知器算法原理与python实现

感知器算法是一种可以直接得到线性判别函数的线性分类方法&#xff0c;它是基于样本线性可分的要求下使用的。 一、线性可分与线性不可分 为了方便讨论&#xff0c;我们蒋样本增加了以为常数&#xff0c;得到增广样向量 y&#xff08;1;;;...;&#xff09;,则n个样本的集合为&a…

[论文精读]利用大语言模型对扩散模型进行自我修正

本博客是一篇最新论文的精读&#xff0c;论文为UC伯克利大学相关研究者新近(2023.11.27)在arxiv上上传的《Self-correcting LLM-controlled Diffusion Models》 。 内容提要: 现有的基于扩散的文本到图像生成模型在生成与复杂提示精确对齐的图像时仍然存在困难,尤其是需要数值和…

ansible模块

目录 一、ansible的command模块 1.ad-hoc 2.playbook 3.command模块 二、ansible的shell模块 1.shell模块帮助 2.shell模块支持的参数和解释 3.简单试验 4.批量远程执行脚本 三、script模块 1.script模块帮助 2.shell模块支持的参数和解释 3.实践 四、ansible文件…

【pytorch】从yolo的make_grid理解torch.meshgrid、torch.stack

文章目录 简述1、torch.meshgrid 创建行列坐标2、torch.stack 结合行列坐标3、通过view函数扩展维度 简述 yolo检测 make_grid创建网格代码如下&#xff0c;那么什么是torch.meshgrid? def _make_grid(nx20, ny20):yv, xv torch.meshgrid([torch.arange(ny), torch.arange(…

C++基础 -28- 友元

友元用于访问类中的所有数据成员 类中的私有成员&#xff0c;类外不可访问 定义友元的格式&#xff08;友元函数必须要在类内&#xff0c;声明&#xff09; friend void show(person &b); 使用友元访问类的所有成员 #include "iostream"using namespace std…

深入Spring Security魔幻山谷-获取认证机制核心原理讲解(新版)

文/朱季谦 这是一个古老的传说。 在神秘的Web系统世界里&#xff0c;有一座名为Spring Security的山谷&#xff0c;它高耸入云&#xff0c;蔓延千里&#xff0c;鸟飞不过&#xff0c;兽攀不了。这座山谷只有一条逼仄的道路可通。然而&#xff0c;若要通过这条道路前往另一头的…

html实现各种好看的鼠标滑过图片特效模板

文章目录 1.鼠标悬浮效果1.1 渐动效果1.2 渐变效果1.3 边框效果1.4 线行效果1.5 图标效果1.6 块状效果1.7 边线效果1.8 放大效果1.9 渐出效果1.10 痕迹效果1.11 交叉效果1.12 着重效果1.13 详展效果1.14 能动效果1.15 明细效果 2.主要源码2.1 源代码 源码下载 作者&#xff1a;…

基于python的FMCW雷达工作原理仿真

这篇文章将介绍如何使用python来实现FMCW工作原理的仿真&#xff0c;第1章内容将介绍距离检测原理&#xff0c;第2章内容会介绍速度检测原理。 第1章 第1部分: 距离检测原理 调制的连续波雷达通常也被叫做调频连续波&#xff08;FMCW&#xff09;雷达是一个使用频率调制来测量…

鸿蒙(HarmonyOS)应用开发——容器组件(Grid组件)

前言 前面一篇文章中&#xff0c;已经说了List组件。那么接下来就是容器组件中的Grid组件 #mermaid-svg-oz1b7w45ASmMlZFa {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-oz1b7w45ASmMlZFa .error-icon{fill:#5522…

Docker-compose的在线与离线安装方式及问题解决

文章目录 一、在线方式1、GitHub2、daocloud.io 二、离线方式&#xff08;推荐&#xff09;三、验证 一、在线方式 1、GitHub curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/d…

springboot缓存技术-Ehcache-Redis-memcached

springboot缓存技术-Ehcache-Redis-memcached 文章目录 springboot缓存技术-Ehcache-Redis-memcachedspring缓存使用方式手机验证码案例缓存供应商变更Ehcache变更缓存供应商Redis缓存供应商变更memcached下载安装memcachedSpringBoot整合memcached spring缓存使用方式 导缓存…

linux后端基础---笔记整理(tmux、vim、shell、ssh/scp、git、thrift、docker)

目录 1.Linux常用文件管理命令 2.tmux终端复用器/vim命令式文本编辑器 3.Shell语法 3.1 Shell—版本3.2 新建一个test.sh文件3.3 Shell文件—运行方式3.4 Shell—注释3.5 Shell—变量3.6 Shell—默认变量&#xff0c;文件参数, “$”的用法3.7 Shell—数组3.8 shell—expr命令…

java餐饮刀削面快餐店点餐服务系统springboot+jsp

网上点餐省去了客户很多不必要的时间和麻烦&#xff0c;给商家带来更多利益。同时&#xff0c;网上点餐可以辅助餐饮企业营销。传统的点餐是需要配备一个专业的服务员负责菜品介绍并记录顾客点单&#xff0c;确认后上交至后台厨房&#xff0c;厨房根据菜品种类安排做菜顺序最终…

数据结构:带头双向循环链表的实现

引言 单链表存在缺陷&#xff1a;需要从头开始找前一个节点 解决方法&#xff1a;双向链表 链表的结构&#xff08;8种&#xff09;&#xff1a; 1. 单向&#xff0c;双向 2. 带头、不带头 带头即为带哨兵位的头节点&#xff0c;第一个节点不存储有效数据。带头节点&#…

垃圾回收与内存泄漏

前端面试大全JavaScript垃圾回收与内存泄漏 &#x1f31f;经典真题 &#x1f31f;什么是内存泄露 &#x1f31f;JavaScript 中的垃圾回收 &#x1f31f;标记清除 &#x1f31f;引用计数 &#x1f31f;真题解答 &#x1f31f;总结 &#x1f31f;经典真题 请介绍一下 Jav…

混合使用Windows和Linux子系统的工具和命令

文章目录 在Windows中运行Linux命令使用PowerShell混合使用Linux和Windows命令通过power shell在Windows混合使用Linux工具在Linux中混合使用Windows 工具 推荐阅读 Windows和Linux的工具和命令可以通过WSL互换使用。 可以在Linux子系统中运行Windows命令&#xff0c;也可以在W…

redis主从复制模式和哨兵机制

目录 第一章、主从复制模式1.1&#xff09;Redis 主从复制模式介绍1.2&#xff09;Redis 主从复制实现、 第二章、哨兵机制2.1&#xff09;容灾处理之哨兵2.2&#xff09;Sentinel 配置 第一章、主从复制模式 1.1&#xff09;Redis 主从复制模式介绍 ①单点故障&#xff1a;数…

【C++】string类模拟实现过程中值得注意的点

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.有关const的使用 &#x…

常用sql记录

备份一张表 PostgreSQL CREATE TABLE new_table AS SELECT * FROM old_table;-- 下面这个比上面好&#xff0c;这个复制表结构时&#xff0c;会把默认值、约束、注释都复制 CREATE TABLE new_table (LIKE old_table INCLUDING ALL) WITHOUT OIDS; INSERT INTO new_table SELE…

一进三出宿舍限电模块的改造升级

一进三出宿舍限电模块改造升级石家庄光大远通电气有限公司智能模块功能特点&#xff1a; 电能控制功能&#xff1a;可实施剩余电量管理&#xff0c;电量用完时将自动断电&#xff1b; 剩余电量可视报警提示功能&#xff1a;剩余电量可视&#xff0c;并当电量剩余5度时&#xff…