文章目录
-
- 概要
- 整体架构流程
- 技术名词解释
- 技术细节
- 小结
概要
MySqlConnectorTask,用于读取MySQL的二进制日志并生成对应的数据变更事件
整体架构流程
技术名词解释
数据库模式(Database Schema)
数据库模式是指数据库中数据的组织结构和定义,它描述了数据库中所有对象(如表、视图、索引、存储过程等)的结构和关系。具体来说,数据库模式包括以下几个方面:
1 表结构:定义了数据库中各个表的名称、列的名称、数据类型、约束条件(如主键、外键、唯一性约束等)。
2 关系:描述了表与表之间的关系,如一对多、多对多等。
3 索引:定义了表上的索引,用于提高查询性能。
4 视图:定义了虚拟表,这些虚拟表基于SQL查询结果,可以简化复杂的查询操作。
5 存储过程和函数:定义了数据库中的存储过程和函数,用于执行特定的业务逻辑。
6 触发器:定义了在特定事件发生时自动执行的操作。在 DatabaseHistory 接口中的应用
在 DatabaseHistory 接口中,数据库模式的变更记录和恢复功能主要用于以下场景:
1 记录变更:当数据库模式发生变化时(如添加新表、修改表结构、删除表等),通过 record 方法记录这些变更。
2 恢复:当需要恢复到某个历史点的数据库模式时,通过 recover 方法恢复到指定的历史状态。
通过这些功能,可以有效地管理和追踪数据库模式的变化,确保数据的一致性和完整性。
技术细节
@Override
public void start(Map<String, String> props) {
if (context == null) {
throw new ConnectException("Unexpected null context");
}
// Validate the configuration ...
final Configuration config = Configuration.from(props);
if (!config.validate(MySqlConnectorConfig.ALL_FIELDS, logger::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
// Create and configure the database history ...
this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (this.dbHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
}
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); // do not remove
// prefix
this.dbHistory.configure(dbHistoryConfig); // validates
this.dbHistory.start();
this.running.set(true);
// Read the configuration ...
final String user = config.getString(MySqlConnectorConfig.USER);
final String password = config.getString(MySqlConnectorConfig.PASSWORD);
final String host = config.getString(MySqlConnectorConfig.HOSTNAME);
final int port = config.getInteger(MySqlConnectorConfig.PORT);
final String initialBinLogFilename = config.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);
final long serverId = config.getLong(MySqlConnectorConfig.SERVER_ID);
serverName = config.getString(MySqlConnectorConfig.SERVER_NAME.name(), host + ":" + port);
final boolean keepAlive = config.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);
final int maxQueueSize = config.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);
final long timeoutInMilliseconds = config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
final boolean includeSchemaChanges = config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
final long pollIntervalMs = config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);
maxBatchSize = config.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);
metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);
// Define the filter using the whitelists and blacklists for tables and database names ...
Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
Predicate<TableId> isBuiltin = (id) -> {
return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
};
tableFilter = tableFilter.and(isBuiltin.negate());
}
// Create the queue ...
events = new LinkedBlockingDeque<>(maxQueueSize);
batchEvents = new ArrayDeque<>(maxBatchSize);
// Set up our handlers for specific kinds of events ...
tables = new Tables();
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);
// Set up the log reader ...
client = new BinaryLogClient(host, port, user, password);
client.setServerId(serverId);
client.setKeepAlive(keepAlive);
if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
client.registerEventListener(this::enqueue);
client.registerLifecycleListener(traceLifecycleListener());
// Set up the event deserializer with additional types ...
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
client.setEventDeserializer(eventDeserializer);
// Check if we've already processed some of the log for this database ...
source.setServerName(serverName);
// Get the offsets for our partition ...
Map<String, ?> offsets = context.offsetStorageReader().offset(source.partition());
if (offsets != null) {
source.setOffset(offsets);
// And set the client to start from that point ...
client.setBinlogFilename(source.binlogFilename());
client.setBinlogPosition(source.binlogPosition());
// The event row number will be used when processing the first event ...
logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}",
serverName, source.binlogFilename(), source.binlogPosition(), source.eventRowNumber());
// We have to make our Tables reflect the state of the database at the above source partition (e.g., the location
// in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the
// TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements
// to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string
// of DDL. However, we should consume no further than offset we recovered above.
try {
logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);
DdlParser ddlParser = new MySqlDdlParser();
dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);
tableConverters.loadTables();
logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));
} catch (Throwable t) {
throw new ConnectException("Failure while recovering database schemas", t);
}
} else {
// initializes this position, though it will be reset when we see the first event (should be a rotate event) ...
client.setBinlogFilename(initialBinLogFilename);
logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",
source.binlogFilename(), source.binlogPosition());
}
// Start the log reader, which starts background threads ...
try {
logger.debug("Connecting to MySQL server");
client.connect(timeoutInMilliseconds);
logger.debug("Successfully connected to MySQL server and beginning to read binlog");
} catch (TimeoutException e) {
double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);
throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host
+ ":" + port + " with user '" + user + "'", e);
} catch (AuthenticationException e) {
throw new ConnectException("Failed to authenticate to the MySQL database at " + host + ":" + port + " with user '" + user + "'",
e);
} catch (Throwable e) {
throw new ConnectException(
"Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);
}
}
- 验证配置:从传入的属性中创建配置对象并验证其有效性。
- 创建数据库历史记录:根据配置实例化 DatabaseHistory 对象并启动。
- 读取配置参数:从配置中读取各种必要的参数,如用户名、密码、主机、端口等。
- 定义表过滤器:根据白名单和黑名单定义表过滤器,忽略内置表。
- 创建队列:初始化事件队列和批处理队列。
- 设置事件处理器:为不同的事件类型设置处理器。
- 设置日志读取器:创建并配置 BinaryLogClient,注册事件监听器和生命周期监听器。
- 设置事件反序列化器:配置事件反序列化器以处理特定类型的事件。
- 恢复数据库状态:检查是否有已处理的日志,如果有则恢复数据库模式。
- 连接到 MySQL 服务器:尝试连接到 MySQL 服务器并开始读取二进制日志。
小结
/**
* 该类负责配置和初始化MySQL连接器,包括设置数据库和表的过滤条件、创建事件队列、注册事件处理器、设置二进制日志客户端、恢复数据库模式等。
* 主要功能包括:
* - 应用数据库和表的黑白名单过滤条件。
* - 配置是否忽略内置表。
* - 创建事件队列和批处理事件队列。
* - 注册不同类型的事件处理器。
* - 初始化二进制日志客户端并设置相关参数。
* - 检查并恢复已处理的日志位置。
* - 连接到MySQL服务器并开始读取二进制日志。
*/