TWAP
TWAP交易时间加权平均价格Time Weighted Average Price 模型,是把一个母单的数量平均地分配到一个交易时段上。该模型将交易时间进行均匀分割,并在每个分割节点上将拆分的订单进行提交。例如,可以将某个交易日的交易时间平均分为N 段,TWAP 策略会将该交易日需要执行的订单均匀分配在这N个时间段上去执行,从而使得交易均价跟踪TWAP,计算公式为:
TWAP不考虑交易量的因素。TWAP的基准是交易时段的平均价格,它试图付出比此时段内平均买卖差价小的代价执行一个大订单。TWAP模型设计的目的是使交易对市场影响减小的同时提供一个较低的平均成交价格,从而达到减小交易成本的目的。在分时成交量无法准确估计的情况下,该模型可以较好地实现算法交易的基本目的。
但是使用TWAP过程中的一个问题是,在订单规模很大的情况下,均匀分配到每个节点上的下单量仍然较大,当市场流动性不足时仍可能对市场造成一定的冲击。另一方面,真实市场的成交量总是在波动变化的,将所有的订单均匀分配到每个节点上显然是不够合理的。因此,算法交易研究人员很快建立了基于成交量变动预测的VWAP 模型。不过,由于TWAP 操作和理解起来非常简单,因此其对于流动性较好的市场和订单规模较小的交易仍然适用。
VWAP
假设已经选好股,要大量买入,但是单凭交易员的操作海量单而且要完成买入1000万股这些的操作是有点的困难的。那么这时候怎样解决拆单,防止冲击成本的问题就非常重要:算法交易。现在市面上的流行算法交易有两种,第一种是VWAP,一种是TWAP。但是每种算法交易也有它的坏处,就是很容给人看出操作手法(如果策略比较简单的情况下),所以这种需要不断优化。
VWAP是Volume Weighted Average Price 的缩写,译为成交量加权平均价。VWAP策略即是一种拆分大额委托单,在约定时间段内分批执行,以期使得最终买入或卖出成交均价尽量接近该段时间内整个市场成交均价的算法交易策略。
VWAP内容:包含宏观和微观两个层面,宏观层面要解决如何拆分大额委托单的问题,需要投资者对股票的日内成交量做出预测,建议按两分钟的时间长度来拆分订单。微观层面要确定是用限价单还是市价单来发出交易指令,考虑到VWAP是一种被动跟踪市场均价的策略,建议采用市价委托方式,一方面有利于控制最终成交均价与市场均价之间的偏差,另一方面也可以提高委托成交的效率,避免限价单长时间挂单不能成交的风险。
传统的VWAP策略,只是一种被动型的策略,而且在这个策略当中,最重要有以下的因素:历史成交量,未来的成交量预测、市场动态总成交量,拆单的时间段(就是总共要将总单拆分成多少单分别以怎样的时间频率交易)。
表示第i次交易价格按成交量加权的权重。
宏观拆单VWAP:假设投资者要在2024年5月10日当天以市场均价买入某只股票1000万股,宏观策略可以告诉投资者如何拆分这1000万股的委托单,在当天什么时间下多少单。市场通行的做法是采取等时长下单,例如每5分钟下一笔单,这样原来的1000万股委托单将被拆分成笔小额委托单,分时执行。
其中V为拆分前委托单的总量。很显然,当时,上式取最小值0,也就是说如果投资者能够准确预测市场每个时间段的成交量占当日成交量的比例,那么投资者按这个比例拆分委托单,分时成交,那么最后总的成交均价将于市场成交均价相当。因此,拆单策略的一个关键在于对日内成交量的预测。
但是这种方法的一个缺陷是,根据历史交易来预测未来的预测交易量。对VWAP预测成交量比例有进行改进,利用动态的成交量来做一个预测。例如说,根据前两分钟的成交量来预测未来两分钟的成交量。这种策略有效降低成交成本。在检测VWAP策略是否有效的时候,引入一个指标,绝对平均值偏差:
除个别股票外,VWAP-D与市场均价的偏差都小于VWAP-B,这主要得益于其对日内成交量预测的实时动态调整。另外,股票市值越大,其股价与成交量的波动性也就越小,VWAP策略执行的效果也就越好,与市场均价的偏差也就越小。总体上来看,大盘股要好于中盘股,中盘股要好于小盘股,但之间的差距并不明显。如果考虑到资金的冲击成本,大盘股与小盘股之间的这种差距将会拉大、对于一个策略来说,MAPE越小,策略效果越好,越大,卖出价格越高。
较为高级的VWAP模型要使用交易所单簿(Order Book)的详细信息,这要求系统能够得到即时的第二级市场数据(Level II Market Data)。
VWAP模型对于在几个小时内执行大单的效果最好。在交易量大的市场中,VWAP效果比在流动性差的市场中要好。在市场出现重要事件的时候往往效果不那么好。如果订单非常大,譬如超过市场日交易量的1%的话,即便VWAP可以在相当大的程度上改善市场冲击,但市场冲击仍然会以积累的方式改变市场,最终使得模型的效果差于预期。
VWAP算法交易的目的是最小化冲击成本,并不寻求最小化所有成本。理论上,在没有额外的信息,也没有针对股票价格趋势的预测的情况下,VWAP 是最优的算法交易策略。
data:
example:
import csv
# global file path
filePath = r"C:\Users\ainve\Desktop\TWAP_VWAP_code-master\TWAP_VWAP_code-master\data\market_price.csv"
# read the csv file to get market data
def readData(filePath):
marketDataTable = list()
try:
fileIn = open(filePath, 'r')
reader = csv.reader(fileIn)
for row in reader:
marketDataTable.append(row)
finally:
fileIn.close()
return marketDataTable
# calculate vwap value
def calc_vwap(marketDataTable):
n = len(marketDataTable) - 1
total_sum = 0.0
volume_sum = 0
for i in range(1, n + 1):
high_price = float(marketDataTable[i][8])
low_price = float(marketDataTable[i][9])
price = (high_price + low_price) / 2
volume = int(marketDataTable[i][10])
total_sum += price * volume # Average price per minute weighted by volume
volume_sum += volume
return total_sum / volume_sum
# calculate vwap value
def calc_twap(marketDataTable):
n = len(marketDataTable) - 1
price_sum = 0.0
for i in range(1, n + 1):
high_price = float(marketDataTable[i][8])
low_price = float(marketDataTable[i][9])
price = (high_price + low_price) / 2 # Average price per minute
price_sum += price
return price_sum / n
if __name__ == "__main__":
print("reading market data")
marketDataTable = readData(filePath)
print("calculating TWAP and VWAP")
print("VWAP: ", calc_vwap(marketDataTable))
print("TWAP: ", calc_twap(marketDataTable))
(计算的指标应该是实时更新动态的)
import csv
import warnings
warnings.filterwarnings("ignore")
# global file path
filePath = r"C:\Users\ainve\Desktop\TWAP_VWAP_code-master\TWAP_VWAP_code-master\data\market_price.csv"
marketDataTable = pd.read_csv(filePath)
# calculate vwap value
def calc_vwap(marketDataTable):
total_sum = 0.0
volume_sum = 0
for i in range(len(marketDataTable)):
high_price = float(marketDataTable['highPrice'][i])
low_price = float(marketDataTable['lowPrice'][i])
price = (high_price + low_price) / 2
volume = int(marketDataTable['totalVolume'][i])
total_sum += price * volume # Average price per minute weighted by volume
volume_sum += volume
marketDataTable.at[i, 'vwap'] = total_sum / volume_sum
return marketDataTable
# calculate vwap value
def calc_twap(marketDataTable):
price_sum = 0.0
for i in range(len(marketDataTable)):
high_price = float(marketDataTable['highPrice'][i])
low_price = float(marketDataTable['lowPrice'][i])
price = (high_price + low_price) / 2 # Average price per minute
price_sum += price
marketDataTable.at[i, 'twap'] = price_sum / (i+1)
print(price_sum)
print(i+1)
return marketDataTable
# for i in range(1,len(marketDataTable)):
for j in range(234,len(marketDataTable)):
print(calc_vwap(marketDataTable.iloc[:j+1])[-3:]) # 切片左开右闭,j=234??
print(calc_twap(marketDataTable.iloc[:j+1])[-3:])
掘金量化APIdemo:(看不到逻辑源码,只有部分API接口调用)(算法交易函数 - Python - 掘金量化 (myquant.cn))
"""
TWAP算法和VWAP算法的介绍
TWAP:在设定的时间范围内匀速下单,降低市场冲击,最小化与市场TWAP的偏差;
VWAP:在设定的时间范围内对根据对市场成交量分布的预测进行下单,降低市场冲击,最小化与市场VWAP的偏差;
二、TWAP算法和VWAP算法参数
开始时间:策略开始执行的时间(剔除非交易时间段)。如果开始时间早于策略下达时间点时,则使用下达时间作为开始时间
结束时间:策略停止执行的时间(剔除非交易时间段)。过了结束时间还未完成的数量,将会自动释放到指令。算法执行的区间段,时间越短,任务执行强度(委托频率和单笔委托量)越高
量比比例:策略的成交数量与策略执行期间市场的总成交量(不包括策略执行之前和结束之后的市场成交量)之比。对于跟量和跟价策略,量比比例参数是作为目标比例来参考;而对于其它策略,是作为上限来控制
委托最小金额:控制子单单笔委托的最小金额 该参数只适用于股票。A股单位为元
基准价格:算法模型的参考基准价格,子单限价单价格不能超过该价格的不利价位方向;当填入价格为0时,则不设置基准价
"""
# coding=utf-8
from gm.api import *
from gm.model import DictLikeAlgoOrder
from gm.pb.account_pb2 import AlgoOrder
from datetime import timedelta
"""
算法单新增api在 sdk 的 gm.api.trade.py 文件里, 有如下函数, 具体函数签名可点进去看api文档
algo_order
algo_order_cancel
algo_order_pause
get_algo_child_orders
get_algo_orders
增加的算法单的状态常量
AlgoOrderStatus_Unknown = 0
AlgoOrderStatus_Resume = 1 # 恢复母单
AlgoOrderStatus_Pause = 2 # 暂停母单
AlgoOrderStatus_PauseAndCancelSubOrders = 3 # 暂停母单并撤子单
algo_param算法参数
time_start str 开始时间
time_end str 结束时间
part_rate flaot 量比比例 (0 ~ 1)
min_amount int 委托最小金额
"""
# TWAP算法和VWAP算法示例, 仅接口使用示例
def init(context):
time = (context.now + timedelta(seconds=3)).strftime('%H:%M:%S')
schedule(schedule_func=algo, date_rule='1d', time_rule=time)
def algo(context):
# 算法名
algo_name = 'twap'
# 算法参数格式如下
algo_param = {'time_start': '14:00:00','time_end': '16:00:00','part_rate': 0.5, 'min_amount': 1000}
symbol = 'SHSE.600008'
# 基准价, 算法母单需要是限价单
price = current(symbol)[0]['price']
aorder = algo_order(symbol=symbol, volume=2000, side=OrderSide_Buy, order_type=OrderSide_Buy,
position_effect=PositionEffect_Open, price=price, algo_name=algo_name, algo_param=algo_param)
# 提取算法单的 cl_ord_id 委托客户端ID, 用于其它api的查询, 或者撤单时用
context.algo_order_id = aorder[0]['cl_ord_id']
# 暂停或重启或者撤销算法母单
# aorders = get_algo_orders(account='')
# # 暂停订单,修改订单结构的母单状态字段 algo_status为1 恢复母单, 2 暂停母单, 3 暂停母单并撤子单
# alorders01 = [{'cl_ord_id': aorders[0]['cl_ord_id'], 'account_id': aorders[0]['account_id'], 'algo_status': 3}]
# algo_order_pause(alorders01)
# 撤销指定cl_ord_id的算法母单
# aorders = get_algo_orders(account='')
# wait_cancel_orders = [{'cl_ord_id': aorders[0]['cl_ord_id'], 'account_id': aorders[0]['account_id']}]
# algo_order_cancel(wait_cancel_orders)
def on_order_status(context, order):
# 算法子单已成
if order['status'] == 3:
# 查询指定cl_ord_id算法母单的所有子单
child_order = get_algo_child_orders(context.algo_order_id, account='')
print('算法子单: child_order ={}'.format(child_order))
def on_algo_order_status(context, algo_order):
# type: (Context, DictLikeAlgoOrder) -> NoReturn
"""
算法单状态事件. 参数algo_order为算法单的信息
响应算法单状态更新事情,下算法单后状态更新时被触发
3.0.125 后增加.
"""
print('算法单状态变化: algo_order={}'.format(algo_order))
# 算法母单已报
if algo_order['status'] == 1:
# 查询算法母单, 默认账户account填空
aorders = get_algo_orders(account='')
print('算法母单: aorders ={}'.format(aorders))
if __name__ == '__main__':
'''
strategy_id策略ID,由系统生成
filename文件名,请与本文件名保持一致
mode实时模式:MODE_LIVE回测模式:MODE_BACKTEST
token绑定计算机的ID,可在系统设置-密钥管理中生成
backtest_start_time回测开始时间
backtest_end_time回测结束时间
backtest_adjust股票复权方式不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
backtest_initial_cash回测初始资金
backtest_commission_ratio回测佣金比例
backtest_slippage_ratio回测滑点比例
'''
run(strategy_id='strategy_id',
filename='main.py',
mode=MODE_LIVE,
token='token',
backtest_start_time='2020-11-02 08:00:00',
backtest_end_time='2020-11-02 16:00:00',
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=10000000,
backtest_commission_ratio=0.0001,
backtest_slippage_ratio=0.0001)
冰山算法
现在主流的交易所一般都使用Order Book(订单簿)进行交易,交易所在内部的Order Book上记录所有买家和卖家的报价:
Bid表示买家,Offer表示卖家,这张报价单表示买卖双方发出的所有报价单(Limit Order)。这张表才是高频交易最关心的信息。任意时刻,买家的出价总是低于卖家(比如这里的99对101)。所以报价虽然一直在变化,但是只有报价是不会有任何成交的。什么时候会产生交易呢?有两种情况,第一是任一方发出市价单(Market Order),比如一个买家发出一张单量为10的市价单,就可以买到卖方在101价格上挂的10份,这个交易成功之后,Order Book会变成这样:
第二是发出一个价格等于对方最优报价的限价单,也会产生和上述情况相同的结果。需要强调的是,虽然真正的Order Book只存在于交易所内部,所有交易都在交易所内完成,但是交易所会把每笔报价和市价单都转发给所有人,所以所有的买家和卖家都可以自己维护一个同样的数据结构,相当于交易所Order Book的镜像。通过跟踪分析自己手里这份的镜像变化,来制定交易策略,这才是高频交易算法的核心思想。
这张图对应文章开始时的那个Order Book,应该可以明白地看出,横轴表示价格,纵轴表示订单量,绿色表示买家,红四表示卖家。这样做的目的是为了引出主题:冰山订单。通过上述基本分析可以看出,交易所内的交易数据是完全公开的,市场上任意时刻,有谁想要买/卖多少,所有人一目了然,没有任何秘密。这样做本身是有经济学意义的,因为只有展示出买卖的需求,才会吸引潜在的商家来交易,所以在市场上一定程度的公开自己的需求是必要的。但这样同时带来一个严重的后果,一旦有某个人想要大量买/卖,他所发出的巨额限价单会直接展示给所有人。比如一个买家挂出巨额买单后,Order Book会像这样:
这对他非常不利,因为所有人都会利用这个信息来跟他做对。大家会判断,现在市场上存在大量的买压,于是会出现一大批为了赚钱而冲进来的人抢购,价格会快速上升,这样原来这个人可以在99这个价位买到的东西,很快就变得要在更高的价位上才能买到。这种情况,那些后来的人做的就是Front running,而原来的那个人则面对逆向选择风险。为了解决这个问题,交易所提供了一种针对性的工具,就是所谓的冰山订单(Iceberg Order)。这种订单可以很大,但只有一小部分是公开出来的,大部分则隐藏起来,除了交易所和发单者本人谁也看不到,真的像一座“冰山”一样。比如像这样:
灰色的部分就是冰山订单隐藏的部分。这样,只有当有对应隐藏量的交易发生时,交易所才会通知其他人,就避免了别人利用显示订单的信息来做Front running。凡事有一利必有一弊。冰山订单虽然保护了发单者的利益,但是对于其他市场参与者来说却又变成了一种不公平的规则。那些有真实的交易需求的参与者,会因为对局势的误判而损失惨重。所以接下来的问题就变成,如何发现市场上的冰山订单?有的时候,冰山订单是挂在最优买价和卖价之间(spread),price=100,首先有一种最简单的方法。
即发一个最小额度的限价单在spread里,紧跟着取消这个订单。比如这个例子中,发出一个卖价为100的限价单然后取消。因为这个价格本身对不上显式的买价(99),如果没有冰山单的存在,一定不会成交。但有冰山单的情况下,一旦交易所收到这个卖单,会立刻成交冰山单中对应的量,而之后的取消指令就无效了。这样,以一种微小的成本,就可以发现市场中隐藏着的订单。事实上,的确有人会做这种事情,频繁的发单然后取消,在最优价差之间形成一种高频扰动,用来探测隐藏单。为了应对这种扰动探测,大家一般都不会直接挂单在spread里。而是会像之前那样和普通的限价单挂在一起,这样发生交易之后,你就很难推测消耗掉的究竟是正常的限价单,还是冰山订单。那么应该怎么做呢?首先有一个直接的思路。冰山订单的存在,一定程度上反映了挂单人对市场情况的解读,认为有必要使用冰山订单而做出的判断。需要强调的是,使用冰山订单并不是没有代价的,因为你隐藏了真实的需求,在屏蔽掉潜在的攻击者的同时,也屏蔽掉了真正的交易者!而且会使得成交时间显著增加——因为没人知道你想买/卖这么多,你只能慢慢等待对手盘的出现。所以当有人下决定发出冰山订单的时候,也会有对市场情况的考虑,只有合适的时机才会做这种选择。什么是合适的时机?有一些数据应该是相关的,比如买卖价差spread,买单量对卖单量的比值等。对这些数据,你可以在历史数据上做回归分析,建立起他们和冰山订单之间的线性/非线性模型。通过历史数据训练出来的这个模型,就可以作为你在实时交易时使用的冰山订单探测器。这是 On the Dark Side of the Market: Identifying and Analyzing Hidden Order Placements 这篇论文使用的方法。可以在此基础上做HMM,SVM,神经网络之类的高级模型,但基本思路是一致的:通过盘口分析计算存在冰山订单的概率。但这种建模方式并不精确。
Prediction of Hidden Liquidity in the Limit Order Book of GLOBEX Futures 这篇论文介绍了高频世界里,有一条永恒的建模准则值得铭记:先看数据再建模。
交易所对于冰山订单是这样做的:一个冰山订单包含两个参数,V表示订单总量,p表示公开显示的量。比如V=100,p=10的冰山单,实际上隐藏的量是90。如果有针对这个订单的交易发生,比如交易量10,交易所会顺序发出三条信息:
- 成交10
- Order Book的Top bid size -10
- 新Bid +10
这三条信息一定会连续出现,并且第三条和第一条的时差dt很小。这样做的原因是尽管冰山订单存在隐藏量,但是每次的交易只能对显示出的量(p)发生,p被消耗掉以后,才会从剩余的隐藏量中翻新出一分新的p量。这样,每个人从交易所收到的信息仍然可以在逻辑上正确的更新Order Book,就好像冰山订单并不存在一样。因此,一旦在数据中观察到这个规律,我们就可以非常有把握的判定市场中存在冰山订单,并且连p的值都可以确定!接下来的关键问题是,如何确定V的值,即判断这个冰山订单的剩余存量有多少?这个问题从本质上说没法精确求解,因为V和p都是由下单人自己决定的,可以是任意值。但可以从两点考虑:第一,两个值都是整数;第二,人类不是完美的随机数生成器,下决定会遵循一定规律。从这两点出发,可以对V和p建立概率模型,即计算一个给定的(V,p)值组合出现的概率是多少?简单说,可以在历史数据上通过kernel estimation技术来估算他们的概率密度函数的形状。在数据上估算出来的概率密度函数可能会是这样的:
这样,当你在实时数据中观测到一个p的值时,就可以得出对应的V值的条件概率密度函数,即上图的一个切面,比如(p = 8):
V|p = 8
接下来显然就很容易计算V最可能是什么值了。这条函数曲线还有一个重要的作用是帮助你动态评估剩余存量,比如当你观察到已经有5份p被消耗掉,即可推出V>=40,由上图即可推出新的V值和剩余存量(V-5p)。算法的核心在于,通过在实时数据中监测短时间内连续出现的三条相关记录判断冰山订单的存在,而对冰山订单的量化则通过由历史数据训练出的概率模型来完成。也可看出,这种算法并不是什么作弊神器。它只是利用市场上的公开数据所做的一种推测。而且这个推测也仅仅是基于概率的,更多的应该是作为一种参考。
冰山算法逻辑
冰山委托指的是投资者在进行大额交易时,为避免对市场造成过大冲击,将大单委托自动拆为多笔委托,根据当前的最新买一/卖一价格和客户设定的价格策略自动进行小单委托,在上一笔委托被全部成交或最新价格明显偏离当前委托价时,自动重新进行委托(Python版冰山委托策略 (fmz.com))。
效果:减少大额买单/卖单对市场价格的影响,进行大额买入时,可以防止因大额买单造成价格提升而增加自己的买入成本;在大额卖出时,可以防止因大额卖单造成拉低价格减少自己的卖出利润。
数据参数对照:
- 委托价格=最新买1价X(1-委托深度)
- 实际市场委托深度=(最后成交价格 - 上次委托价格)/ 上次委托价格
- 随机单次购买数量=单次购买数量均值 X(100-单次均值浮点数)% + (单次均值浮动点数X2)%X单次购买数量均值X随机数0~1
- 可用金额= 取账户计价货币,随机单次购买数量,购买剩余总金额数最小值
- 购买数量=可用金额/委托价格
- 购买剩余总金额= 购买总金额-(初始账户计价货币-账户计价货币)
规则:
- 在最新成交价格距离该笔委托超过委托深度X2时自动撤单(说明偏离太大)
- 当策略总成交量等于总委托数量时停止委托
- 最新成交价格高于最高限制买入价格停止委托
- 在最新成交价格低于最高限制买入价格恢复委托
主要参数:
- 买入金额
- 单笔购买数量
- 委托深度
- 最高价格
- 价格轮询间隔
- 单次购买数量均值浮点数
- 最小交易量
# 买入
import random # 导入随机数库
def CancelPendingOrders(): # CancelPendingOrders 函数作用是取消当前交易对所有挂单。
while True: # 循环检测,调用GetOrders 函数,检测当前挂单,如果orders 为空数组,即len(orders) 等于0,说明订单全部取消了,可以退出函数,调用return 退出。
orders = _C(exchange.GetOrders)
if len(orders) == 0 :
return
for j in range(len(orders)): # 遍历当前挂单数组,调用取消订单的函数CancelOrder,逐个取消挂单。
exchange.CancelOrder(orders[j]["Id"])
if j < len(orders) - 1: # 除了最后一个订单,每次都执行Sleep 让程序等待一会儿,避免撤单过于频繁。
Sleep(Interval)
LastBuyPrice = 0 # 设置一个全局变量,记录最近一次买入的价格。
InitAccount = None # 设置一个全局变量,记录初始账户资产信息。
def dispatch(): # 冰山委托逻辑的主要函数
global InitAccount, LastBuyPrice # 引用全局变量
account = None # 声明一个变量,记录实时获取的账户信息,用于对比计算。
ticker = _C(exchange.GetTicker) # 声明一个变量,记录最近行情。
LogStatus(_D(), "ticker:", ticker) # 在状态栏输出时间,最新行情
if LastBuyPrice > 0: # 当LastBuyPrice大于0时,即已经委托开始时,执行if条件内代码。
if len(_C(exchange.GetOrders)) > 0: # 调用exchange.GetOrders 函数获取当前所有挂单,判断有挂单,执行if条件内代码。
if ticker["Last"] > LastBuyPrice and ((ticker["Last"] - LastBuyPrice) / LastBuyPrice) > (2 * (EntrustDepth / 100)): # 检测偏离程度,如果触发该条件,执行if内代码,撤单。
Log("偏离过多, 最新成交价:", ticker["Last"], "委托价", LastBuyPrice)
CancelPendingOrders()
else :
return True
else : # 如果没有挂单,证明订单完全成交了。
account = _C(exchange.GetAccount) # 获取当前账户资产信息。
Log("买单完成, 累计花费:", _N(InitAccount["Balance"] - account["Balance"]), "平均买入价:", _N((InitAccount["Balance"] - account["Balance"]) / (account["Stocks"] - InitAccount["Stocks"]))) # 打印交易信息。
LastBuyPrice = 0 # 重置 LastBuyPrice为0
BuyPrice = _N(ticker["Buy"] * (1 - EntrustDepth / 100)) # 通过当前行情和参数,计算挂单价格。
if BuyPrice > MaxBuyPrice: # 判断是否超过参数设置的最大价格
return True
if not account: # 如果 account 为 null ,执行if 语句内代码,重新获取当前资产信息,复制给account
account = _C(exchange.GetAccount)
if (InitAccount["Balance"] - account["Balance"]) >= TotalBuyNet: # 判断买入所花费的总钱数,是不是超过参数设置。
return False
# 订单撮合成交逻辑---
RandomAvgBuyOnce = (AvgBuyOnce * ((100.0 - FloatPoint) / 100.0)) + (((FloatPoint * 2) / 100.0) * AvgBuyOnce * random.random()) # 随机数 0~1
UsedMoney = min(account["Balance"], RandomAvgBuyOnce, TotalBuyNet - (InitAccount["Balance"] - account["Balance"]))
# 订单撮合成交逻辑---
BuyAmount = _N(UsedMoney / BuyPrice) # 计算买入数量
if BuyAmount < MinStock: # 判断买入数量是否小于 参数上最小买入量限制。
return False
LastBuyPrice = BuyPrice # 记录本次下单价格,赋值给LastBuyPrice
exchange.Buy(BuyPrice, BuyAmount, "花费:¥", _N(UsedMoney), "上次成交价", ticker["Last"]) # 下单
return True
def main():
global LoopInterval, InitAccount # 引用 LoopInterval, InitAccount 全局变量
CancelPendingOrders() # 开始运行时,取消所有挂单
InitAccount = _C(exchange.GetAccount) # 初始记录 开始时的账户资产
Log(InitAccount) # 打印初始账户信息
if InitAccount["Balance"] < TotalBuyNet: # 如果初始时资产不足,则抛出错误,停止程序
raise Exception("账户余额不足")
LoopInterval = max(LoopInterval, 1) # 设置LoopInterval至少为1
while dispatch(): # 主要循环,不停调用 冰山委托逻辑函数 dispatch ,当dispatch函数 return false 时才停止循环。
Sleep(LoopInterval * 1000) # 每次循环都暂停一下,控制轮询频率。
Log("委托全部完成", _C(exchange.GetAccount)) # 当循环执行跳出时,打印当前账户资产信息。
# 卖出
import random
def CancelPendingOrders():
while True:
orders = _C(exchange.GetOrders)
if len(orders) == 0:
return
for j in range(len(orders)):
exchange.CancelOrder(orders[j]["Id"])
if j < len(orders) - 1:
Sleep(Interval)
LastSellPrice = 0
InitAccount = None
def dispatch():
global LastSellPrice, InitAccount
account = None
ticker = _C(exchange.GetTicker)
LogStatus(_D(), "ticker:", ticker)
if LastSellPrice > 0:
if len(_C(exchange.GetOrders)) > 0:
if ticker["Last"] < LastSellPrice and ((LastSellPrice - ticker["Last"]) / ticker["Last"]) > (2 * (EntrustDepth / 100)):
Log("偏离过多,最新成交价:", ticker["Last"], "委托价", LastSellPrice)
CancelPendingOrders()
else :
return True
else :
account = _C(exchange.GetAccount)
Log("买单完成,累计卖出:", _N(InitAccount["Stocks"] - account["Stocks"]), "平均卖出价:", _N((account["Balance"] - InitAccount["Balance"]) / (InitAccount["Stocks"] - account["Stocks"])))
LastSellPrice = 0
SellPrice = _N(ticker["Sell"] * (1 + EntrustDepth / 100))
if SellPrice < MinSellPrice:
return True
if not account:
account = _C(exchange.GetAccount)
if (InitAccount["Stocks"] - account["Stocks"]) >= TotalSellStocks:
return False
RandomAvgSellOnce = (AvgSellOnce * ((100.0 - FloatPoint) / 100.0)) + (((FloatPoint * 2) / 100.0) * AvgSellOnce * random.random())
SellAmount = min(TotalSellStocks - (InitAccount["Stocks"] - account["Stocks"]), RandomAvgSellOnce)
if SellAmount < MinStock:
return False
LastSellPrice = SellPrice
exchange.Sell(SellPrice, SellAmount, "上次成交价", ticker["Last"])
return True
def main():
global InitAccount, LoopInterval
CancelPendingOrders()
InitAccount = _C(exchange.GetAccount)
Log(InitAccount)
if InitAccount["Stocks"] < TotalSellStocks:
raise Exception("账户币数不足")
LoopInterval = max(LoopInterval, 1)
while dispatch():
Sleep(LoopInterval)
Log("委托全部完成", _C(exchange.GetAccount))
MVWAP策略
MVWAP(Modified Volume Weighted Average Price),成交量加权平均价格优化算法。根据市场实时价格和市场的关系,对下单量的大小进行调整与控制,统一将这一类算法称为MVWAP。
当市场实时价格小于此时的市场时,在原有计划交易量的基础上进行放大,如果能够将放大的部分成交或部分成交,则有助于降低 成交的price;反之,当市场实时价格大于此时的市场时,在原有计划交易量的基础上进行缩减,也有助于降低成交的𝑝𝑟𝑖𝑐𝑒 ,从而达到控制交易成本的目的。
在MVWAP 策略中,除了成交量的预测方式之外(通常也是按照历史成交量加权平均进行预测),同样很重要的是对于交易量放大或减小的定量控制。一种简单的办法是在市场实时价格低于或高于时,将下一时段的下单量按固定比例放大或缩小,那么这个比例参数就存在一个最优解的问题。如果考虑得更为复杂和细致,这个比例还可以是一个随价格偏差(市场实时价格与之差)变化的函数。
VP策略
VP(Volume Participation)固定百分比成交策略,与VWAP 策略类似,都是跟踪市场真实成交量的变化,从而制定相应的下单策略。不同的是,VWAP 是在确定某个交易日需要成交数量或成交金额的基础上,对该订单进行拆分交易;而VP则是确定一个固定的跟踪比例,根据市场真实的分段成交量,按照该固定比例进行下单。
例如,将某个交易日均分为48段,每段5分钟。根据预测成交量,按照10%的固定比例进行下单。这样的策略所带来的结果是,当所需要成交的订单金额较小时,可能会在交易时间结束之前就完成所有交易,从而造成对市场均价跟踪偏离的风险。
因此,该策略适用于规模较大、计划多个交易日完成的订单交易,此时若能选择合适的固定百分比,使得成交能够有效完成,则VP 是一种可以较好跟踪市场均价的算法交易策略。
IS策略
IS(Implementation Shortfall),执行落差交易策略(看得见冲击成本——IS算法交易策略),是以执行落差为决策基础的一种算法交易策略。执行落差被定义为目标交易资产组合与实际成交资产组合在交易金额上的差异。
IS 策略的目标是执行落差最小化,或者说是在综合考虑冲击成本和市场风险后,通过寻找最优解来跟踪价格基准的一种策略。假设目标交易价格为,实际交易价格为 ,则IS策略的最终目标为:
IS 的基本流程:
1、确定目标交易价格 ,作为交易基准,这个价格可以是到达价、开盘价、前一日收盘价等。再设定一个容忍价格,作为交易的边界条件。
2、当市场实际价格低于或高于时,按一定的策略下单进行买入或卖出交易。
3、当市场实际价格高于或低于时,不进行买入或卖出交易。
4、当市场实际价格处于和 之间时,可以按照介于积极和消极交易策略之间的策略进行交易。
使用IS 的优点:
1、IS 策略较为全面地分析了交易成本的各个部分,在冲击成本、时间风险、价格增长等因素之间取得了较好的平衡,更加符合最优交易操作的目标。
2、IS 策略根据目标价格对交易过程的优化,更加符合投资决策的过程。
3、IS 策略多用于组合交易,而对于组合交易来说该算法能够利用交易清单上股票间的相关性更好地控制风险。
Step策略
Step 策略实际是一种对价格进行分层成交的策略,目标是在买入(卖出)交易中尽可能地压低(提升)成交均价。简单来讲,Step 就是在不同的价格区间进行不同成交量比例的配置。例如在VWAP 或TWAP 策略中,通常按照预测成交量的一定比例K进行实际下单。假设在开市前预计要买入某支前收盘价为20元的股票,则对其进行成交量分层设定:
开盘后在VWAP 或TWAP 的基础之上,当价格在19 至21 元浮动时,按预测成交量的10%进行成交;当价格超过21元时则不做任何交易;当价格小于等于19元时,按预测成交量的30%买入。
更为激进的一种是称为Aggressive Step 的策略,这种策略在价格低于最优交易区域边界时会将所有市场上的订单统统吃掉。具体来说,Aggressive Step 策略同样在买入(卖出)交易中进行分层,例如在上述交易方案中,前两个区域的策略不变,当价格小于等于19 元时,不管市价跌到多少,都按19 元的限价报单成交,直至价格回升至19 元以上或拟交易订单全部完成。不过这种策略不容易对交易量进行控制,并且容易造成价格异动,增加证券交易的隐形成本。
盯住盘口策略
(PEG):该策略随时根据目标股票的盘口情况进行下单。PEG 首先会实时监测盘口中的最低卖出价格或最高买入价格,并按照一定的策略(或比例)下达买入限价指令或卖出限价指令。如果交易指令未能完成,并且市场价格开始偏离限价指令的价格,则对上述订单进行撤单,并且根据最新的盘口信息重新发出相应的限价指令;如果交易指令全部完成,继续按照上述策略(比例)发出买入限价指令或卖出限价指令,直至订单全部完成或交易时间结束。该策略的优点在于对市场的冲击可以做出较好的定量控制,而缺点在于跟踪市场均价容易出现偏离,并且每个交易日的成交量不可控。
Exchanges – Fincyclopedia
Equity Portfolio & Electronic Trading (opco.com)
git clone URL
- ATS-SMART 算法:根据用户委托交易特征,通过对市场高频数据的实时分析和处理,利用程序化分析动态选择最优算法执行用户委托。具有高完成度,高稳定性,高隐蔽性,低敞口等特点,在不同持仓风格场景下,中长期均体现出稳定超越 TWAP/VWAP 基准的效果
- ZC-POV 算法:比例成交算法(Percentage of volume),通过追踪目标股票流动性,按照市场参与度完成用户委托交易。算法针对性解决股票交易中大金额委托造成的交易冲击、目的暴露、完成率低等问题,更加贴近客户的真实委托意愿
ATS-SMART 算法参数
开始时间(P1):策略开始执行的时间(剔除非交易时间段)。如果开始时间早于策略下达时间点时,则使用下达时间作为开始时间
结束参考时间(P2): 结束参考时间,和结束时间一致
结束时间(P3):策略停止执行的时间(剔除非交易时间段)。过了结束时间还未完成的数量,将会自动释放到指令。算法执行的区间段,时间越短,任务执行强度(委托频率和单笔委托量)越高
结束时间是否有效:算法在结束时间是否有效,如设为无效,则以收盘时间为结束时间
涨停时是否停止卖出:标的在涨停时是否需要停止卖出
跌停时是否撤单:标的在跌停时是否需要撤单
最小交易金额:控制子单单笔委托的最小金额 该参数只适用于股票。A 股单位为元 **基准价格:**此算法不生效
注意:p3 必须晚于 p2,p2 必须晚于 p1,交易时长必须大于 3 分钟,不得晚于 14:55,交易数据不应该早于当前时间
ZC-POV 算法参数
参与率:市场参与率,即每日委托总量占市场真实交易额的比例(上交易日参考标准),默认 30%,最高委托量比例限制为 45%。
基准价格:算法模型的参考基准价格,这里指限价。卖出时,当市场价格低于此价格就停止交易,再次高于此价格就恢复交易,并且补回前面应停止交易而少交易的量;买入时,当市场价格高于此价格就停止交易,再次低于此价格就恢复交易,并且补回前面应停止交易而少交易的量。当填入价格为 0 时,则不设置基准价。
限价订单簿(LOB)
从LOB挖掘高频因子(量化交易因子挖掘笔记-从限价订单簿(LOB)挖掘高频价量因子 - 知乎 (zhihu.com))
订单簿:编写的基于限价订单簿的匹配引擎。
特点:
标准价格-时间优先
支持市场订单和限价订单
添加、取消和更新订单
Install package:
pip install orderbook
Import package:
from orderbook import OrderBook
Create an Order Book:
order_book = OrderBook()
Key Functions:
process_order
cancel_order
modify_order
get_volume_at_price
get_best_bid
get_best_ask
path: ..\OrderBook\orderbook\test\example.py
Data Structure
"""
Orders are sent to the order book using the process_order function. The Order is created using a quote.
订单通过 process_order 功能发送到订单簿。订单使用报价创建。
"""
# For a limit order
quote = {'type' : 'limit',
'side' : 'bid',
'quantity' : 6,
'price' : 108.2,
'trade_id' : 001}
# and for a market order:
quote = {'type' : 'market',
'side' : 'ask',
'quantity' : 6,
'trade_id' : 002}
datademo:
orderbook.py
def __str__(self):
tempfile = StringIO()
tempfile.write("***Bids***\n")
if self.bids != None and len(self.bids) > 0:
for key, value in reversed(self.bids.price_map.items()):
tempfile.write('%s' % value)
tempfile.write("\n***Asks***\n")
if self.asks != None and len(self.asks) > 0:
for key, value in self.asks.price_map.items():
tempfile.write('%s' % value)
tempfile.write("\n***Trades***\n")
if self.tape != None and len(self.tape) > 0:
num = 0
for entry in self.tape:
if num < 10: # get last 5 entries
tempfile.write(str(entry['quantity']) + " @ " + str(entry['price']) + " (" + str(entry['timestamp']) + ") " + str(entry['party1'][0]) + "/" + str(entry['party2'][0]) + "\n")
num += 1
else:
break
tempfile.write("\n")
return tempfile.getvalue()
result: