基于 MDL 行情插件的中金所 L1 数据处理最佳实践

本文介绍了如何通过 DolphinDB 的 MDL 插件订阅并处理中金所 Level 1 实时数据。首先,文章简要介绍了 MDL 插件的功能和作用。它是基于 MDL 官方提供的行情数据服务 C++ SDK(即 TCP 版本 MDL )实现,提供了实时数据获取和处理的能力。接着文章详细阐述了数据的存储方案,以及实时数据与历史数据字段对齐的方法。此外,本文还提供了实时合成K线的模块,并讨论了数据处理中的关键细节,例如主力合约的选择和成交量加权平均价格(VWAP)的计算方法。值得注意的是,本文的方法不仅适用于中金所 L1 数据,还可以扩展到其他期货交易所的数据处理,因此具有一定的通用性。

本文中的实时数据处理功能依托于DolphinDB的流数据功能。在继续阅读本文之前,建议读者先了解DolphinDB的流数据订阅及流计算引擎的相关概念,以便更好地理解本文所述的实时行情处理流程及技术实现

本教程的代码基于 DolphinDB 2.00.12版本开发,建议用户使用2.00.12及3.00.1版本以上运行。具体 MDL 插件的接口说明,请参考 DolphinDB MDL 插件使用说明。

1. DolphinDB MDL 插件使用流程

MDL 是通联数据提供的高频行情数据服务, DolphinDB 提供了能够从 MDL 服务器获取高频行情数据的 DolphinDB MDL 插件,用户可以通过 DolphinScript 接入数据,实现高效的数据订阅与处理。

以下是接入 MDL 数据的基本流程,详情请参见《 MDL 行情插件最佳实践指南》:

1. 插件安装及加载安装 MDL 插件并加载到 DolphinDB 中。确保您的 DolphinDB 环境满足插件的依赖要求。通常需要下载插件包并在 DolphinDB 中加载,具体步骤请参考官方文档。

installPlugin("MDL") //安装 MDL 插件
loadPlugin("MDL") //加载 MDL 插件

2. 获取数据结构

在接入数据之前,需要通过 MDL 插件获取目标行情数据的结构。用户输入目标行情的数据服务 ID 和消息 ID 号,具体行情号请参考由通联数据提供的《通联数据 MDL 消息参考》。

cffexLevel1Schema = MDL::getSchema(" MDL SID_ MDL _CFFEX", 1) //中金所level1数据结构

该接口返回一张表,含 name、type 两列,共41个字段。

注:有关 MDL 插件对数据品类的支持情况,请联系 DolphinDB 小助手,微信号:dolphndb1。

3. 创建持久化流表

根据获得的数据结构,创建持久化流表以便存储接收到的行情数据。可以配置表的内存大小和持久化时间以控制数据大小。

enableTableShareAndPersistence(
        streamTable(1:0, cffexLevel1Schema["name"], cffexLevel1Schema["type"]),
        "streamCffexLevel1",
        cacheSize = 1000000, // 保留在系统的内存中的表的条数 1000000条
        retentionMinutes=1440,  // 持久化保留时间 1天
        preCache=1000 // 重建时预加载1000条记录
    )

4. 建立 MDL 连接句柄

通过指定 MDL 服务器的地址、端口、认证信息等,建立 MDL 连接句柄。确保连接的稳定性,并进行必要的配置,如获取接收时间戳和延迟时间设置。

handle =MDL::createHandle("Handle_CFFE_L1_Future", host, port, username)

5. 订阅行情数据

根据上述步骤建立的 MDL 连接句柄和输出的流表,用户可以指定 MDL 的数据服务 ID、数据服务版本号、消息 ID 来订阅需要的数据源。用户也可以通过fieldNamefieldValuesextraOrderLeve来过滤不要的字段、值或者指定部分数据源中的档位深度。

MDL::subscribe(handle, objByName("streamCffexLevel1"), 
                      " MDL SID_ MDL _CFFEX", " MDL VID_ MDL _CFFEX", 1)

6. 启动 MDL 连接

启动 MDL 连接并开始数据接收。

MDL::connect MDL (handle)

7. MDL 运行状态监控

通过接口用户可实时查看 MDL 连接的运行状态,确保数据传输正常。该接口会返回一张表,其中包含已经处理的消息数、处理失败的消息数、最后一条错误消息发生的时间等。

MDL::getStatus(handle)

2. 中国金融期货交易所简介

中国金融期货交易所(以下简称“中金所”)是经国务院同意,中国证监会批准设立的,专门从事金融期货、期权等金融衍生品交易与结算的公司制交易所。

中金所期货上市类产品包括:

  • 权益类
    • 沪深300股指期货,简称 IF
    • 中证500股指期货,简称 IC
    • 中证1000股指期货,简称 IM
    • 上证50股指期货,简称 IH
  • 利率类
    • 2年期国债期货,简称 TS
    • 5年期国债期货,简称 TF
    • 10年期国债期货,简称 T
    • 30年期国债期货,简称 TL

3. MDL 行情数据介绍

本节将介绍由通联数据公司提供的中金所期货Level1的行情数据结构。该数据分为历史数据和实时行情数据,通常操作是将历史数据落库后接入实时数据。由于两种数据的结构有所不同,因此需要对实时数据进行处理,将其与历史数据对齐后再接入。本文主要介绍字段对齐部分。

3.1 期货 Level 1 数据说明

期货市场的Level1和 Level 2 数据是指交易行情和订单委托行情(Trades and Quotes, TAQ)的快照数据,快照频率为 500ms 一个推送。 Level 1 和 Level 2 的区别在于Level1的订单委托行情只有一档,而 Level 2 则有五档,所以 Level 2 相比于 Level 1 提供了更深的订单委托行情数据。目前,期货市场的 Level 1 快照为500ms,Level 2 中,中金所和上期所是500ms,其他交易所是250ms。

3.2 中金所 Level 1 数据结构

通联数据提供的中金所 Level 1 分为实时数据及历史数据,以下是两种数据源的结构说明:

表 3-1 实时行情数据结构

列名类型含义
InstruIDSTRING合约代码
LastPriceDOUBLE最新价
PreSetPriceDOUBLE昨结算
OpenPriceDOUBLE今开盘
HighPriceDOUBLE最高价
LowPriceDOUBLE最低价
TurnoverDOUBLE成交金额,单位元,单边计算
OpenIntDOUBLE持仓量,单位手,单边计算
SetPriceDOUBLE今结算
ULimitPriceDOUBLE涨停板价
LLimitPriceDOUBLE跌停板价
TradDayDATE结算日
PreCloPriceDOUBLE昨收盘
VolumeINT成交量,单位手,单边计算
ClosePriceDOUBLE今收盘
PreDeltaDOUBLE昨虚实度
CurrDeltaDOUBLE今虚实度
UpdateTimeTIME最后修改时间
PreOpenIntDOUBLE昨持仓量,单位手,单边计算
BidPrice1DOUBLE申买价一
BidVolume1INT申买量一
AskPrice1DOUBLE申卖价一
AskVolume1INT申卖量一
BidPrice2DOUBLE申买价二
BidVolume2INT申买量二
AskPrice2DOUBLE申卖价二
AskVolume2INT申卖量二
BidPrice3DOUBLE申买价三
BidVolume3INT申买量三
AskPrice3DOUBLE申卖价三
AskVolume3INT申卖量三
BidPrice4DOUBLE申买价四
BidVolume4INT申买量四
AskPrice4DOUBLE申卖价四
AskVolume4INT申卖量四
BidPrice5DOUBLE申买价五
BidVolume5INT申买量五
AskPrice5DOUBLE申卖价五
AskVolume5INT申卖量五
AveragePriceDOUBLE均价
ActionDayDATE交易日

表 3-2 历史行情数据结构

列名类型含义
CONTRACTIDSYMBOL合约代码
LASTPXDOUBLE最新价
PRESETTLEDOUBLE昨结算
OPENPXDOUBLE开盘价
HIGHPXDOUBLE最高价
LOWPXDOUBLE最低价
OPENINTSDOUBLE持仓量
RISELIMITDOUBLE涨停板
FALLLIMITDOUBLE跌停板
PRECLOSEDOUBLE昨收盘
CLOSEPXDOUBLE收盘价
PREDELTADOUBLE昨虚实度
CURRDELTADOUBLE今虚实度
B1DOUBLE申买价一
B2DOUBLE申买价二
B3DOUBLE申买价三
B4DOUBLE申买价四
B5DOUBLE申买价五
BV1INT申买量一
BV2INT申买量二
BV3INT申买量三
BV4INT申买量四
BV5INT申买量五
S1DOUBLE申卖价一
S2DOUBLE申卖价二
S3DOUBLE申卖价三
S4DOUBLE申卖价四
S5DOUBLE申卖价五
SV1INT申卖量一
SV2INT申卖量二
SV3INT申卖量三
SV4INT申卖量四
SV5INT申卖量五
TDATEDATE交易日期
SETTLEMENTPXDOUBLE结算价
TMDOUBLE累计成交金额
CLEARINGDAYDATE清算日期
TTIME+UPDATEMILLISECTIME交易时间+交易时间毫秒
INITOPENINTSINT初始持仓量
AVGPXDOUBLE当日均价
TQINT累计成交量
TTIMETIME交易时间
ExchangeInstIDSYMBOL合约交易所编码
CMDOUBLE瞬时成交金额
OCSYMBOL开平仓性质
LASTQTYINT最新成交量
INTSCHGINT持仓量变化
LIFELOWDOUBLE历史最低价
LIFEHIGHDOUBLE历史最高价
AVGPXDOUBLE当日均价
BIDIMPLYQTYINT申买推导量
ASKIMPLYQTYINT申卖推导量
BSRATIODOUBLE买比
SIDESYMBOL买卖方向
MFLXIDSYMBOL连续合约代码
MFLXNAMESYMBOL连续合约名称
LOCALTMTIME本地时间戳
MARKETSYMBOL交易所
CHGDOUBLE价格涨跌
CHGPCTDOUBLE价格涨跌幅(%)
VARIETIESSYMBOL品种索引
SETTLEGROUPIDSYMBOL结算组代码
SETTLEIDINT结算编号
UNIXLONG交易时间对应的 UNIX 时间戳
MFLAGSYMBOL主力合约标记
SOURCESYMBOL数据源
OffsetINT更新时间毫秒级偏移量
TFLAGSYMBOL日夜盘标记
CONTRACTNAMESYMBOL合约名称
CONTRACTCODESYMBOL合约代码(英文)
CQINT瞬时成交量

3.3 数据存储方案

为实现数据库存储及查询的最优性能,我们需要针对不同数据的特点,设计不同的分区方案存储。本文提供存储期货 Level 1 数据的最佳存储方案,用户也可以根据自身需要进行调整。更多金融数据的存储方案可参考《存储金融数据的分区方案最佳实践》。

3.3.1 期货L1数据存储方案

存储引擎:TSDB

分区方式: VALUE,按天分区

分区字段: TDATE

 def createFutureL1(dbName,tbName){
	if(!existsDatabase(dbName)){
		db = database(dbName, VALUE, 2020.01.01..2020.01.02,,"TSDB")
	}else{
		db = database(dbName)
	}
	name = `TDATE`TTIME`UPDATEMILLISEC`CONTRACTID`ExchangeInstID`CONTRACTNAME`LASTPX`HIGHPX`LOWPX`CQ`CM`OC`TQ`TM`LASTQTY`INITOPENINTS`OPENINTS`INTSCHG`RISELIMIT`FALLLIMIT`PRESETTLE`PRECLOSE`BuyPrice`BuyVol`SellPrice`SellVol`OPENPX`CLOSEPX`SETTLEMENTPX`LIFELOW`LIFEHIGH`AVGPX`BIDIMPLYQTY`ASKIMPLYQTY`BSRATIO`SIDE`MFLXID`MFLXNAME`PREDELTA`CURRDELTA`LOCALTM`MARKET`CHG`CHGPCT`VARIETIES`SETTLEGROUPID`SETTLEID`UNIX`CLEARINGDAY`MFLAG`SOURCE`CONTRACTCODE`Offset`TFLAG
	type = [DATE, SECOND, INT, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, INT, DOUBLE, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE[], INT[], DOUBLE[], INT[], DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, DOUBLE, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, TIME, SYMBOL, DOUBLE, DOUBLE, SYMBOL, SYMBOL, INT, LONG, DATE, SYMBOL, SYMBOL, SYMBOL, INT, SYMBOL]
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(
			table=schemaTable, tableName=tbName, partitionColumns=`TDATE, 
			compressMethods={TradeTime:"delta"}, sortColumns=`CONTRACTID`TTIME, 
			keepDuplicates=ALL)	
}

3.3.2 期货分钟线存储方案

存储引擎:TSDB

分区方式:RANGE,按年分区

分区字段:TDATE

 def createFutureL1_KMin(dbName,tbName){
	if(!existsDatabase(dbName)){
		db = database(dbName, RANGE, 1980.01M + (1..20) * 60,,"TSDB")
	}else{
		db = database(dbName)
	}
	name = `CLEARINGDAY`TDATE`CONTRACTID`MARKET`Offset`bartime`closeprice`openprice`highprice`lowprice`volume`value`vwap`OPENINTS
	type = `DATE`DATE`SYMBOL`SYMBOL`INT`MINUTE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`INT
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, 
	partitionColumns=`TDATE, compressMethods={TradeTime:"delta"}, 
	sortColumns=`CONTRACTID`TDATE`bartime, keepDuplicates=ALL, 
	sortKeyMappingFunction = [hashBucket{,10}, hashBucket{,50}])	
}

3.4 实时行情对齐历史行情

从表3-1和表3-2可以看出 MDL 提供的历史行情和实时行情字段名和字段数量不完全相同,历史行情提供了更多的信息,因此以下提供对齐方法,以及如何对缺失值处理。

表 3-3 实时行情对齐历史行情字段

实时数据字段实时数据字段含义历史数据字段历史数据字段含义缺失值处理
InstruID合约代码CONTRACTID合约代码
LastPrice最新价LASTPX最新价
PreSetPrice昨结算PRESETTLE昨结算
OpenPrice今开盘OPENPX开盘价
HighPrice最高价HIGHPX最高价
LowPrice最低价LOWPX最低价
OpenInt持仓量,单位手,单边计算OPENINTS持仓量
ULimitPrice涨停板价RISELIMIT涨停板
LLimitPrice跌停板价FALLLIMIT跌停板
PreCloPrice昨收盘PRECLOSE昨收盘
ClosePrice今收盘CLOSEPX收盘价
PreDelta昨虚实度PREDELTA昨虚实度
CurrDelta今虚实度CURRDELTA今虚实度
BidPrice1申买价一B1申买价一
BidPrice2申买价二B2申买价二
BidPrice3申买价三B3申买价三
BidPrice4申买价四B4申买价四
BidPrice5申买价五B5申买价五
BidVolume1申买量一BV1申买量一
BidVolume2申买量二BV2申买量二
BidVolume3申买量三BV3申买量三
BidVolume4申买量四BV4申买量四
BidVolume5申买量五BV5申买量五
AskPrice1申卖价一S1申卖价一
AskPrice2申卖价二S2申卖价二
AskPrice3申卖价三S3申卖价三
AskPrice4申卖价四S4申卖价四
AskPrice5申卖价五S5申卖价五
AskVolume1申卖量一SV1申卖量一
AskVolume2申卖量二SV2申卖量二
AskVolume3申卖量三SV3申卖量三
AskVolume4申卖量四SV4申卖量四
AskVolume5申卖量五SV5申卖量五
ActionDay交易日TDATE交易日期
SetPrice今结算SETTLEMENTPX结算价
Turnover成交金额,单位元,单边计算TM累计成交金额
TradDay结算日CLEARINGDAY清算日期
UpdateTime最后修改时间TTIME+UPDATEMILLISEC交易时间+交易时间毫秒
PreOpenInt昨持仓量,单位手,单边计算INITOPENINTS初始持仓量
AveragePrice均价AVGPX当日均价
Volume成交量,单位手,单边计算TQ累计成交量
缺失TTIME交易时间second(TTIME+UPDATEMILLISEC)
缺失ExchangeInstID合约交易所编码用NULL填充
缺失CM瞬时成交金额用0填充
缺失OC开平仓性质用0填充
缺失LASTQTY最新成交量用NULL填充
缺失INTSCHG持仓量变化通联计算,OPENINTS- INITOPENINTS
缺失LIFELOW历史最低价用NULL填充
缺失LIFEHIGH历史最高价用NULL填充
缺失AVGPX当日均价用NULL填充
缺失BIDIMPLYQTY申买推导量用NULL填充
缺失ASKIMPLYQTY申卖推导量用NULL填充
缺失BSRATIO买比用0填充
缺失SIDE买卖方向用NULL填充
缺失MFLXID连续合约代码用NULL填充
缺失MFLXNAME连续合约名称用NULL填充
LocalTime本地创建时间LOCALTM本地时间戳
缺失MARKET交易所用CFFEX填充
缺失CHG价格涨跌(LASTPX- PRESETTLE)
缺失CHGPCT价格涨跌幅(%)(LASTPX/ PRESETTLE -1)*100%
缺失VARIETIES品种索引用NULL填充
缺失SETTLEGROUPID结算组代码用NULL填充
缺失SETTLEID结算编号用0填充
缺失UNIX交易时间对应的 UNIX 时间戳用0填充
缺失MFLAG主力合约标记按照前一天的持仓量来确定主力合约
缺失SOURCE数据源用“ MDL ”填充
缺失Offset更新时间毫秒级偏移量用NULL填充
缺失TFLAG日夜盘标记用1填充
缺失CONTRACTNAME合约名称用NULL填充
缺失CONTRACTCODE合约代码(英文)去除合约代码的到期时间获得
缺失CQ瞬时成交量用0填充

上述表格中历史数据结构由通联公司提供,包括了多个期货交易所的数据字段或者历史数据字段,并非是实时数据缺少信息,所以我们将中金所没有的字段用空值或其他值填充。其中真正需要计算确定的有有:TTIME, INTSCHG, MARKET, CHG, CHGPCT, MFLAG, SOURCE, CONTRACTCODE

  • TTIME: 交易时间,对最后修改时间取 second 获取,即 second(TTIME+UPDATEMILLISEC)
  • INTSCHG: 持仓量变化,使用持仓量减去初始持仓量 (OPENINTS- INITOPENINTS) 计算
  • MARKET: 交易所,针对的是中金所数据处理,因此填充 CFFEX
  • CHG, CHGPCT: 价格涨跌,价格涨跌幅(%),分别表示价格变化和变化的百分比幅度,因此用LASTPX- PRESETTLE(LASTPX/ PRESETTLE -1)*100%计算
  • SOURCE: 数据源,由于只有一个数据源,所以就只填充 MDL
  • CONTRACTCODE: 通过去除合约代码的到期时间获得。
    • 由于存在 EFP(期货转现货交易,Exchange of Futrues for Physicals)的情况,简单的去除到期时间不一定能够获取到正确的合约代码,还需要进行一层替换,CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "")
  • MFLAG: 主力合约标记,0表示不是当天的主力合约,1表示是当天的主力合约,主力合约的判断是在盘前通过前一个交易日的持仓量最大的合约来确定,并且在盘中不做更改。主力合约选择需要依据历史数据。

4. DolphinDB MDL 行情数据接入解决方案

DolphinDB 专门提供 MDL 插件,用于实时接入 MDL 行情数据,并提供流计算等功能帮助用户实时计算行情数据,以下为数据接入处理的流程图:

注:历史的行情数据导入请联系 DolphinDB 小助手(dolphindb1)。

4.1 实时行情落库

实时行情落库的步骤通常是接收数据、处理实时数据,最后将数据写入分布式库表。其中,实时数据的处理是指对齐历史结构与实时结构的字段。对齐方法已在上一节中说明。

4.1.1 创建接收行情的流数据表

创建一个名为 quoteData Level1 的持久化共享流表创建成功,用于存放订阅的行情数据。

// 设置流表名称
quoteData = "quoteDataLevel1" 

// 获取行情数据格式
tbSchema = MDL::getSchema(svrID, msgID)

// 创建持久化流数据表
enableTableShareAndPersistence(
    streamTable(1:0, tbSchema["name"], tbSchema["type"]),
    quoteData,
    cacheSize=1000000, // 保留在系统的内存中的表的条数 1000000条
    retentionMinutes=1440,  // 持久化保留时间 1天
    preCache=1000 // 预加载1000条记录
)

4.1.2 连接 MDL 并订阅行情

输入MDL软件的IP、端口及账户信息后,建立句柄,连接后开始接收数据。

// MDL 的 IP、端口及账户
host = 
port =
username = 

// 创建 MDL 句柄
handle = MDL::createHandle("Handle_CFFE_L1_Future", host, port, username)

// 获取流表对象
outputTable = objByName("quoteDataLevel1")

// 订阅行情数据
svrID, svrVersion, msgID = " MDL SID_ MDL _CFFEX", " MDL VID_ MDL _CFFEX", 1
MDL::subscribe(handle, outputTable, svrID, svrVersion, msgID)

// 开始订阅
MDL::connect MDL (handle)

svrID、svrVersion、msgID 分别表示通联插件的数据服务 ID、数据服务版本号、消息 ID,用于获取通联的中金所实时期货数据。

只有在用户执行了MDL::connect MDL (handle)后, MDL 插件才会正式开始接收行情数据。此时可以查看流表:

图 4-1 行情数据流表

4.1.3 实时行情转换落库代码

实时行情数据入库时,我们通常不推荐数据实时入库,因为会造成数据库后台频繁的刷盘,推荐方法两种:

  1. 建立定时任务:盘后取出流表数据,处理后统一入库,附录模块文件中展示该方法;
  2. 建立流订阅:每N小时入库一次,以下代码将展示该方法,设置最大间隔为24小时:

流订阅步骤一:获取主力合约对应的码表

// 根据持仓量获取主力合约码表
dbName = "dfs://tlFutL1"
tbName = "cffexL1"

// 获取上一个交易日
todayDate = date(now())
predate = getMarketCalendar("CFFEX",todayDate-10,todayDate-1)
predate = predate[predate.size()-1]

// 根据持仓量获取主力合约码表
main_contract_map = select *, 
    CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "") as group 
    from (
      select sum(OPENINTS) as sum_int from loadTable(dbName, tbName
    ) 
    where TDATE=predate group by CONTRACTID) 
    context by CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "") 
    csort sum_int desc 
    limit 1
share(main_contract_map, "mainContractMap")

流订阅步骤二:根据以上计算或填充规则处理缺失列数据

// L1数据处理函数
def L1_convert_handle(dbName, tbName, msg) {
    // 对应列的名称修改
    origin_name = ["InstruID", "LastPrice", "PreSetPrice", "OpenPrice", "HighPrice", "LowPrice", "OpenInt", "ULimitPrice", "LLimitPrice", "PreCloPrice", "ClosePrice", "PreDelta", "CurrDelta", "BidPrice1", "BidPrice2", "BidPrice3", "BidPrice4", "BidPrice5", "BidVolume1", "BidVolume2", "BidVolume3", "BidVolume4", "BidVolume5", "AskPrice1", "AskPrice2", "AskPrice3", "AskPrice4", "AskPrice5", "AskVolume1", "AskVolume2", "AskVolume3", "AskVolume4", "AskVolume5", "ActionDay", "SetPrice", "Turnover", "TradDay", "UpdateTime", "PreOpenInt", "AveragePrice", "Volume", "LocalTime"]
    new_name = ["CONTRACTID", "LASTPX", "PRESETTLE", "OPENPX", "HIGHPX", "LOWPX", "OPENINTS", "RISELIMIT", "FALLLIMIT", "PRECLOSE", "CLOSEPX", "PREDELTA", "CURRDELTA", "B1", "B2", "B3", "B4", "B5", "BV1", "BV2", "BV3", "BV4", "BV5", "S1", "S2", "S3", "S4", "S5", "SV1", "SV2", "SV3", "SV4", "SV5", "TDATE", "SETTLEMENTPX", "TM", "CLEARINGDAY", "TTIME_UPDATEMILLISEC", "INITOPENINTS", "AVGPX", "TQ", "LOCALTM"]
    tmp = sql(sqlColAlias(sqlCol(origin_name), new_name), msg)
    tmp =  MDL _7_1_0_add_columns(tmp)
    tmp =  MDL _7_1_0_add_MFLAG(tmp)

    // 导入数据库中
    n = count(tmp)
    tb = loadTable(dbName, tbName)
    rows = tb.tableInsert(tmp)
    
    // 判断是否导入成功
    if(n != rows){
      throw "导入失败,需要导入数据" + string(n) + "条,仅导入数据" + string(rows) + "条."
    }
}
// 订阅实时行情并处理落库
subscribeTable(
    name="L1DataConvert",
    tableName="quoteDataLevel1",
    actionName="L1DataConvertProcess",
    handler=L1_convert_handle{dbName, tbName},
    msgAsTable=true,
    batchSize=700000, // 每日数据量大概60w,统一盘后入库
    timeTrigger=true,
    throttle=24*60*60 // 设置为每24h触发一次
)

这里通过subscribeTable函数订阅上一步 quoteDataLevel1 的流表,处理后直接写入分布式库表内。

其中L1_convert_handle函数先将数据列名对齐后,通过 MDL _7_1_0_add_columns函数添加缺失的字段, MDL _7_1_0_add_MFLAG函数判断主力合约标记。最终将数据写入数据库中,如果导入不成功将会报错。具体字段处理方法可查看附录文件。

4.2 实时行情聚合分钟K线

由于数据存在时延,原始行情数据往往在盘后时间到达。所以在计算前,需要先对原始行情的数据时间进行规整处理。而后再将规整的数据进行实时聚合计算及指标计算。DolphinDB提供流计算引擎将行情实时计算,和实时计算需要的其他指标,例如成交量加权平均价格(VWAP)以及判断当前主力合约等。

4.2.1 原始行情数据处理

原始行情的数据处理涉及到对中午、下午收盘时间的规整处理以及对缺失值进行处理,方便K线聚合计算。

4.2.1.1 收盘时间规整处理

由于实时行情从交易所到用户的过程中存在一定的时延,因此临近收盘时的行情数据通常会在交易所收盘后才到达用户手中,导致用户的K线提前闭合,计算K线错误,所以在使用实时行情数据合成K线的时候需要先对行情数据进行时间规整处理。

第一,处理临近收盘时的数据。例如将 11:30:02 的数据规整到 11:30:00 ,避免数据丢失。核心处理逻辑如下,其中的end_time_map分别表示不同品种的中午收盘时间和下午收盘时间,需要用户提前根据合约规则指定。

def orgin_data_process(msg, Process_engine) {
	end_time_map1, start_time_map2 = objByName("edate1"), objByName("sdate2")
	end_time_map2 = objByName("edate2")

    tmp = select InstruID as unified_code, "CCFX" as market,
              concatDateTime(ActionDay, UpdateTime) as data_time, 
              TradDay as trade_date, 
              LastPrice as last_price, 
              PreCloPrice as pre_close_price,
              OpenPrice as open_price, 
              HighPrice as high_price, 
              LowPrice as low_price,
              ClosePrice as close_price,
              Volume as volume,
              Turnover as turnover, 
              long(NULL) as trades_count, 
              ULimitPrice as upper_limit_price, 
              LLimitPrice as lower_limit_price,
              double(NULL) as iopv,
              PreSetPrice as pre_settlement_price, 
              SetPrice as settlement_price,
              OpenInt as open_interest 
          from msg
	
	// 中午收盘时的处理
	update tmp set data_time = concatDateTime(date(data_time), 
		end_time_map1[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]) 
		where time(data_time)>end_time_map1[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")] 
		and time(data_time)<start_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]

	// 下午收盘时的处理
	update tmp set data_time=concatDateTime(date(data_time), 
      end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]) 
      where time(data_time)>end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")] 
      and time(data_time)<temporalAdd(end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")], 2,"H")

	tableInsert(getStreamEngine(Process_engine), tmp)
}

第二,处理空值和累计值。对原始数据的空值进行填充,并且将 volume 这样的累计值转化为差值,方便后续计算K线时加总求和。转化代码如下:

@state
def calDeltasDay(val, data_time, unified_code){
    timeMap = objByName(`sdate1)
    return iif(deltas(val)==NULL, iif(timeMap[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]==NULL, val, 0.0), deltas(val)) 
}

def process_convert(){
    convert = [<market>,<data_time>, 
        <trade_date>,<last_price>, <pre_close_price>,<open_price>, 
        <cummax(iif(high_price==0.0, NULL, high_price))>, 
        <cummin(iif(low_price==0.0, NULL, low_price))>, 
        <close_price>, <volume>, <turnover>, <trades_count>, <upper_limit_price>, 
        <lower_limit_price>, <iopv>,
        <pre_settlement_price>, <settlement_price>, <open_interest>, 
        <iif(deltas(cummax(iif(high_price==0.0, NULL, high_price)))>0.000001, 1, 0) as deltas_high_price>, 
        <iif(abs(deltas(cummin(iif(low_price==0.0, NULL, low_price))))>0.000001, -1, 0) as deltas_low_price>,
        <calDeltasDay(volume, data_time, unified_code) as deltas_volume>, 
        <calDeltasDay(turnover, data_time, unified_code) as deltas_turnover>, 
        <iif(deltas(trades_count)==NULL, trades_count, deltas(trades_count)) as deltas_trades_count>]
    return convert
}

第三,将上述两个处理方法通过ReactiveState Engine流引擎以及流订阅关联,注意流计算的代码开发步骤与链路是相反的。

// 1. 创建中间过程流表
process_table = "processed_data"

colNames = `unified_code`market`data_time`trade_date`last_price`pre_close_price`open_price`high_price`low_price`close_price`volume`turnover`trades_count`upper_limit_price`lower_limit_price`iopv`pre_settlement_price`settlement_price`open_interest`deltas_high_price`deltas_low_price`deltas_volume`deltas_turnover`deltas_trades_count
colTypes = ["SYMBOL","SYMBOL","TIMESTAMP","DATE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","LONG","DOUBLE","LONG","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","LONG","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE"]

enableTableShareAndPersistence(
    streamTable(100000:0, colNames, colTypes),
    process_table,
    cacheSize = 1000000, // 保留在系统的内存中的表的条数 1000000条
    retentionMinutes=1440, // 持久化保留时间 1天
    preCache=0
)

// 2. 创建填充空值的流引擎
Process_engine = "L1_QuoteData_process"
createReactiveStateEngine(
        name=Process_engine, metrics=process_convert(), 
        dummyTable=process_input_schema(1), 
        outputTable=objByName(process_table), 
        keyColumn="unified_code", 
        keepOrder = true)

// 3. 订阅 QuoteData 
subscribeTable(
    tableName=QuoteData, actionName=QuoteData+"Process",
    handler=orgin_data_process{, Process_engine}, // 数据处理
    msgAsTable=true,
    batchSize=5000, throttle=0.001, reconnect=true
)

接下来就是通过生成的中间数据流表 processed_data 来合成K线。

4.2.2 K线聚合

K线聚合的过程同样分为两步,首先将高频数据通过时序聚合引擎 DailyTimeSeriesEngine 聚合为分钟频数据,再通过响应式状态引擎 ReactiveStateEngine 处理缺失值及计算成交量加权平均价格(VWAP)。

4.2.2.1 时序聚合引擎合成 K 线

1. 确定需要计算的 OHLC 等列,以及计算方法;

// K线计算方法
// priceFilter方法用于筛选出其他期货日夜盘价格,具体代码请参考模块
def process_to_kline(){
	convert = [<first(market) as market>,
			<first(trade_date) as trade_date>,
            <firstNot(priceFilter(last_price, volume, data_time, unified_code)) as open_price>,
			<max(priceFilter(last_price, volume, data_time, unified_code)) as high_price>,
			<min(priceFilter(last_price, volume, data_time, unified_code)) as low_price>,
			<lastNot(last_price) as close_price>,
			<sum(deltas_volume) as volume>,
			<sum(deltas_turnover) as turnover>,
			<sum(deltas_trades_count) as trades_count>,
			<firstNot(pre_close_price) as pre_close_price>,
			<firstNot(upper_limit_price) as upper_limit_price>,
			<firstNot(lower_limit_price) as lower_limit_price>,
			<last(settlement_price) as settlement_price>,
			<last(pre_settlement_price) as pre_settlement_price>,
			<last(open_interest) as open_interest>,
			<firstNot(iopv) as iopv>,
			<tseConstFill(00f) as day_session_open>,
			<tseConstFill(string(NULL)) as domainid>,
			<lastNot(open_price) as first_open_price>,
			<lastNot(low_price) as first_low_price>,
			<lastNot(high_price) as first_high_price>]
	fillList = ["ffill", "ffill", 'null', 'null', 'null', 'null', 0, 0, 
				'null', 'ffill', 'ffill', 'ffill', 'ffill', "ffill", "ffill", 'null', 
				'null', 'null', "ffill", 'ffill', "ffill"]
	return convert, fillList
}

2. 创建两个时序聚合引擎

  • 创建两个引擎的原因:中金所部分期货的下午收盘时间为 15:15:00 ,而其他的则是正常的 15:00:00 ,目前单个时序聚合引擎还无法做到将这两者区分。
  • 开盘第一根K线处理方法:当createDailyTimeSeriesEngine的 closed取值为“right”时,9:30-9:31 中的数据合并的K线被称为9:31,因此为了获取到9:30的 K 线,需要将区间的开始时间往前拉 n 分钟。
// 用于填充值
defg tseConstFill(fillVal){
	return fillVal
}

// n分钟的k线
nMinute = 1

// 创建日聚合引擎1
windowSize = 60000*nMinute
convert, fillList = process_to_kline()
sessionBegin = time([temporalAdd(09:30:00,-nMinute,"m"),13:00:00])
sessionEnd = time(11:30:00 15:00:00)

createDailyTimeSeriesEngine(
    name="Kline_engine00", 
    windowSize=windowSize, step=windowSize, metrics=convert,
    dummyTable=objByName(process_table),
    outputTable=getStreamEngine(cal_Kline_engine), 
    timeColumn=`data_time, 
    keyColumn="unified_code", 
    closed="right",
    useWindowStartTime=false, 
    forceTriggerTime=1, fill=fillList, 
    sessionBegin=sessionBegin, sessionEnd=sessionEnd, 
    mergeSessionEnd=true, forceTriggerSessionEndTime=windowSize
)

// 创建日聚合引擎2
windowSize = 60000*nMinute 
convert, fillList = process_to_kline()
sessionBegin = time([temporalAdd(09:30:00,-nMinute,"m"),13:00:00])
sessionEnd = time(11:30:00 15:15:00)
createDailyTimeSeriesEngine(
    name="Kline_engine15", 
    windowSize=windowSize, step=windowSize, metrics=convert,
    dummyTable=objByName(process_table),
    outputTable=getStreamEngine(cal_Kline_engine), 
    timeColumn=`data_time, 
    keyColumn="unified_code", 
    closed="right",
    useWindowStartTime=false, 
    forceTriggerTime=1, fill=fillList, 
    sessionBegin=sessionBegin, sessionEnd=sessionEnd, 
    mergeSessionEnd=true, forceTriggerSessionEndTime=windowSize
)

4.2.2.2 响应式状态引擎 ReactiveStateEngine处理缺失值及计算VWAP

我们用响应式状态引擎 ReactiveStateEngine对缺失的数据进行填充。其中,VWAP的计算公式为 value/volume,表示一手合约的平均价值,成交额数据的单位是元,成交量的单位是手。但期货的报价并不等于一手合约的价值,例如利率期货合约的报价是百元净价报价,权益类期货合约的报价是指数点。因此在计算 VWAP 时需要一个转换系数,将一手合约的平均价值转换为报价相匹配的单位。如2年期国债期货合约,其报价方式为百元净价报价,合约的标的是面值为200万元人民币、票面利率为3%的名义中短期国债,因此需要将一手合约的平均价值除以20000从而和报价相匹配,其余的期货合约同理。因此,本模块维护了一个转换系数的字典 volmap 记录中金所所有期货合约的转换系数。

// 计算vwap函数,下有详细说明
def calVwap(volume, turnover, unified_code){
	volMap = objByName(`vwap_map) //转换系数字典,参考模块内代码
	vol = nullFill(volMap[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")], 1.0)
	return turnover\volume\vol
}

// k线处理
def cal_kline_convert(){
    convert = [<data_time>,<trade_date>,
            <nullFill(open_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(high_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(low_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(close_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(volume, 0)>,
            <nullFill(turnover, 0.0)>,
            <calVwap(volume, turnover, unified_code).nullFill(cumlastNot(close_price).nullFill(pre_close_price))>,
            <trades_count>,
            <pre_close_price>,
            <upper_limit_price>,
            <lower_limit_price>,
            <settlement_price>,
            <pre_settlement_price>,
            <open_interest>,
            <iopv>,
            <day_session_open>,
            <domainid>
            ]
    return convert
}

// 建立rse引擎
engine = createReactiveStateEngine(name=cal_Kline_engine, metrics=cal_kline_convert(), 
    dummyTable=cal_kline_input(1),
    outputTable=objByName(KlineData),
    keyColumn=["unified_code", "market"], 
    keepOrder = true
)
// 共享该引擎上锁
share(engine ,cal_Kline_engine)

5. 总结

本文详细介绍了 DolphinDB MDL 插件针对中金所 Level1 期货行情数据的实时接入流程。首先,我们介绍了中国金融期货交易所的期货产品以及 MDL 提供的 Level 1 数据结构。针对此,我们提供了中金所 Level1数据及分钟K线的存储方案。在做好准备工作后,我们详细说明了实时接入数据的步骤与细节。通过创建流数据表、连接MDL并订阅行情,本文展示了如何高效地接入、存储并处理实时行情数据。此外,分钟K线的合成过程也得到了详细说明。

通过本文的介绍,读者能够全面了解 DolphinDB MDL 插件的使用方法及在其期货市场中的实际应用场景,为金融数据的实时处理、存储与分析提供了可行的解决方案。

6. 附录

MDLCFFEX.zip

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/956728.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JupyterLab 安装以及部分相关配置

安装 JupyterLab pip install jupyter启动 JupyterLab jupyter lab [--port <指定的端口号>] [--no-browser] # --port 指定端口 # --no-browser 启动时不打开浏览器安装中文 首先安装中文包 pip install jupyterlab-language-pack-zh-CN安装完成后重启 JupyterLab 选…

LabVIEW电源纹波补偿

在电子设备的电源管理中&#xff0c;电源纹波的存在可能会对设备的稳定性和性能产生负面影响。以某精密电子仪器的电源纹波补偿为例&#xff0c;详细阐述如何运用 LabVIEW 编写程序进行电源纹波补偿。将从电源纹波特点、测量采样、滤波、反馈控制等多个方面展开介绍。 ​ 电源…

嵌入式硬件篇---基本组合逻辑电路

文章目录 前言基本逻辑门电路1.与门&#xff08;AND Gate&#xff09;2.或门&#xff08;OR Gate&#xff09;3.非门&#xff08;NOT Gate&#xff09;4.与非门&#xff08;NAND Gate&#xff09;5.或非门&#xff08;NOR Gate&#xff09;6.异或门&#xff08;XOR Gate&#x…

使用rpc绕过咸鱼sign校验

案例网站是咸鱼 找到加密函数i()&#xff0c;发现参数是由token时间戳appkeydata构成的 js客户端服务 考虑到网站可能有判断时间戳长短而让请求包失效的可能&#xff0c;我们请求包就直接用它的方法生成 下面我们先把token和h置为键值对tjh123 再把方法i()设为全局变量my_…

鸿蒙安装HAP时提示“code:9568344 error: install parse profile prop check error” 问题现象

在启动调试或运行应用/服务时&#xff0c;安装HAP出现错误&#xff0c;提示“error: install parse profile prop check error”错误信息。 解决措施 该问题可能是由于应用使用了应用特权&#xff0c;但应用的签名文件发生变化后未将新的签名指纹重新配置到设备的特权管控白名…

Pix2Pix :用于图像到图像转换的条件生成对抗网络

1. 背景与问题 图像到图像的转换&#xff08;Image-to-Image Translation&#xff09;是计算机视觉中的一个重要任务&#xff0c;指的是在输入一张图像的情况下&#xff0c;生成一张风格、内容或其他条件不同但语义一致的图像。随着深度学习的发展&#xff0c;尤其是生成对抗网…

【大数据2025】Hadoop 万字讲解

文章目录 一、大数据通识大数据诞生背景与基本概念大数据技术定义与特征大数据生态架构概述数据存储数据计算与易用性框架分布式协调服务和任务调度组件数仓架构流处理架构 二、HDFSHDFS 原理总结一、系统架构二、存储机制三、数据写入流程四、心跳机制与集群管理 安全模式&…

docker 安装 nanomq

1.拉取镜像 docker pull emqx/nanomq:latest 2. 创建配置文件夹&#xff08;示例放在/home/nanomq&#xff09; mkdir nanomq chomd 777 nanomq3. 创建配置文件&#xff08;nanomq.conf&#xff09;写入以下内容 mqtt {property_size 32max_packet_size 10KBmax_mqueue_le…

【STM32G4xx的CAN驱动记录】

STM32G4xx的CAN驱动记录 CAN说明CAN的波特率计算数据测试总结 本文主要记录了基于STM32G4xx的CAN接口解析某型号雷达数据遇到的问题及规避方法&#xff0c;CAN总线波特率500Kbps&#xff0c;采样点要求80%附近。 注意CAN总线同步段的时间&#xff01;&#xff01;&#xff01; …

2024年CSDN博客之旅:成长、创作与生活的交响曲

文章目录 《2024年博客之旅&#xff1a;成长、创作与生活的交响曲》一、引言二、个人成长与突破盘点&#xff08;一&#xff09;技术能力的提升&#xff08;二&#xff09;解决问题能力的增强&#xff08;三&#xff09;沟通与表达能力的进步 三、年度创作历程回顾&#xff08;…

微服务与docker

准备工作 在课前资料中给大家提供了黑马商城项目的资料,我们需要先导入这个单体项目。不过需要注意的是,本篇及后续的微服务学习都是基于Centos7系统下的Docker部署,因此你必须做好一些准备: Centos7的环境及一个好用的SSH客户端装好Docker会使用Docker如果是学习过上面Doc…

docker离线安装及部署各类中间件(x86系统架构)

前言&#xff1a;此文主要针对需要在x86内网服务器搭建系统的情况 一、docker离线安装 1、下载docker镜像 https://download.docker.com/linux/static/stable/x86_64/ 版本&#xff1a;docker-23.0.6.tgz 2、将docker-23.0.6.tgz 文件上传到服务器上面&#xff0c;这里放在…

02内存结构篇(D3_对象的创建历程)

目录 一、学习前言 二、对象的创建&#xff1a;指针碰撞 & 空闲列表 三、对象的内存布局&#xff1a;三部分组成 1. 对象头 2. 实例数据 3. 对齐填充 四、对象的访问定位 1. 句柄访问 2. 直接指针访问 3. 两者访问方式比较 一、学习前言 运行时数据区了解了&…

Linux系统之kill命令的基本使用

Linux系统之kill命令的基本使用 一、kill命令介绍1. kill命令简介2. kill命令的使用场景3. kill命令使用注意事项 二、kill命令的使用帮助1. 查看kill命令帮助信息2. kill命令帮助解释 三、kill常用的信号1. 列出所有的信号2.kill常用的信号 四、kill命令的基本使用1. 运行一个…

【银河麒麟高级服务器操作系统】业务访问慢网卡丢包现象分析及处理过程

了解更多银河麒麟操作系统全新产品&#xff0c;请点击访问 麒麟软件产品专区&#xff1a;product.kylinos.cn 开发者专区&#xff1a;developer.kylinos.cn 文档中心&#xff1a;document.kylinos.cn 交流论坛&#xff1a;forum.kylinos.cn 服务器环境以及配置 【内核版本…

leetcode49-字母异位词分组

leetcode 49 思路 通过一个哈希表进行记录每个分组&#xff0c;遍历strs&#xff0c;然后对每个字符串item进行排序&#xff0c;比如&#xff1a;acb bac cab都会被排序为’abc’,然后以abc作为map的key&#xff0c;value就是存放所有匹配出来为key的值&#xff0c;最后把ma…

ChatGPT被曝存在爬虫漏洞,OpenAI未公开承认

OpenAI的ChatGPT爬虫似乎能够对任意网站发起分布式拒绝服务&#xff08;DDoS&#xff09;攻击&#xff0c;而OpenAI尚未承认这一漏洞。 本月&#xff0c;德国安全研究员Benjamin Flesch通过微软的GitHub分享了一篇文章&#xff0c;解释了如何通过向ChatGPT API发送单个HTTP请求…

WGAN - 瓦萨斯坦生成对抗网络

1. 背景与问题 生成对抗网络&#xff08;Generative Adversarial Networks, GANs&#xff09;是由Ian Goodfellow等人于2014年提出的一种深度学习模型。它包括两个主要部分&#xff1a;生成器&#xff08;Generator&#xff09;和判别器&#xff08;Discriminator&#xff09;…

Java工程结构:服务器规约(JVM 碰到 OOM 场景时输出 dump 信息、设置tomcat的 JVM 的内存参数、了解服务平均耗时)

文章目录 I 调用远程操作必须有超时设置。II 推荐了解每个服务大致的平均耗时JVM 的 Xms 和 Xmx 设置一样大小的内存容量让 JVM 碰到 OOM 场景时输出 dump 信息调大服务器所支持的最大文件句柄数(File Descriptor,简写为 fd)高并发服务器建议调小 TCP 协议的 time_wait 超时…

1.3.浅层神经网络

目录 1.3.浅层神经网络 1.3.1 浅层神经网络表示 1.3.2 单个样本的向量化表示 1.3.4 激活函数的选择 1.3.5 修改激活函数 1.3.5 练习​​​​​​​ 1.3.浅层神经网络 1.3.1 浅层神经网络表示 之前已经说过神经网络的结构了&#xff0c;在这不重复叙述。假设我们有如下…