前置
MongoDB 分片集群要使用事务时,事务涉及的分片(副本集)中不能包含仲裁节点(arbiter),否则跨分片写入的事务会报错并中止
代码
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
如果是单个mongos
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.stereotype.Component;
/**
 * Spring component that registers the MongoDB transaction manager bean.
 *
 * <p>NOTE(review): {@code @Bean} methods declared on a {@code @Component}
 * (rather than a {@code @Configuration}) class are handled in Spring's
 * "lite" mode — confirm that is the intended setup.
 *
 * @author kittlen
 * @date 2024-04-09 17:20
 */
@Component
public class MongodbConfig {

    /**
     * Builds the {@link MongoTransactionManager} from the injected factory.
     *
     * @param factory the MongoDB factory the transaction manager is created from
     * @return a new transaction manager wrapping {@code factory}
     */
    @Bean
    public MongoTransactionManager transactionManager(MongoDbFactory factory) {
        MongoTransactionManager manager = new MongoTransactionManager(factory);
        return manager;
    }
}
使用
// Template used by dbFunc() for the update and insert calls.
@Autowired
private MongoTemplate mongoTemplate;
// Transaction manager handed to the TransactionTemplate in dbFunc();
// MongodbConfig above declares a bean of this type.
@Autowired
private MongoTransactionManager mongoTransactionManager;
/**
 * Runs an update on {@code collection1} and, when that update affected a
 * document, an insert into {@code collection2}, both inside one transaction.
 *
 * @return {@code 1} when the callback finished without an exception,
 *         {@code 0} when an exception was caught and the transaction was
 *         marked rollback-only
 */
public int dbFunc() {
    TransactionTemplate template = new TransactionTemplate(mongoTransactionManager);
    return template.execute(status -> {
        try {
            UpdateResult result = mongoTemplate.updateFirst(query, update, collection1);

            // An upserted id counts as one affected document; otherwise use the modified count.
            long affected;
            if (result.getUpsertedId() != null) {
                affected = 1;
            } else {
                affected = result.getModifiedCount();
            }

            if (affected > 0) {
                mongoTemplate.insert(saveEntity, collection2);
            }
            return 1;
        } catch (Exception e) {
            // On failure the transaction is rolled back here: either
            // status.setRollbackOnly() or rethrowing the exception triggers the rollback.
            status.setRollbackOnly();
            return 0;
        }
    });
}
如果连接的是多个 mongos,则需要重写驱动中的 BaseCluster 类
连接多个 mongos 时,驱动每次请求都会随机选取一个 mongos;如果同一事务内的请求被发往不同的 mongos,事务就会失败。因此这里记录事务内第一次选中的 client,使该事务后续的请求都通过同一个 client 发出
事务记录类
import com.mongodb.connection.Server;
import java.util.function.Supplier;
/**
 * Per-thread bookkeeping that lets a transaction running against several
 * mongos instances keep using the server that was selected first.
 *
 * <p>Intended call sequence: {@link #openMultiServerTransaction()} before the
 * transaction starts, {@link #clean()} in a {@code finally} block afterwards.
 * All state lives in {@link ThreadLocal}s, so it is only visible to the thread
 * that set it.
 *
 * @author kittlen
 * @date 2024-04-29 12:08
 */
public final class MultiServiceTransactionConfig {

    /**
     * Server remembered for the current thread's multi-server transaction.
     */
    private static final ThreadLocal<Server> PINNED_SERVER = new ThreadLocal<>();

    /**
     * Whether the current thread has switched multi-server transaction tracking on.
     */
    private static final ThreadLocal<Boolean> TRANSACTION_ENABLED = new ThreadLocal<>();

    private MultiServiceTransactionConfig() {
        // Static holder only; there is nothing to instantiate.
    }

    /**
     * Returns the server remembered for the current thread, asking the
     * supplier for one (and remembering its answer) when none is stored yet.
     *
     * @param supplier produces a server when the current thread has none stored
     * @return the remembered server, or whatever the supplier returned
     *         (possibly {@code null}, in which case the next call asks the supplier again)
     */
    public static Server getService(Supplier<Server> supplier) {
        Server pinned = PINNED_SERVER.get();
        if (pinned != null) {
            return pinned;
        }
        Server selected = supplier.get();
        PINNED_SERVER.set(selected);
        return selected;
    }

    /**
     * Switches multi-server transaction tracking on for the current thread.
     */
    public static void openMultiServerTransaction() {
        TRANSACTION_ENABLED.set(true);
    }

    /**
     * Tells whether multi-server transaction tracking is on for the current thread.
     *
     * @return {@code true} only after {@link #openMultiServerTransaction()} was
     *         called on this thread and {@link #clean()} has not been called since
     */
    public static boolean canOpenMultiServerTransaction() {
        return Boolean.TRUE.equals(TRANSACTION_ENABLED.get());
    }

    /**
     * Clears both the tracking flag and the remembered server for the current thread.
     */
    public static void clean() {
        TRANSACTION_ENABLED.remove();
        PINNED_SERVER.remove();
    }
}
重写mongodb的类com.mongodb.internal.connection.BaseCluster的selectServer方法
/**
 * Selects a server matching {@code serverSelector}, waiting for cluster
 * updates until one is found or the maximum wait time has elapsed.
 *
 * <p>Every selection attempt, including the retries after waiting, goes
 * through {@link #selectServerForCurrentThread}, so that a server found on a
 * retry is remembered for the current thread's multi-server transaction just
 * like one found on the first attempt.
 *
 * @param serverSelector selector describing the acceptable servers
 * @return the selected server, never {@code null}
 * @throws MongoInterruptedException if the thread is interrupted while waiting
 */
@Override
public Server selectServer(final ServerSelector serverSelector) {
    isTrue("open", !isClosed());

    try {
        CountDownLatch currentPhase = phase.get();
        ClusterDescription curDescription = description;
        ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
        Server server = selectServerForCurrentThread(compositeServerSelector, curDescription);

        boolean selectionFailureLogged = false;

        long startTimeNanos = System.nanoTime();
        long curTimeNanos = startTimeNanos;
        long maxWaitTimeNanos = getMaxWaitTimeNanos();

        while (true) {
            throwIfIncompatible(curDescription);

            if (server != null) {
                return server;
            }

            if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
                throw createTimeoutException(serverSelector, curDescription);
            }

            // Log the failed selection only once per call.
            if (!selectionFailureLogged) {
                logServerSelectionFailure(serverSelector, curDescription);
                selectionFailureLogged = true;
            }

            connect();

            // Wait for the next cluster phase, bounded by the remaining wait time.
            currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS);

            curTimeNanos = System.nanoTime();

            currentPhase = phase.get();
            curDescription = description;
            server = selectServerForCurrentThread(compositeServerSelector, curDescription);
        }
    } catch (InterruptedException e) {
        throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e);
    }
}

/**
 * Picks a server for the current request. On a {@code MultiServerCluster}
 * with multi-server transaction tracking enabled for this thread, the server
 * remembered by {@code MultiServiceTransactionConfig} is reused (or a newly
 * selected one is remembered); in every other case a server is selected
 * directly from the given description.
 *
 * @param selector           composite selector to apply
 * @param clusterDescription cluster snapshot to select from
 * @return the chosen server, or {@code null} when none is currently available
 */
private Server selectServerForCurrentThread(final ServerSelector selector, final ClusterDescription clusterDescription) {
    if (this instanceof MultiServerCluster && MultiServiceTransactionConfig.canOpenMultiServerTransaction()) {
        return MultiServiceTransactionConfig.getService(() -> selectRandomServer(selector, clusterDescription));
    }
    return selectRandomServer(selector, clusterDescription);
}
重点为:
// Key change inside selectServer: decide how the server for this request is chosen.
Server server;
if (this instanceof MultiServerCluster) {
// With multi-server transaction tracking enabled for the current thread, go through
// MultiServiceTransactionConfig.getService, which returns the server already remembered
// for this thread or remembers the one produced by the supplier; otherwise call
// selectRandomServer directly.
// NOTE(review): the supplier lambda reads `description`, while the other calls use
// `curDescription` — confirm this difference is intentional.
server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);
} else {
// Any other cluster type keeps the direct selectRandomServer call.
server = selectRandomServer(compositeServerSelector, curDescription);
}
使用
// Multi-mongos usage: enable per-thread server tracking before the transaction
// runs and always clear it afterwards, so the state does not outlive this call.
try {
    TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager);
    // From here on, selectServer reuses the first server selected on this thread.
    MultiServiceTransactionConfig.openMultiServerTransaction();
    return transactionTemplate.execute(status -> {
        try {
            UpdateResult updateResult = mongoTemplate.updateFirst(query, update, collection1);
            // An upserted id counts as one affected document; otherwise use the modified count.
            long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1;
            if (l > 0) {
                mongoTemplate.insert(historyDetailsEntity, collection2);
            }
            return 1;
        } catch (Exception e) {
            // On failure the transaction is rolled back here: either
            // status.setRollbackOnly() or rethrowing the exception triggers the rollback.
            status.setRollbackOnly();
            return 0;
        }
    });
} finally {
    // Remove the tracking flag and the remembered server for this thread.
    MultiServiceTransactionConfig.clean();
}