canal调度控制器初始化:
public CanalController(final Properties properties)
1. 初始化instance公共全局配置: canal.instance.global.(mode、lazy、manager.address以及spring.xml,并放入内存 InstanceConfig globalInstanceConfig;
canal.instance.global.mode: 用于确定canal instance的全局配置加载方式,主要有两种方式一种是spring本地模式,一种是admin管理端配置, 如果指定了manager地址(canal.admin.manager),则强制使用manager模式,spring模式的是SpringCanalInstanceGenerator,通过参数化将instance名称带入路径classpath:${canal.instance.destination:}/instance.properties,获取本地resource目录下配置文件
private InstanceConfig initGlobalConfig(Properties properties) {
.......
}
2. 初始化每个instance独有的配置,通过destinations配置多个instance,并放入内存Map<String, InstanceConfig> instanceConfigs;
private void initInstanceConfig(Properties properties) {
// 对instance进行逗号分割 存在多个instance的情况下
String destinationStr = getDestinations(properties);
String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
for (String destination : destinations) {
// 可针对instance进行单独配置 mode 、Spring、lazy
InstanceConfig config = parseInstanceConfig(properties, destination);
InstanceConfig oldConfig = instanceConfigs.put(destination, config);
if (oldConfig != null) {
logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
}
}
}
3. 初始化canal server,server主要有两种方式,官方描述如下:
我们这里使用的是独立部署,会使用两种,均是单利模式,CanalServerWithNetty会将客户端请求派给CanalServerWithEmbeded 进行真正的处理,CanalServerWithNetty接收来自canal client的请求。
// 嵌入式服务实例化
embeddedCanalServer = CanalServerWithEmbedded.instance();
embeddedCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
.....此处代码省略
// canal.withoutNetty 配置
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
// server 初始化
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
}
4. 初始化canal server集群模式,根据canal.zkServers配置的zk地址,是否走HA模式, 并进行初始化目录,创建永久节点(集群节点以及instance节点)
// zk 集群地址
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
// 初始化系统目录 /otter/canal/destinations
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
// /otter/canal/cluster 整个集群信息
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}
每个canal-server对每个instance的管理是交给ServerRunningMonitor类,监控运行状态,有变更的时候会进行相应的变更处理。只有当后面开启自动化扫描,才会进行初始化,每个instance对应一个ServerRunningMonitor。
//ServerRunningMonitors.getRunningMonitor被调用的时候,先对map进行查找,没有的话进行以下
//代码生成 ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
// 回调所做的事情 启动前以及启动后
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {...}
public void processActiveExit() {...}
public void processStart() {...}
public void processStop() {...}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;
}));
5. 初始化instance自动化扫描,通过配置canal.auto.scan=true 进行开启自动化扫描。
defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。实现了InstanceAction
接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动一个新instance,当移除一个destination配置时,需要调用stop方法来停止当前instance;当某个destination配置发生变更时,需要调用reload方法来进行重启。instanceConfigMonitors: 监听配置变更,Spring模式定时扫描默认是user.dir + conf目录组成,manager模式通过PlainCanalConfigClient http 方式获取admin端管理的配置。
//当instance发现变更的时候,默认应该采取什么样的操作
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {
public void start(String destination) {...}
public void stop(String destination) {...}
public void reload(String destination) {...}
@Overrid
public void release(String destination) {....}
}
};
// 监听配置
instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {
//扫描间隔时间
int scanInterval = Integer.valueOf(getProperty(properties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
// spring 模式
if (mode.isSpring()) {
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默认是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEmpty(rootDir)) {
rootDir = "../conf";
}
if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
monitor.setRootConf(rootDir);
} else {
// eclipse debug模式
monitor.setRootConf("src/main/resources/");
}
return monitor;
// admin 模式
} else if (mode.isManager()) {
ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
monitor.setConfigClient(getManagerClient(managerAddress));
return monitor;
} else {
throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
}
}
});
}
canal调度控制器start():
public void start() throws Throwable {
logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
// 创建整个canal的工作节点
// /otter/canal/cluster/ip:port
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
// 创建临时节点
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception { }
// 新建立连接
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
// 优先启动embedded服务
embeddedCanalServer.start();
// 尝试启动一下非lazy状态的通道
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// 创建destination的工作节点
if (!embeddedCanalServer.isStart(destination)) {
// HA机制启动 创建监听,有就获取,没有就新增
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}
//启动配置文件自动检测机制
if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
// 启动monitor
monitor.start();
}
}
}
// 启动网络接口
if (canalServer != null) {
// 启动网络接口,监听客户端请求
canalServer.start();
}
}