DataX 数据库同步部分源码解析

        在工作中遇到异构数据库同步的问题,从Oracle数据库同步数据到Postgres,其中的很多数据库表超过百万,并且包含空间字段。经过筛选,选择了开源的DataX+DataX Web作为基础框架。DataX 是阿里云的开源产品,大厂的产品值得信赖,而且,DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久,每天完成同步8w多道作业,每日传输数据量超过300TB,经过了时间、实践的检验。这里顺便分析一下源码,看看大厂的程序员是怎么实现数据库的快速全表查询、写入操作,怎么进行多线程管理的。

部分内容参见:        

        GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。        

         DataX/introduction.md at master · alibaba/DataX · GitHub

        DataX/dataxPluginDev.md at master · alibaba/DataX · GitHub

一、DataX介绍

        DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

​        DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

datax_why_new

       为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

二、源码解析(基于DataX v202309版本)

datax_framework_new

        DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

2.1 Reader 源码解析(基于OracleReader插件)

        以oraclereader插件为例,看一下Reader的代码。

        oraclereader插件包括Constant.java、OracleReader.java和OracleReaderErrorCode.java三个Java类。先关注一下OracleReader,OracleReader继承Reader基类,在其中,通过内部类Task实现读取数据库操作,将读取的数据交由框架处理。具体为CommonRdbmsReader.Task来实现。在代码中包含了commonRdbmsReaderTask的初始化及读取数据操作等内容。核心为this.commonRdbmsReaderTask.startRead。

public static class Task extends Reader.Task {

	private Configuration readerSliceConfig;
	private CommonRdbmsReader.Task commonRdbmsReaderTask;

	@Override
	public void init() {
		this.readerSliceConfig = super.getPluginJobConf();
		this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(
				DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());
		this.commonRdbmsReaderTask.init(this.readerSliceConfig);
	}

	@Override
	public void startRead(RecordSender recordSender) {
		int fetchSize = this.readerSliceConfig
				.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);

		this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,
				recordSender, super.getTaskPluginCollector(), fetchSize);
	}

	@Override
	public void post() {
		this.commonRdbmsReaderTask.post(this.readerSliceConfig);
	}

	@Override
	public void destroy() {
		this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
	}

}

        CommonRdbmsReader.Task的startRead方法如下:

public void startRead(Configuration readerSliceConfig,
					  RecordSender recordSender,
					  TaskPluginCollector taskPluginCollector, int fetchSize) {
	String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
	String table = readerSliceConfig.getString(Key.TABLE);

	PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

	LOG.info("Begin to read record by Sql: [{}\n] {}.",
			querySql, basicMsg);
	PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
	queryPerfRecord.start();

	Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
			username, password);

	// session config .etc related
	DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
			this.dataBaseType, basicMsg);

	int columnNumber = 0;
	ResultSet rs = null;
	try {
		rs = DBUtil.query(conn, querySql, fetchSize);
		queryPerfRecord.end();

		ResultSetMetaData metaData = rs.getMetaData();
		columnNumber = metaData.getColumnCount();

		//这个统计干净的result_Next时间
		PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
		allResultPerfRecord.start();

		long rsNextUsedTime = 0;
		long lastTime = System.nanoTime();
		while (rs.next()) {
			rsNextUsedTime += (System.nanoTime() - lastTime);
			this.transportOneRecord(recordSender, rs,
					metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
			lastTime = System.nanoTime();
		}

		allResultPerfRecord.end(rsNextUsedTime);
		//目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间
		LOG.info("Finished read record by Sql: [{}\n] {}.",
				querySql, basicMsg);

	}catch (Exception e) {
		throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
	} finally {
		DBUtil.closeDBResources(null, conn);
	}
}

        上述代码可见查询数据库的常规步骤,数据库操作通过原生JDBC实现。熟悉的味道,熟悉的配方。详细说明如下。

1.建立数据库链接

Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                    username, password);

        DBUtil内部通过原生Jdbc实现。代码如下:

private static synchronized Connection connect(DataBaseType dataBaseType,
											   String url, Properties prop) {
	try {
		Class.forName(dataBaseType.getDriverClassName());
		DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS);
		return DriverManager.getConnection(url, prop);
	} catch (Exception e) {
		throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty("user"), null);
	}
}

2.执行查询操作,返回ResultSet

ResultSet rs = null;
try {
    rs = DBUtil.query(conn, querySql, fetchSize);
    // 其他代码此处省略
}catch (Exception e) {
    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
    DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部查询实现代码如下。

/**
 * a wrapped method to execute select-like sql statement .
 *
 * @param conn Database connection .
 * @param sql  sql statement to be executed
 * @return a {@link ResultSet}
 * @throws SQLException if occurs SQLException.
 */
public static ResultSet query(Connection conn, String sql, int fetchSize)
		throws SQLException {
	// 默认3600 s 的query Timeout
	return query(conn, sql, fetchSize, Constant.SOCKET_TIMEOUT_INSECOND);
}

/**
 * a wrapped method to execute select-like sql statement .
 *
 * @param conn         Database connection .
 * @param sql          sql statement to be executed
 * @param fetchSize
 * @param queryTimeout unit:second
 * @return
 * @throws SQLException
 */
public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)
		throws SQLException {
	// make sure autocommit is off
	conn.setAutoCommit(false);
	Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
			ResultSet.CONCUR_READ_ONLY);
	stmt.setFetchSize(fetchSize);
	stmt.setQueryTimeout(queryTimeout);
	return query(stmt, sql);
}

3.获取数据元数据信息

ResultSetMetaData metaData = rs.getMetaData();

4.遍历数据,对数据进行转换并传递给框架

while (rs.next()) {
   rsNextUsedTime += (System.nanoTime() - lastTime);
   this.transportOneRecord(recordSender, rs,
          metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
   lastTime = System.nanoTime();
}

5.在finally块中关闭数据链接

finally {
    DBUtil.closeDBResources(null, conn);
} 

        DBUtil内部关闭链接代码如下。很明显上述代码调用时只传入了connection,会造成只关闭链接,未关闭ResultSet和Statement,有瑕疵。

public static void closeDBResources(ResultSet rs, Statement stmt,
									Connection conn) {
	if (null != rs) {
		try {
			rs.close();
		} catch (SQLException unused) {
		}
	}

	if (null != stmt) {
		try {
			stmt.close();
		} catch (SQLException unused) {
		}
	}

	if (null != conn) {
		try {
			conn.close();
		} catch (SQLException unused) {
		}
	}
}

public static void closeDBResources(Statement stmt, Connection conn) {
	closeDBResources(null, stmt, conn);
}

        在第2步DBUtil内部的查询代码部分,指定了fetchSize参数。

stmt.setFetchSize(fetchSize);

        fetchSize是实现读取数据源表的关键点之一。简单理解,fetchSize定义了本地缓存大小,例如,fetchSize=1000即可简单理解为本地缓存区大小为1000条数据大小,当执行ResultSet.next取数据时,如果本地缓存中没有数据,会从数据库中取出1000条(剩余数据大于1000时为1000,小于1000时为剩余数据)数据放到缓存中,接下来的rs.next操作就是从本地缓存中读取数据,直至缓存区为空才再次请求数据库。通过减少与数据库的交互次数,提升性能。

        如果 fetchsize 设置的太小,会导致程序频繁地访问数据库,影响性能;如果 fetchsize 设置的太大,则可能会导致内存不足。在oraclereader插件的代码Constant.java中定义了fetchSize的默认值。

package com.alibaba.datax.plugin.reader.oraclereader;

public class Constant {

    public static final int DEFAULT_FETCH_SIZE = 1024;

}

 接下来我们看一下transportOneRecord的代码,该代码将一条数据进行转换后传递给Writer。

protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
		ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
		TaskPluginCollector taskPluginCollector) {
	Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
	recordSender.sendToWriter(record);
	return record;
}

        buildRecord方法将一条数据的各个字段按照类型转换为标准数据,方便后续各类数据库写入插件使用实现数据插入。如果数据中包含了不支持的其他字段类型,需要在SQL中通过转换函数进行转换,否则对于不支持的其他字段类型,或在转换过程中出现其他错误,这条数据将被作为脏数据扔掉。当然,也可以修改buildRecord方法代码,让DataX支持更多数据类型的查询和写入。代码如下:

protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
		TaskPluginCollector taskPluginCollector) {
	Record record = recordSender.createRecord();

	try {
		for (int i = 1; i <= columnNumber; i++) {
			switch (metaData.getColumnType(i)) {

			case Types.CHAR:
			case Types.NCHAR:
			case Types.VARCHAR:
			case Types.LONGVARCHAR:
			case Types.NVARCHAR:
			case Types.LONGNVARCHAR:
				String rawData;
				if(StringUtils.isBlank(mandatoryEncoding)){
					rawData = rs.getString(i);
				}else{
					rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY : 
						rs.getBytes(i)), mandatoryEncoding);
				}
				record.addColumn(new StringColumn(rawData));
				break;

			case Types.CLOB:
			case Types.NCLOB:
				record.addColumn(new StringColumn(rs.getString(i)));
				break;

			case Types.SMALLINT:
			case Types.TINYINT:
			case Types.INTEGER:
			case Types.BIGINT:
				record.addColumn(new LongColumn(rs.getString(i)));
				break;

			case Types.NUMERIC:
			case Types.DECIMAL:
				record.addColumn(new DoubleColumn(rs.getString(i)));
				break;

			case Types.FLOAT:
			case Types.REAL:
			case Types.DOUBLE:
				record.addColumn(new DoubleColumn(rs.getString(i)));
				break;

			case Types.TIME:
				record.addColumn(new DateColumn(rs.getTime(i)));
				break;

			// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
			case Types.DATE:
				if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
					record.addColumn(new LongColumn(rs.getInt(i)));
				} else {
					record.addColumn(new DateColumn(rs.getDate(i)));
				}
				break;

			case Types.TIMESTAMP:
				record.addColumn(new DateColumn(rs.getTimestamp(i)));
				break;

			case Types.BINARY:
			case Types.VARBINARY:
			case Types.BLOB:
			case Types.LONGVARBINARY:
				record.addColumn(new BytesColumn(rs.getBytes(i)));
				break;

			// warn: bit(1) -> Types.BIT 可使用BoolColumn
			// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
			case Types.BOOLEAN:
			case Types.BIT:
				record.addColumn(new BoolColumn(rs.getBoolean(i)));
				break;

			case Types.NULL:
				String stringData = null;
				if(rs.getObject(i) != null) {
					stringData = rs.getObject(i).toString();
				}
				record.addColumn(new StringColumn(stringData));
				break;

			default:
				throw DataXException
						.asDataXException(
								DBUtilErrorCode.UNSUPPORTED_TYPE,
								String.format(
										"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
										metaData.getColumnName(i),
										metaData.getColumnType(i),
										metaData.getColumnClassName(i)));
			}
		}
	} catch (Exception e) {
		if (IS_DEBUG) {
			LOG.debug("read data " + record.toString()
					+ " occur exception:", e);
		}
		//TODO 这里识别为脏数据靠谱吗?
		taskPluginCollector.collectDirtyRecord(record, e);
		if (e instanceof DataXException) {
			throw (DataXException) e;
		}
	}
	return record;
}

2.2 关于扩展DataX数据库同步支持字段类型的思考

        第一种方式:2.1中已提到过,如果要同步的数据源包含了DataX不支持的类型,可以通过数据库转换函数将字段转换为String等DataX支持的类型。例如对于Oracle Spatial字段可以通过Oracle Spatial函数SDO_UTIL.TO_WKTGEOMETRY将字段值转换为WKT文本空间数据格式。

        第二种方式:如果数据库没有对应的转换函数,可以修改CommonRdbmsReader.Task类的buildRecord方法代码。同样的对于Oracle Spatial字段,是Types.STRUCT类型,可以根据其结构在buildRecord中自行将其转换为WKT文本空间数据格式。

        第三种方式:对于某些类型的数据库特有的类型,也可以通过第二种方式进行扩展,但是会造成多余的依赖,例如还是Oracle Spatial的字段,如果通过oracle.sql.STRUCT进行解析,这样就会造成CommonRdbmsReader依赖Oracle驱动,进而造成所有涉及关系型数据库的同步都需要依赖oracle驱动,即使可能只是从MySQL同步数据到MySQL。这样更好的方式是在对应的reader或writer中扩展对应的数据类型处理逻辑。在oraclereader中使用CommonRdbmsReader.Task的子类,在子类中重写buildRecord方法。以下为OracleReader.Task类的init方法,在初始化commonRdbmsReaderTask 时,使用内部子类,子类重写buildRecord方法增加特定类型的转换逻辑。在switch块,default分支处理前。

@Override
public void init() {
	this.readerSliceConfig = super.getPluginJobConf();
	this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(
			DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId()){
		@Override
		protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
				TaskPluginCollector taskPluginCollector) {
			Record record = recordSender.createRecord();

			try {
				for (int i = 1; i <= columnNumber; i++) {
					switch (metaData.getColumnType(i)) {

					case Types.CHAR:
					case Types.NCHAR:
					case Types.VARCHAR:
					case Types.LONGVARCHAR:
					case Types.NVARCHAR:
					case Types.LONGNVARCHAR:
						String rawData;
						if(StringUtils.isBlank(mandatoryEncoding)){
							rawData = rs.getString(i);
						}else{
							rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY : 
								rs.getBytes(i)), mandatoryEncoding);
						}
						record.addColumn(new StringColumn(rawData));
						break;

					case Types.CLOB:
					case Types.NCLOB:
						record.addColumn(new StringColumn(rs.getString(i)));
						break;

					case Types.SMALLINT:
					case Types.TINYINT:
					case Types.INTEGER:
					case Types.BIGINT:
						record.addColumn(new LongColumn(rs.getString(i)));
						break;

					case Types.NUMERIC:
					case Types.DECIMAL:
						record.addColumn(new DoubleColumn(rs.getString(i)));
						break;

					case Types.FLOAT:
					case Types.REAL:
					case Types.DOUBLE:
						record.addColumn(new DoubleColumn(rs.getString(i)));
						break;

					case Types.TIME:
						record.addColumn(new DateColumn(rs.getTime(i)));
						break;

					// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
					case Types.DATE:
						if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
							record.addColumn(new LongColumn(rs.getInt(i)));
						} else {
							record.addColumn(new DateColumn(rs.getDate(i)));
						}
						break;

					case Types.TIMESTAMP:
						record.addColumn(new DateColumn(rs.getTimestamp(i)));
						break;

					case Types.BINARY:
					case Types.VARBINARY:
					case Types.BLOB:
					case Types.LONGVARBINARY:
						record.addColumn(new BytesColumn(rs.getBytes(i)));
						break;

					// warn: bit(1) -> Types.BIT 可使用BoolColumn
					// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
					case Types.BOOLEAN:
					case Types.BIT:
						record.addColumn(new BoolColumn(rs.getBoolean(i)));
						break;

					case Types.NULL:
						String stringData = null;
						if(rs.getObject(i) != null) {
							stringData = rs.getObject(i).toString();
						}
						record.addColumn(new StringColumn(stringData));
						break;
					
					// 在此处增加对oracle特定类型的处理逻辑

					default:
						throw DataXException
								.asDataXException(
										DBUtilErrorCode.UNSUPPORTED_TYPE,
										String.format(
												"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
												metaData.getColumnName(i),
												metaData.getColumnType(i),
												metaData.getColumnClassName(i)));
					}
				}
			} catch (Exception e) {
				if (IS_DEBUG) {
					LOG.debug("read data " + record.toString()
							+ " occur exception:", e);
				}
				//TODO 这里识别为脏数据靠谱吗?
				taskPluginCollector.collectDirtyRecord(record, e);
				if (e instanceof DataXException) {
					throw (DataXException) e;
				}
			}
			return record;
		}
	};
	this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}

        上面的方式会产生很多重复代码。为了进一步优化,我们可以为CommonRdbmsReader.Task增加一个扩展点。

protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
		TaskPluginCollector taskPluginCollector) {
	Record record = recordSender.createRecord();

	try {
		for (int i = 1; i <= columnNumber; i++) {
			switch (metaData.getColumnType(i)) {

			case Types.CHAR:
			case Types.NCHAR:
			case Types.VARCHAR:
			case Types.LONGVARCHAR:
			case Types.NVARCHAR:
			case Types.LONGNVARCHAR:
				String rawData;
				if(StringUtils.isBlank(mandatoryEncoding)){
					rawData = rs.getString(i);
				}else{
					rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY : 
						rs.getBytes(i)), mandatoryEncoding);
				}
				record.addColumn(new StringColumn(rawData));
				break;

			case Types.CLOB:
			case Types.NCLOB:
				record.addColumn(new StringColumn(rs.getString(i)));
				break;

			case Types.SMALLINT:
			case Types.TINYINT:
			case Types.INTEGER:
			case Types.BIGINT:
				record.addColumn(new LongColumn(rs.getString(i)));
				break;

			case Types.NUMERIC:
			case Types.DECIMAL:
				record.addColumn(new DoubleColumn(rs.getString(i)));
				break;

			case Types.FLOAT:
			case Types.REAL:
			case Types.DOUBLE:
				record.addColumn(new DoubleColumn(rs.getString(i)));
				break;

			case Types.TIME:
				record.addColumn(new DateColumn(rs.getTime(i)));
				break;

			// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
			case Types.DATE:
				if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
					record.addColumn(new LongColumn(rs.getInt(i)));
				} else {
					record.addColumn(new DateColumn(rs.getDate(i)));
				}
				break;

			case Types.TIMESTAMP:
				record.addColumn(new DateColumn(rs.getTimestamp(i)));
				break;

			case Types.BINARY:
			case Types.VARBINARY:
			case Types.BLOB:
			case Types.LONGVARBINARY:
				record.addColumn(new BytesColumn(rs.getBytes(i)));
				break;

			// warn: bit(1) -> Types.BIT 可使用BoolColumn
			// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
			case Types.BOOLEAN:
			case Types.BIT:
				record.addColumn(new BoolColumn(rs.getBoolean(i)));
				break;

			case Types.NULL:
				String stringData = null;
				if(rs.getObject(i) != null) {
					stringData = rs.getObject(i).toString();
				}
				record.addColumn(new StringColumn(stringData));
				break;

			default:
				buildColumn(record, i, rs, metaData, mandatoryEncoding);				
			}
		}
	} catch (Exception e) {
		if (IS_DEBUG) {
			LOG.debug("read data " + record.toString()
					+ " occur exception:", e);
		}
		//TODO 这里识别为脏数据靠谱吗?
		taskPluginCollector.collectDirtyRecord(record, e);
		if (e instanceof DataXException) {
			throw (DataXException) e;
		}
	}
	return record;
}
// 新增的扩展点,可以在子类中处理特殊的字段类型
protected Record buildColumn(Record record, int columnIndex, ResultSet rs, ResultSetMetaData metaData,  String mandatoryEncoding){
	throw DataXException
					.asDataXException(
							DBUtilErrorCode.UNSUPPORTED_TYPE,
							String.format(
									"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
									metaData.getColumnName(i),
									metaData.getColumnType(i),
									metaData.getColumnClassName(i)));	
}

        这样,我们在oraclereader中,创建CommonRdbmsReader.Task的子类用于处理特殊字段类型时,只需要重写buildColumn方法即可。这样代码就简洁多了。

@Override
public void init() {
	this.readerSliceConfig = super.getPluginJobConf();
	this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(
			DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId()){
		@Override
		void buildColumn(Record record, int columnIndex, ResultSet rs, ResultSetMetaData metaData,  String mandatoryEncoding) {
			switch (metaData.getColumnType(columnIndex)) {
				// 在此处增加对oracle特定类型的处理逻辑
				// 解析完成后将数据通过record.addColumn加入到record中
				default:
					// 不进行处理的其他类型字段可以继续抛出异常
					super.buildColumn(record, columnIndex, rs, metaData, mandatoryEncoding);
			}
		}
	};
	this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}

2.3 Writer代码解析(基于postgresqlwriter插件)

        PostgresqlWriter类继承Writer基类。操作数据库的部分封装在其static类型内部类Task中。

public static class Task extends Writer.Task {
	private Configuration writerSliceConfig;
	private CommonRdbmsWriter.Task commonRdbmsWriterSlave;

	@Override
	public void init() {
		this.writerSliceConfig = super.getPluginJobConf();
		this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){
			@Override
			public String calcValueHolder(String columnType){
				if("serial".equalsIgnoreCase(columnType)){
					return "?::int";
				}else if("bigserial".equalsIgnoreCase(columnType)){
					return "?::int8";
				}else if("bit".equalsIgnoreCase(columnType)){
					return "?::bit varying";
				}
				return "?::" + columnType;
			}
		};
		this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
	}

	@Override
	public void prepare() {
		this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
	}

	public void startWrite(RecordReceiver recordReceiver) {
		this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
	}

	@Override
	public void post() {
		this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
	}

	@Override
	public void destroy() {
		this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
	}

}

1. 建立数据库链接   

     this.commonRdbmsWriterSlave.startWrite 通过DBUtil建立了数据库链接。

// TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)
public void startWrite(RecordReceiver recordReceiver,
					   Configuration writerSliceConfig,
					   TaskPluginCollector taskPluginCollector) {
	Connection connection = DBUtil.getConnection(this.dataBaseType,
			this.jdbcUrl, username, password);
	DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
			this.dataBaseType, BASIC_MESSAGE);
	startWriteWithConnection(recordReceiver, taskPluginCollector, connection);
}

        接下来开始操作数据。

public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
	this.taskPluginCollector = taskPluginCollector;

	// 用于写入数据的时候的类型根据目的表字段类型转换
	this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
			this.table, StringUtils.join(this.columns, ","));
	// 写数据库的SQL语句
	calcWriteRecordSql();

	List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
	int bufferBytes = 0;
	try {
		Record record;
		while ((record = recordReceiver.getFromReader()) != null) {
			if (record.getColumnNumber() != this.columnNumber) {
				// 源头读取字段列数与目的表字段写入列数不相等,直接报错
				throw DataXException
						.asDataXException(
								DBUtilErrorCode.CONF_ERROR,
								String.format(
										"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
										record.getColumnNumber(),
										this.columnNumber));
			}

			writeBuffer.add(record);
			bufferBytes += record.getMemorySize();

			if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
				doBatchInsert(connection, writeBuffer);
				writeBuffer.clear();
				bufferBytes = 0;
			}
		}
		if (!writeBuffer.isEmpty()) {
			doBatchInsert(connection, writeBuffer);
			writeBuffer.clear();
			bufferBytes = 0;
		}
	} catch (Exception e) {
		throw DataXException.asDataXException(
				DBUtilErrorCode.WRITE_DATA_ERROR, e);
	} finally {
		writeBuffer.clear();
		bufferBytes = 0;
		DBUtil.closeDBResources(null, null, connection);
	}
}

 2. 获取目标表元数据信息

        DataX 的Writer通过PreparedStatement预编译语句实现数据库写入操作。需要提前基于数据库链接、数据库表、字段信息获取各字段的类型,以便后续根据字段类型进行合理赋值。

// 用于写入数据的时候的类型根据目的表字段类型转换
this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
		this.table, StringUtils.join(this.columns, ","));

        具体通过构建一条查询语句获取表字段元数据信息。因为只需要获取元数据,而不需要查询具体数据,queryColumnSql 的条件被设置为where 1=2。在获取完元数据信息之后会将Statement、ResultSet关闭,需要将元数据转存,此处转存到columnMetaData中。注意,finally块中不能关闭connection。

/**
 * @return Left:ColumnName Middle:ColumnType Right:ColumnTypeName
 */
public static Triple<List<String>, List<Integer>, List<String>> getColumnMetaData(
		Connection conn, String tableName, String column) {
	Statement statement = null;
	ResultSet rs = null;

	Triple<List<String>, List<Integer>, List<String>> columnMetaData = new ImmutableTriple<List<String>, List<Integer>, List<String>>(
			new ArrayList<String>(), new ArrayList<Integer>(),
			new ArrayList<String>());
	try {
		statement = conn.createStatement();
		String queryColumnSql = "select " + column + " from " + tableName
				+ " where 1=2";

		rs = statement.executeQuery(queryColumnSql);
		ResultSetMetaData rsMetaData = rs.getMetaData();
		for (int i = 0, len = rsMetaData.getColumnCount(); i < len; i++) {

			columnMetaData.getLeft().add(rsMetaData.getColumnName(i + 1));
			columnMetaData.getMiddle().add(rsMetaData.getColumnType(i + 1));
			columnMetaData.getRight().add(
					rsMetaData.getColumnTypeName(i + 1));
		}
		return columnMetaData;

	} catch (SQLException e) {
		throw DataXException
				.asDataXException(DBUtilErrorCode.GET_COLUMN_INFO_FAILED,
						String.format("获取表:%s 的字段的元信息时失败. 请联系 DBA 核查该库、表信息.", tableName), e);
	} finally {
		DBUtil.closeDBResources(rs, statement, null);
	}
}

3. 根据表名、列信息构建写入语句

        接下来需要构建带占位符的写入语句。写入语句大概长这样:insert into table_name(column_name) values(?)

// 写数据库的SQL语句
calcWriteRecordSql();

        具体组装代码有好几处。

        例如:OriginalConfPretreatmentUtil类的dealWriteMode方法。

public static void dealWriteMode(Configuration originalConfig, DataBaseType dataBaseType) {
	List<String> columns = originalConfig.getList(Key.COLUMN, String.class);

	String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
			Constant.CONN_MARK, Key.JDBC_URL, String.class));

	// 默认为:insert 方式
	String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");

	List<String> valueHolders = new ArrayList<String>(columns.size());
	for (int i = 0; i < columns.size(); i++) {
		valueHolders.add("?");
	}

	boolean forceUseUpdate = false;
	//ob10的处理
	if (dataBaseType == DataBaseType.MySql && isOB10(jdbcUrl)) {
		forceUseUpdate = true;
	}

	String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate);

	LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);

	originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}

4. 执行批量写入

        循环获取record,通过witeBuffer.size() 和 数据量的大小控制每次执行批量写入操作的数据量。具体批量写入数据库操作代码见doBatchInsert方法。

Record record;
while ((record = recordReceiver.getFromReader()) != null) {
	if (record.getColumnNumber() != this.columnNumber) {
		// 源头读取字段列数与目的表字段写入列数不相等,直接报错
		throw DataXException
				.asDataXException(
						DBUtilErrorCode.CONF_ERROR,
						String.format(
								"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
								record.getColumnNumber(),
								this.columnNumber));
	}

	writeBuffer.add(record);
	bufferBytes += record.getMemorySize();

	if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
		doBatchInsert(connection, writeBuffer);
		writeBuffer.clear();
		bufferBytes = 0;
	}
}
if (!writeBuffer.isEmpty()) {
	doBatchInsert(connection, writeBuffer);
	writeBuffer.clear();
	bufferBytes = 0;
}
protected void doBatchInsert(Connection connection, List<Record> buffer)
		throws SQLException {
	PreparedStatement preparedStatement = null;
	try {
		// 设置手动提交事务
		connection.setAutoCommit(false);
		// 构建预编译语句
		preparedStatement = connection
				.prepareStatement(this.writeRecordSql);
		// 循环添加
		for (Record record : buffer) {
            // 添加实际字段参数
			preparedStatement = fillPreparedStatement(
					preparedStatement, record);
			preparedStatement.addBatch();
		}
        // 提交事务
		preparedStatement.executeBatch();
		connection.commit();
	} catch (SQLException e) {
		LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage());
		connection.rollback();
		doOneInsert(connection, buffer);
	} catch (Exception e) {
		throw DataXException.asDataXException(
				DBUtilErrorCode.WRITE_DATA_ERROR, e);
	} finally {
        // 关闭preparedStatement,注意此处不会关闭connection
		DBUtil.closeDBResources(preparedStatement, null);
	}
}

        fillPreparedStatement及之后的fillPreparedStatementColumnType决定了写入操作支持的通用数据类型。如果字段包含不支持的数据类型,会抛出异常。

// 直接使用了两个类变量:columnNumber,resultSetMetaData
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
		throws SQLException {
	for (int i = 0; i < this.columnNumber; i++) {
		int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
		String typeName = this.resultSetMetaData.getRight().get(i);
		preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
	}

	return preparedStatement;
}

protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
															int columnSqltype, String typeName, Column column) throws SQLException {
	java.util.Date utilDate;
	switch (columnSqltype) {
		case Types.CHAR:
		case Types.NCHAR:
		case Types.CLOB:
		case Types.NCLOB:
		case Types.VARCHAR:
		case Types.LONGVARCHAR:
		case Types.NVARCHAR:
		case Types.LONGNVARCHAR:
			preparedStatement.setString(columnIndex + 1, column
					.asString());
			break;

		case Types.SMALLINT:
		case Types.INTEGER:
		case Types.BIGINT:
		case Types.NUMERIC:
		case Types.DECIMAL:
		case Types.FLOAT:
		case Types.REAL:
		case Types.DOUBLE:
			String strValue = column.asString();
			if (emptyAsNull && "".equals(strValue)) {
				preparedStatement.setString(columnIndex + 1, null);
			} else {
				preparedStatement.setString(columnIndex + 1, strValue);
			}
			break;

		//tinyint is a little special in some database like mysql {boolean->tinyint(1)}
		case Types.TINYINT:
			Long longValue = column.asLong();
			if (null == longValue) {
				preparedStatement.setString(columnIndex + 1, null);
			} else {
				preparedStatement.setString(columnIndex + 1, longValue.toString());
			}
			break;

		// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
		case Types.DATE:
			if (typeName == null) {
				typeName = this.resultSetMetaData.getRight().get(columnIndex);
			}

			if (typeName.equalsIgnoreCase("year")) {
				if (column.asBigInteger() == null) {
					preparedStatement.setString(columnIndex + 1, null);
				} else {
					preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
				}
			} else {
				java.sql.Date sqlDate = null;
				try {
					utilDate = column.asDate();
				} catch (DataXException e) {
					throw new SQLException(String.format(
							"Date 类型转换错误:[%s]", column));
				}

				if (null != utilDate) {
					sqlDate = new java.sql.Date(utilDate.getTime());
				}
				preparedStatement.setDate(columnIndex + 1, sqlDate);
			}
			break;

		case Types.TIME:
			java.sql.Time sqlTime = null;
			try {
				utilDate = column.asDate();
			} catch (DataXException e) {
				throw new SQLException(String.format(
						"TIME 类型转换错误:[%s]", column));
			}

			if (null != utilDate) {
				sqlTime = new java.sql.Time(utilDate.getTime());
			}
			preparedStatement.setTime(columnIndex + 1, sqlTime);
			break;

		case Types.TIMESTAMP:
			java.sql.Timestamp sqlTimestamp = null;
			try {
				utilDate = column.asDate();
			} catch (DataXException e) {
				throw new SQLException(String.format(
						"TIMESTAMP 类型转换错误:[%s]", column));
			}

			if (null != utilDate) {
				sqlTimestamp = new java.sql.Timestamp(
						utilDate.getTime());
			}
			preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
			break;

		case Types.BINARY:
		case Types.VARBINARY:
		case Types.BLOB:
		case Types.LONGVARBINARY:
			preparedStatement.setBytes(columnIndex + 1, column
					.asBytes());
			break;

		case Types.BOOLEAN:
			preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
			break;

		// warn: bit(1) -> Types.BIT 可使用setBoolean
		// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
		case Types.BIT:
			if (this.dataBaseType == DataBaseType.MySql) {
				preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
			} else {
				preparedStatement.setString(columnIndex + 1, column.asString());
			}
			break;
		default:
			throw DataXException
					.asDataXException(
							DBUtilErrorCode.UNSUPPORTED_TYPE,
							String.format(
									"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
									this.resultSetMetaData.getLeft()
											.get(columnIndex),
									this.resultSetMetaData.getMiddle()
											.get(columnIndex),
									this.resultSetMetaData.getRight()
											.get(columnIndex)));
	}
	return preparedStatement;
}

        对于Writer增加支持的字段类型,可以参见2.2。修改上面的方法,将将default内容替换为扩展点函数。在应用时于具体的Write子类中创建CommonRdbmsWriter.Task的子类,扩展支持的字段类型。

5. 关闭数据源

        数据同步完毕,在finally块中清空标记变量、关闭数据源。

finally {
	writeBuffer.clear();
	bufferBytes = 0;
	DBUtil.closeDBResources(null, null, connection);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/532230.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JavaWeb】Day39.MySQL概述——数据库设计-DQL(二)

数据库设计-DQL 聚合函数 聚合函数查询就是纵向查询&#xff0c;它是对一列的值进行计算&#xff0c;然后返回一个结果值。&#xff08;将一列数据作为一个整体&#xff0c;进行纵向计算&#xff09; 语法&#xff1a; select 聚合函数(字段列表) from 表名 ; 注意 : 聚合…

LeetCode 热题 100 | 多维动态规划(二)

目录 1 5. 最长回文子串 2 1143. 最长公共子序列 菜鸟做题&#xff0c;语言是 C 1 5. 最长回文子串 核心思想&#xff1a;把总问题拆解为若干子问题。 总问题&#xff1a;从第 i 个字母到第 j 个字母是回文串子问题&#xff1a;从第 i 1 个字母到第 j - 1 个字母是回文…

Obsidian的初步了解、安装及使用

一、为什么是Obsidian&#xff1f; 笔记软件我用的还是比较多了&#xff0c;一开始用有道云笔记&#xff0c;其实我个人觉得有道云笔记还是做的不错的&#xff0c;除了广告多点、功能弱一点、更新慢一点、偶尔收藏会有问题以外还是不错的&#xff0c;免费软件里性价比算是还可…

前端开发中地图定位与距离计算的应用实践

前端开发中地图定位与距离计算的应用实践 在前端开发中&#xff0c;地图功能的应用日益广泛&#xff0c;无论是用户位置的定位、目标距离的计算&#xff0c;还是地址的解析与展示&#xff0c;地图都发挥着不可替代的作用。本文将重点介绍前端开发中实现地图定位、距离计算以及…

windows 系统下全新下载安装 mysql8.0 数据库(详细)

windows 系统下全新下载安装 mysql8.0 数据库&#xff08;详细&#xff09; 段子手168 1、登录官方网站下载&#xff1a; https://dev.mysql.com/downloads/windows/installer/ 2、下载最新版本&#xff0c;一般可能需要注册登录&#xff0c;下载其他历史版本&#xff0c;请…

【LAMMPS学习】八、基础知识(1.3)从一个输入脚本运行多个模拟

8. 基础知识 此部分描述了如何使用 LAMMPS 为用户和开发人员执行各种任务。术语表页面还列出了 MD 术语&#xff0c;以及相应 LAMMPS 手册页的链接。 LAMMPS 源代码分发的 examples 目录中包含的示例输入脚本以及示例脚本页面上突出显示的示例输入脚本还展示了如何设置和运行各…

【智能算法】省时方便,智能算法统计指标——一键运行~

目录 1.常用统计指标2.参数统计检验3.结果展示4.自定义修改测试框架 1.常用统计指标 测试智能算法性能时&#xff0c;常常会用到以下5种常用指标&#xff0c;简单不赘述&#xff1a; 最优值、最差值、均值、中位数、标准差 2.参数统计检验 单纯依靠常用统计指标说服力不足&…

【noVNC】使用noVNC实现浏览器网页访问vnc(基于web的远程桌面)

1.VNC本身提供的http连接方式&#xff0c;可传输文件&#xff0c;画面有卡顿&#xff0c;需要安装jre 2.noVNC访问方式&#xff0c;不可传输文件&#xff0c;画面较为流畅&#xff0c;不用安装插件运行环境 一、noVNC 是什么 Web 端的Vnc软件&#xff0c;通过noVNC&#xff0…

CSS 实现伸缩导航仪表板侧边栏菜单

CSS 实现伸缩导航仪表板侧边栏菜单 效果展示 展开状态 收起状态 CSS 知识点 回顾曲面圆角的实现知识点 字体库准备 菜单的图标使用的是ionicons的图标库&#xff0c;所以需要页面需要引入对应的文件。 <scripttype"module"src"https://unpkg.com/i…

进程间通信 (匿名管道)

一、进程间通信的概念 进程间通信是一个进程把自己的数据交给另一个进程&#xff0c;它可以帮助我们进行数据传输、资源共享、通知事件和进程控制。 进程间通信的本质是让不同的进程看到同一份资源。因此&#xff0c;我们要有&#xff1a; 1、交换数据的空间。2、这个空间不能由…

CNN-Transformer时间序列预测

部分代码&#xff1a; # CNN-Transformer class CNNTransformerEncoder(nn.Module):def __init__(self, input_features, transformer_encoder_heads,embedding_features, cnn_kernel_size, dim_feedforward_enc, n_encoder_layer):super(CNNTransformerEncoder, self).__init…

分析染色体级别的基因组装配揭示了六倍体栽培菊花的起源和进化-文献精读-7

Analyses of a chromosome-scale genome assembly reveal the origin and evolution of cultivated chrysanthemum 分析染色体级别的基因组装配揭示了栽培菊花的起源和进化 六倍体植物基因组的文献&#xff0c;各位同仁还有什么有特色的基因组评论区留言~ 摘要 菊花&#xf…

spring boot —— Spring-Cloud-Zuul(网关服务getway),kafka笔记

一、 引入zuul依赖&#xff1a; org.springframework.cloud spring-cloud-starter-zuul 二、创建应用主类。使用EnableZuulProxy注解开启zuul的API网关服务功能&#xff1a; EnableZuulProxy SpringCloudApplication public class Application { public static void mai…

【漏洞复现】WordPress Welcart 任意文件读取漏洞(CVE-2022-4140)

0x01 产品简介 Welcart 是一款免费的 WordPress 电子商务插件。Welcart 具有许多用于制作在线商店的功能和自定义设置。您可以轻松创建自己的原始在线商店。 0x02 漏洞概述 Welcart存在任意文件读取漏洞&#xff0c;未授权的攻击者可以通过该漏洞读取任意文件&#xff0c;获…

【RAG实践】Rerank,让大模型 RAG 更近一步

RAGRerank原理 上一篇【RAG实践】基于LlamaIndex和Qwen1.5搭建基于本地知识库的问答机器人 我们介绍了什么是RAG&#xff0c;以及如何基于LLaMaIndex和Qwen1.5搭建基于本地知识库的问答机器人&#xff0c;原理图和步骤如下&#xff1a; 这里面主要包括包括三个基本步骤&#…

【无标题】系统思考—心智模式

“直到你使无意识变为有意识&#xff0c;它将指导你的生活并且你会称之为命运。”—卡尔荣格 心智模式深藏于我们内心之中&#xff0c;它潜移默化地影响着我们对世界的理解和判断。往往这些影响是如此隐蔽&#xff0c;以至于我们自己都未必察觉到是什么在驱动我们的选择、决策…

ES7-10:async和await、异步迭代..

1-ES7新特性 indexof如果没有就返回-1&#xff0c;有就返回索引 如果仅仅查找数据是否在数组中,建议使用includes,如果是查找数据的索引位置,建议使用indexOf更好一些 2-ES8-async和await 所有的需要异步处理的Promise对象都写在async中await等待结果 async、await 使异步操…

【MATLAB源码-第184期】基于matlab的FNN预测人民币美元汇率 输出预测图误差图RMSE R2 MAE MBE等指标

操作环境&#xff1a; MATLAB 2022a 1、算法描述 前馈神经网络&#xff08;Feedforward Neural Network, FNN&#xff09;是最简单也是应用最广泛的人工神经网络之一。在许多领域&#xff0c;尤其是数据预测方面&#xff0c;FNN已经展现出了卓越的性能和强大的适应性。 一、…

贪心算法|406.根据身高重建队列

力扣题目链接 class Solution { public:static bool cmp(const vector<int>& a, const vector<int>& b) {if (a[0] b[0]) return a[1] < b[1];return a[0] > b[0];}vector<vector<int>> reconstructQueue(vector<vector<int>…

鸿蒙应用开发之图案密码锁组件

前面学习了导航组件,现在来学习另一个密码设置和验证组件,这种组件比较常用。因为当用触屏手机之后,屏幕上就可以滑动操作,比普通PC电脑要多一些功能了。早前的密码都是输入数字,没有滑动输入九宫格的密码。 大体如下面的界面: 采用这种密码,一般情况下是不用记住数字,…