文章目录
- 分支事务注册-客户端
- 分支事务服务端的执行
分支事务注册-客户端
在第一篇中,我们已经完成了全局事务启动(开启)流程的源码分析,现在来看分支事务的注册。
分支事务的起点是PreparedStatementProxy#executeUpdate,我们从这里开始分析。
/**
 * Proxy for a JDBC {@link PreparedStatement} whose execute methods are all delegated to
 * {@code ExecuteTemplate.execute}, passing a callback that invokes the matching method on the
 * statement handed to it.
 */
public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {

    /**
     * Creates the proxy by forwarding all arguments to the superclass constructor.
     *
     * @param connectionProxy the connection proxy this statement belongs to
     * @param targetStatement the statement being wrapped
     * @param targetSQL       the SQL text of the statement
     * @throws SQLException if the superclass constructor fails
     */
    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
    }

    /**
     * @return the parameter map held by this proxy
     */
    public Map<Integer, ArrayList<Object>> getParameters() {
        return this.parameters;
    }

    /**
     * Runs {@code execute()} on the statement through {@code ExecuteTemplate}.
     */
    @Override
    public boolean execute() throws SQLException {
        return (Boolean) ExecuteTemplate.execute(this, (stmt, params) -> stmt.execute(), new Object[0]);
    }

    /**
     * Runs {@code executeQuery()} on the statement through {@code ExecuteTemplate}.
     */
    @Override
    public ResultSet executeQuery() throws SQLException {
        return (ResultSet) ExecuteTemplate.execute(this, (stmt, params) -> stmt.executeQuery(), new Object[0]);
    }

    /**
     * Runs {@code executeUpdate()} on the statement through {@code ExecuteTemplate}.
     * This is the core entry point of branch transaction handling.
     */
    @Override
    public int executeUpdate() throws SQLException {
        return (Integer) ExecuteTemplate.execute(this, (stmt, params) -> stmt.executeUpdate(), new Object[0]);
    }
}
ExecuteTemplate#execute会先判断当前业务SQL的类型,再据此选择不同的执行器。
/**
 * Chooses an executor for the proxied statement and runs it.
 *
 * <p>When no global lock is required and the branch type is not AT, the callback is invoked
 * directly on the target statement. Otherwise the SQL is recognized (if recognizers were not
 * supplied) and an executor is selected from the recognized SQL type.
 *
 * @param sqlRecognizers    recognizers for the SQL; when empty they are obtained from
 *                          {@code SQLVisitorFactory} using the target SQL and the database type
 * @param statementProxy    the proxied statement
 * @param statementCallback callback that performs the actual JDBC call
 * @param args              arguments forwarded to the callback / executor
 * @return whatever the callback or the chosen executor returns
 * @throws SQLException if execution fails; any non-SQLException throwable is wrapped in one
 */
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
// No global lock is required and the branch type is not AT:
// run the plain SQL directly on the target statement.
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
return statementCallback.execute(statementProxy.getTargetStatement(), args);
} else {
// Database type reported by the connection proxy.
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
}
Object executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
// Nothing was recognized: fall back to the plain executor.
executor = new PlainExecutor(statementProxy, statementCallback);
} else if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = (SQLRecognizer)sqlRecognizers.get(0);
label44:
// Pick the executor that matches the recognized SQL type.
switch(sqlRecognizer.getSQLType()) {
case INSERT:
executor = (Executor)EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case INSERT_ON_DUPLICATE_UPDATE:
// Two-step dispatch on the dbType string: the hash code pre-selects a candidate,
// equals() confirms it, and var8 records which literal matched (-1 = none).
byte var8 = -1;
switch(dbType.hashCode()) {
case 104382626:
if (dbType.equals("mysql")) {
var8 = 0;
}
break;
case 839186932:
if (dbType.equals("mariadb")) {
var8 = 1;
}
}
switch(var8) {
case 0:
case 1:
// "mysql" and "mariadb" share the same executor.
executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break label44;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
default:
executor = new PlainExecutor(statementProxy, statementCallback);
}
} else {
// More than one recognizer: hand all of them to the multi executor.
executor = new MultiExecutor(statementProxy, statementCallback, sqlRecognizers);
}
try {
// Core entry point: run the chosen executor.
T rs = ((Executor)executor).execute(args);
return rs;
} catch (Throwable var9) {
// Callers only see SQLException: anything else is wrapped in one.
Throwable ex = var9;
if (!(var9 instanceof SQLException)) {
ex = new SQLException(var9);
}
throw (SQLException)ex;
}
}
}
execute()执行
/**
 * Prepares the connection proxy and then delegates to {@code doExecute}.
 *
 * <p>If {@code RootContext} holds an XID it is bound to the connection proxy. The
 * global-lock-required flag from {@code RootContext} is always copied to the connection proxy.
 *
 * @param args arguments forwarded to {@code doExecute}
 * @return the result of {@code doExecute}
 * @throws Throwable whatever {@code doExecute} throws
 */
public T execute(Object... args) throws Throwable {
    final String globalXid = RootContext.getXID();
    if (null != globalXid) {
        // Bind the transaction XID to the connection.
        statementProxy.getConnectionProxy().bind(globalXid);
    }
    // Propagate the global-lock requirement to the connection.
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}
/**
 * Executes the statement for a connection that is in auto-commit mode.
 *
 * <p>Calls {@code changeAutoCommit()} on the connection proxy, runs
 * {@link #executeAutoCommitFalse(Object[])} followed by a commit inside a {@code LockRetryPolicy},
 * and finally resets the connection context and sets auto-commit back to {@code true}.
 *
 * @param args arguments forwarded to {@code executeAutoCommitFalse}
 * @return the result produced inside the retry policy
 * @throws Throwable any failure; exceptions are logged and re-thrown
 */
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    final ConnectionProxy proxy = this.statementProxy.getConnectionProxy();
    try {
        // Switch the connection to manual commit.
        proxy.changeAutoCommit();
        return new AbstractDMLBaseExecutor.LockRetryPolicy(proxy).execute(() -> {
            // Run the non-auto-commit path, then commit.
            T outcome = this.executeAutoCommitFalse(args);
            proxy.commit();
            return outcome;
        });
    } catch (Exception failure) {
        LOGGER.error("execute executeAutoCommitTrue error:{}", failure.getMessage(), failure);
        boolean rollbackOnConflict = AbstractDMLBaseExecutor.LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict();
        if (!rollbackOnConflict) {
            // Roll back the target connection on failure.
            proxy.getTargetConnection().rollback();
        }
        throw failure;
    } finally {
        proxy.getContext().reset();
        // Restore auto-commit.
        proxy.setAutoCommit(true);
    }
}
/**
 * Executes the business SQL and records before/after images for the undo log.
 *
 * @param args arguments forwarded to the statement callback
 * @return the result of the statement callback
 * @throws NotSupportYetException if the database type is not "mysql" (case-insensitive) and
 *                                {@code isMultiPk()} is true
 * @throws Exception              any failure from image building or statement execution
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    boolean isMysql = "mysql".equalsIgnoreCase(this.getDbType());
    if (!isMysql && this.isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }

    // Build the before image prior to running the statement.
    TableRecords before = this.beforeImage();
    // Run the business SQL.
    T outcome = this.statementCallback.execute(this.statementProxy.getTargetStatement(), args);
    if (this.statementProxy.getUpdateCount() > 0) {
        // Rows were affected: build the after image and prepare the undo log.
        TableRecords after = this.afterImage(before);
        this.prepareUndoLog(before, after);
    }
    return outcome;
}
执行提交
/**
 * Commits through the lock retry policy.
 *
 * <p>On an {@link SQLException} the connection is rolled back when a target connection exists,
 * auto-commit is off and the context does not report a changed auto-commit flag; the exception
 * is then re-thrown. Any other exception is wrapped in an {@link SQLException}.
 *
 * @throws SQLException if the commit fails
 */
public void commit() throws SQLException {
    try {
        this.lockRetryPolicy.execute(() -> {
            this.doCommit();
            return null;
        });
    } catch (SQLException sqlFailure) {
        boolean shouldRollback = this.targetConnection != null
            && !this.getAutoCommit()
            && !this.getContext().isAutoCommitChanged();
        if (shouldRollback) {
            this.rollback();
        }
        throw sqlFailure;
    } catch (Exception otherFailure) {
        throw new SQLException(otherFailure);
    }
}
执行事务的提交
/**
 * Dispatches the commit according to the connection context: global transaction, local commit
 * that requires global locks, or a plain commit on the target connection.
 *
 * @throws SQLException if the selected commit path fails
 */
private void doCommit() throws SQLException {
    if (this.context.inGlobalTransaction()) {
        // Inside a global transaction.
        this.processGlobalTransactionCommit();
        return;
    }
    if (this.context.isGlobalLockRequire()) {
        // Local commit that requires global locks.
        this.processLocalCommitWithGlobalLocks();
        return;
    }
    // Neither: commit directly on the target connection.
    this.targetConnection.commit();
}
组装请求数据,发送后端
/**
 * Builds a {@code BranchRegisterRequest} and sends it synchronously through
 * {@code RmNettyRemotingClient}.
 *
 * @param branchType      branch type placed on the request
 * @param resourceId      resource id placed on the request
 * @param clientId        not used by this method
 * @param xid             global transaction id placed on the request
 * @param applicationData application data placed on the request
 * @param lockKeys        lock keys placed on the request
 * @return the branch id carried by the response
 * @throws TransactionException if the response reports failure, the call times out, or a
 *                              runtime exception occurs
 */
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest registerRequest = new BranchRegisterRequest();
        registerRequest.setXid(xid);
        registerRequest.setLockKey(lockKeys);
        registerRequest.setResourceId(resourceId);
        registerRequest.setBranchType(branchType);
        registerRequest.setApplicationData(applicationData);

        BranchRegisterResponse registerResponse =
            (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(registerRequest);
        if (registerResponse.getResultCode() != ResultCode.Failed) {
            return registerResponse.getBranchId();
        }
        throw new RmTransactionException(registerResponse.getTransactionExceptionCode(),
            String.format("Response[ %s ]", registerResponse.getMsg()));
    } catch (TimeoutException timeout) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", timeout);
    } catch (RuntimeException unexpected) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", unexpected);
    }
}
分支事务服务端的执行
我们首先看一下分支事务服务端注册的入口DefaultCoordinator#onRequest。
/**
 * Handles an incoming message by installing this object as its inbound handler and letting the
 * request handle itself.
 *
 * @param request the incoming message; must be an {@code AbstractTransactionRequestToTC}
 * @param context the RPC context passed on to the request
 * @return the result produced by the request's {@code handle} method
 * @throws IllegalArgumentException if the message is not an {@code AbstractTransactionRequestToTC}
 */
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (request instanceof AbstractTransactionRequestToTC) {
        AbstractTransactionRequestToTC tcRequest = (AbstractTransactionRequestToTC) request;
        tcRequest.setTCInboundHandler(this);
        return tcRequest.handle(context);
    }
    throw new IllegalArgumentException();
}
我们的核心分支事务注册代码
/**
 * Handles a branch register request: runs {@code doBranchRegister} inside
 * {@code exceptionHandleTemplate} and returns the response object that was passed to it.
 *
 * @param request    the branch register request
 * @param rpcContext the RPC context forwarded to {@code doBranchRegister}
 * @return the response instance created by this method
 */
@Override
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
    final BranchRegisterResponse response = new BranchRegisterResponse();
    AbstractCallback<BranchRegisterRequest, BranchRegisterResponse> registerCallback =
        new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest req, BranchRegisterResponse resp)
                throws TransactionException {
                try {
                    // Core branch registration.
                    doBranchRegister(req, resp, rpcContext);
                } catch (StoreException storeFailure) {
                    String detail = String.format("branch register request failed. xid=%s, msg=%s",
                        req.getXid(), storeFailure.getMessage());
                    throw new TransactionException(TransactionExceptionCode.FailedStore, detail, storeFailure);
                }
            }
        };
    exceptionHandleTemplate(registerCallback, request, response);
    return response;
}
/**
 * Registers a branch under the global session identified by {@code xid}.
 *
 * <p>Steps, all executed via {@code SessionHolder.lockAndExecute} on the global session:
 * status check, creation of the branch session, lock acquisition for the branch, and adding
 * the branch to the global session. If adding fails with a runtime exception the branch lock
 * is released before the failure is re-thrown as a {@code BranchTransactionException}.
 *
 * @return the branch id of the newly created branch session
 * @throws TransactionException if any step fails
 */
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // Step 1: look up the global session.
    final GlobalSession session = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(session, () -> {
        // Verify the transaction status.
        globalSessionStatusCheck(session);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // Step 2: create the branch session.
        final BranchSession branch = SessionHelper.newBranchByGlobal(session, branchType, resourceId,
            applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branch.getBranchId()));

        // Step 3: acquire the lock for the branch.
        branchSessionLock(session, branch);
        try {
            // Step 4: attach the branch to the global session.
            session.addBranch(branch);
        } catch (RuntimeException addFailure) {
            // Step 5: release the lock when attaching fails.
            branchSessionUnlock(branch);
            throw new BranchTransactionException(FailedToAddBranch, String
                .format("Failed to store branch xid = %s branchId = %s", session.getXid(),
                    branch.getBranchId()), addFailure);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                session.getXid(), branch.getBranchId(), resourceId, lockKeys);
        }
        // Step 6: hand back the branch id.
        return branch.getBranchId();
    });
}
创建封装分支事务的信息
/**
 * Creates a {@code BranchSession} populated from the global session and the given arguments.
 *
 * <p>The XID and transaction id are copied from the global session; the branch id comes from
 * {@code UUIDGenerator.generateUUID()}.
 *
 * @param globalSession   source of the XID and transaction id
 * @param branchType      branch type to set
 * @param resourceId      resource id to set
 * @param applicationData application data to set
 * @param lockKeys        lock key to set
 * @param clientId        client id to set
 * @return the populated branch session
 */
public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
                                              String applicationData, String lockKeys, String clientId) {
    final BranchSession branch = new BranchSession();

    // Identity: inherited from the global session plus a freshly generated branch id.
    branch.setXid(globalSession.getXid());
    branch.setTransactionId(globalSession.getTransactionId());
    branch.setBranchId(UUIDGenerator.generateUUID());

    // Attributes supplied by the caller.
    branch.setBranchType(branchType);
    branch.setResourceId(resourceId);
    branch.setLockKey(lockKeys);
    branch.setClientId(clientId);
    branch.setApplicationData(applicationData);

    return branch;
}
分支事务加锁
/**
 * Acquires the lock for a branch session, using options parsed from its application data.
 *
 * <p>The application data, when not blank, is parsed as a JSON object. {@code autoCommit}
 * defaults to {@code true} and is switched off only by an explicit boolean {@code false};
 * {@code skipCheckLock} defaults to {@code false} and is taken from the data only when it is a
 * boolean. Values of any other type are ignored instead of causing a
 * {@link ClassCastException}. A payload that parses to JSON {@code null} is treated like
 * missing data. Parse failures are logged and the defaults are used.
 *
 * @param globalSession the global session the branch belongs to (used for error messages)
 * @param branchSession the branch session to lock
 * @throws BranchTransactionException if the lock cannot be acquired, or a store failure was
 *                                    caused by a {@code BranchTransactionException}
 * @throws TransactionException       any other failure reported by the lock call
 */
protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)
    throws TransactionException {
    String applicationData = branchSession.getApplicationData();
    boolean autoCommit = true;
    boolean skipCheckLock = false;
    if (StringUtils.isNotBlank(applicationData)) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        try {
            Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);
            // readValue yields null for the JSON literal "null"; keep the defaults then.
            if (data != null) {
                // Type-safe check: only Boolean.FALSE disables auto-commit, mirroring the
                // instanceof guard used for skipCheckLock below.
                if (Boolean.FALSE.equals(data.get(AUTO_COMMIT))) {
                    autoCommit = false;
                }
                Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);
                if (clientSkipCheckLock instanceof Boolean) {
                    skipCheckLock = (boolean) clientSkipCheckLock;
                }
            }
        } catch (IOException e) {
            LOGGER.error("failed to get application data: {}", e.getMessage(), e);
        }
    }
    try {
        // A false return means the lock could not be acquired: fail the registration.
        if (!branchSession.lock(autoCommit, skipCheckLock)) {
            throw new BranchTransactionException(LockKeyConflict,
                String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                    branchSession.getBranchId()));
        }
    } catch (StoreException e) {
        if (e.getCause() instanceof BranchTransactionException) {
            // Surface the underlying error code and keep the store exception as the cause.
            throw new BranchTransactionException(((BranchTransactionException) e.getCause()).getCode(),
                String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                    branchSession.getBranchId()), e);
        }
        throw e;
    }
}
/**
 * Acquires the lock for this branch session through the lock manager.
 *
 * <p>Only AT branches are locked; for every other branch type (including an unset one) the
 * method returns {@code true} without contacting the lock manager. The branch type is compared
 * by identity, the same way {@code unlock()} does, so a {@code null} branch type no longer
 * raises a {@link NullPointerException}.
 *
 * @param autoCommit    forwarded to the lock manager
 * @param skipCheckLock forwarded to the lock manager
 * @return the lock manager's result for AT branches, otherwise {@code true}
 * @throws TransactionException if the lock manager fails
 */
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
    if (this.getBranchType() == BranchType.AT) {
        return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
    }
    return true;
}
/**
 * Acquires the row locks described by the branch session's lock key.
 *
 * @param branchSession the branch session; must not be {@code null}
 * @param autoCommit    forwarded to the locker
 * @param skipCheckLock forwarded to the locker
 * @return {@code true} when there is nothing to lock (empty lock key or no collected row
 *         locks), otherwise the locker's result
 * @throws IllegalArgumentException if {@code branchSession} is {@code null}
 * @throws TransactionException     if lock acquisition fails
 */
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
    if (null == branchSession) {
        throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
    }
    // An empty lock key means there is nothing to lock.
    if (StringUtils.isNullOrEmpty(branchSession.getLockKey())) {
        return true;
    }
    // Collect the row locks of the branch; an empty result also means nothing to lock.
    List<RowLock> rowLocks = collectRowLocks(branchSession);
    return CollectionUtils.isEmpty(rowLocks)
        || getLocker(branchSession).acquireLock(rowLocks, autoCommit, skipCheckLock);
}
添加分支事务
/**
 * Writes the session through the transaction store manager and turns a {@code false} result
 * into an exception whose type and message depend on the log operation.
 *
 * @param logOperation    the operation being recorded
 * @param sessionStorable the session to write
 * @throws GlobalTransactionException for failed GLOBAL_ADD / GLOBAL_UPDATE / GLOBAL_REMOVE
 * @throws BranchTransactionException for failed BRANCH_ADD / BRANCH_UPDATE / BRANCH_REMOVE and
 *                                    for any other operation
 */
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
    // Persist the session; a true result means there is nothing more to do.
    if (transactionStoreManager.writeSession(logOperation, sessionStorable)) {
        return;
    }

    final TransactionExceptionCode code = TransactionExceptionCode.FailedWriteSession;
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        throw new GlobalTransactionException(code, "Fail to store global session");
    }
    if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        throw new GlobalTransactionException(code, "Fail to update global session");
    }
    if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        throw new GlobalTransactionException(code, "Fail to remove global session");
    }
    if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        throw new BranchTransactionException(code, "Fail to store branch session");
    }
    if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        throw new BranchTransactionException(code, "Fail to update branch session");
    }
    if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        throw new BranchTransactionException(code, "Fail to remove branch session");
    }
    throw new BranchTransactionException(code, "Unknown LogOperation:" + logOperation.name());
}
如果发生异常,则释放分支锁
/**
 * Releases the lock held for this branch session.
 *
 * @return the lock manager's result for AT branches; {@code true} for any other branch type
 * @throws TransactionException if the lock manager fails
 */
@Override
public boolean unlock() throws TransactionException {
    if (this.getBranchType() != BranchType.AT) {
        return true;
    }
    // AT branch: release through the lock manager.
    return LockerManagerFactory.getLockManager().releaseLock(this);
}
/**
 * Releases the row locks collected for the given branch session.
 *
 * @param branchSession the branch session; must not be {@code null}
 * @return the locker's result, or {@code false} if releasing threw an exception (which is logged)
 * @throws IllegalArgumentException if {@code branchSession} is {@code null}
 * @throws TransactionException     declared by the signature
 */
public boolean releaseLock(BranchSession branchSession) throws TransactionException {
    if (null == branchSession) {
        throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
    }
    final List<RowLock> rowLocks = collectRowLocks(branchSession);
    boolean released;
    try {
        // Release through the locker chosen for this branch session.
        released = getLocker(branchSession).releaseLock(rowLocks);
    } catch (Exception failure) {
        LOGGER.error("unLock error, branchSession:{}", branchSession, failure);
        released = false;
    }
    return released;
}
/**
 * Releases the given row locks through the lock store.
 *
 * @param locks the row locks to release
 * @return {@code true} when the list is empty; otherwise the lock store's result, or
 *         {@code false} if a non-store exception occurred (which is logged)
 * @throws StoreException re-thrown unchanged when raised while unlocking
 */
public boolean releaseLock(List<RowLock> locks) {
    // Nothing to release.
    if (CollectionUtils.isEmpty(locks)) {
        return true;
    }
    try {
        return lockStore.unLock(convertToLockDO(locks));
    } catch (Exception failure) {
        // Store failures propagate to the caller; everything else is logged and reported as false.
        if (failure instanceof StoreException) {
            throw (StoreException) failure;
        }
        LOGGER.error("unLock error, locks:{}", CollectionUtils.toString(locks), failure);
        return false;
    }
}
将数据库的锁删除
/**
 * SQL template for deleting lock rows: a {@code delete} filtered by the XID column (one
 * {@code ?} parameter) combined with a primary-key condition.
 * {@code LOCK_TABLE_PLACE_HOLD} and {@code LOCK_TABLE_PK_WHERE_CONDITION_PLACE_HOLD} are
 * presumably replaced with the real table name and condition before execution — TODO confirm
 * against the code that uses this constant.
 */
private static final String BATCH_DELETE_LOCK_SQL = "delete from " + LOCK_TABLE_PLACE_HOLD
+ " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and (" + LOCK_TABLE_PK_WHERE_CONDITION_PLACE_HOLD + ") ";