自动装配
Springboot启动的时候会将下面这几个类进行自动装配
SeataRestTemplateAutoConfiguration(装载拦截器)
这里会装配 SeataRestTemplateInterceptor (Seata的拦截器)
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
return new SeataRestTemplateInterceptor();
}
@Bean
public SeataRestTemplateInterceptorAfterPropertiesSet seataRestTemplateInterceptorConfiguration() {
return new SeataRestTemplateInterceptorAfterPropertiesSet();
}
}
在bean初始化的时候会将拦截器装配到 RestTemplate 中
public class SeataRestTemplateInterceptorAfterPropertiesSet implements InitializingBean {
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
@Override
public void afterPropertiesSet() {
if (this.restTemplates != null) {
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>(
restTemplate.getInterceptors());
interceptors.add(this.seataRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}
}
这样每次调用rest接口的时候都会走到拦截器中进行逻辑处理
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
// 1、对httpRequest做一个包装
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
//2、从本地事务上下文(ThreadLocal)中获取全局事务xid
String xid = RootContext.getXID();
// 如果可以从本地事务上下文中获取到全局事务xid,则将其添加到httpRequest的请求头上;
if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
//执行http请求
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
SeataHandlerInterceptorConfiguration
启动的时候添加一个拦截器 拦截针对的路径为:/**
@ConditionalOnWebApplication
public class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");
}
}
SeataHandlerInterceptor
XID是全局事务统一的标识
preHandle方法:执行请求之前,从本地事务上下文(线程本地变量ThreadLocal)中获取全局事务xid;从请求头中获取全局事务xid(rpcXid);如果xid不存在,但是rpcXid存在,则将rpcXid作为xid绑定到全局事务上下文(线程本地变量ThreadLocal)上;
afterCompletion方法:请求执行完毕之后;如果全局事务上下文中xid存在,但是请求头中rpcXid不存在,则不对xid进行处理,
但是如果全局事务上下文中xid存在 并且 请求头中包括rpcXid,则将xid从全局事务上下文中移除,如果移除的xid和rpcXid不相等,则将rpcXid作为xid绑定到全局事务上下文中。
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) {
String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if (xid == null && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) {
return;
}
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
SeataFeignClientAutoConfiguration
处理Feign调用
这三个build最后都是build一个SeataFeignClient用来处理
在通过OpenFeign指定请求之前,也是会从事务上下文中获取xid,然后将其放到请求头中;
@Override
public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
private Request getModifyRequest(Request request) {
//获取全局事务ID
String xid = RootContext.getXID();
//如果是空直接返回
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
headers.put(RootContext.KEY_XID, seataXid);
//设置全局事务id到请求头中组装新的请求
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
SeataAutoConfiguration
@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}
//异常处理
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
//全局事务管理器
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
//Seata数据源代理
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
}
初始化全局事务管理器
GlobalTransactionScanner 实现了InitializingBean接口在bean初始化的时候会调用到afterPropertiesSet方法中进行Seata客户端的建立 用来和Seata服务端建立连接
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//初始化客户端
initClient();
}
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//初始化TM(事务管理者)
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//初始化RM(资源管理者)
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
//注册关闭钩子
registerSpringShutdownHook();
}
初始化TM(事务管理者)
public static void init(String applicationId, String transactionServiceGroup) {
//创建TM
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
//初始化并启动TM
tmNettyRemotingClient.init();
}
创建TM
public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmRpcClient = getInstance();
//应用id
tmRpcClient.setApplicationId(applicationId);
//事务分组
tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
tmRpcClient.setAccessKey(accessKey);
tmRpcClient.setSecretKey(secretKey);
return tmRpcClient;
}
public static TmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (TmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
nettyClientConfig.getClientWorkerThreads()),
RejectedPolicies.runsOldestTaskPolicy());
instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
初始化TM
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}
@Override
public void init() {
//启动一个延时60s 每隔10秒对Seata Server 发起一个重新连接请求
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//和 Seata Server重新连接
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
//是否启动客户端批量发送请求
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
//每隔3秒执行一次定时任务 做超时检查
super.init();
//启动Netty客户端组件
clientBootstrap.start();
}
初始化RM(资源管理者)
public static void init(String applicationId, String transactionServiceGroup) {
//创建RM
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//初始化并启动RM
rmNettyRemotingClient.init();
}
初始化数据库连接
//Seata数据源代理
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
}
SeataAutoDataSourceProxyCreator
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
//这里会构建一个SeataAutoDataSourceProxyAdvice的代理类 然后当去进行增强的时候会走到SeataAutoDataSourceProxyAdvice的invoke方法
return new Object[]{advisor};
}
SeataAutoDataSourceProxyAdvice
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
//添加数据源
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
这里会去做添加数据源信息 因为这里new了一个 DataSourceProxy 然后会到 DataSourceProxy的构造方法进行数据源的初始化
public DataSourceProxy putDataSource(DataSource dataSource) {
return this.dataSourceProxyMap.computeIfAbsent(dataSource, DataSourceProxy::new);
}
DataSourceProxy
public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
super(targetDataSource);
init(targetDataSource, resourceGroupId);
}
private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
//读取jdbc 等信息
try (Connection connection = dataSource.getConnection()) {
//jdbcURL信息
jdbcUrl = connection.getMetaData().getURL();
//db类型
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
//将资源注册到TC
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
tableMetaExcutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
}
RM和TM的注册
可以看到在DataSource初始化完成之后调用 DefaultResourceManager.get().registerResource(this);会去做一个资源的注册 这里最后会走到
RmNettyRemotingClient和TmNettyRemotingClient的onRegisterMsgSuccess方法中进行RM和TM的注册
RM注册
@Override
public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
AbstractMessage requestMessage) {
RegisterRMRequest registerRMRequest = (RegisterRMRequest)requestMessage;
RegisterRMResponse registerRMResponse = (RegisterRMResponse)response;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel);
}
getClientChannelManager().registerChannel(serverAddress, channel);
String dbKey = getMergedResourceKeys();
if (registerRMRequest.getResourceIds() != null) {
if (!registerRMRequest.getResourceIds().equals(dbKey)) {
sendRegisterMessage(serverAddress, channel, dbKey);
}
}
}
TM注册
@Override
public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
AbstractMessage requestMessage) {
RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
}
getClientChannelManager().registerChannel(serverAddress, channel);
}
总结
1.客户端启动的时候会通过SpringBoot的自动装配机制加载配置类
2.设置RestTemplate的拦截器用来拦截所有mvc请求 和Feign的拦截器拦截Feign请求设置全局事务ID
3.初始化TM(事务管理者) RM(资源管理者) 建立Netty客户端的通道并启动
4.初始化数据源信息
5.注册TM 和RM到SeataServer