原创文章第119篇,专注“个人成长与财富自由、世界运作的逻辑, AI量化投资”。
继续强化学习应用于金融投资。
我们的AI量化平台,针对传统规则量化策略,进行了“积木式”的拆分,这种拆分的好处,就是最大化复用代码逻辑,这样开发策略又快且不容易出错。
针对强化学习环境,我们也打算这么做。看到有些平台,股票一个环境,加密货币一个环境,期货又是另一个环境,甚至把数据源处理都耦合到环境中,这是不对的。维护起来特别麻烦且容易出错。

01 强化学习环境的构成
一个完整的金融强化学习环境,包括数据(通常是OHLC以及特征数据)、交易动作(做多,平仓或者权重),回测系统(回测系统能够对交易动作做出反馈),激励指标(收益率、夏普比等)。
当下很多开源包在实现这个环境,存在一些问题。它们大篇幅做数据特征处理,实现了简单的量化回测,激励指标比较单一。这些环境扩展性不好,更谈不上接入实盘。
前文我们对传统量化与机器学习量化实现了统一的框架,数据管理及自动化标注,接入到backtrader的回测系统中。
同样,我们也希望可以最大化复用之前的成果,把强化学习环境也整合到一起,我们就不必关心数据处理,回测系统等环节的处理。
然而,backtrader本身为是传统量化而生,它的run直接就遍历了整个时间序列,而强化学习是由环境来调用回测引擎,一步一步往前走。
对于backtrader要进行一些改造。
一、重写_run_next
Backtrader在执行run的时间,在_runnext函数里整个顺序进行遍历。我们重写这个函数,只做第一步初始化的运算。
def _runnext(self, runstrats):
'''
Actual implementation of run in full next mode. All objects have its
``next`` method invoke on each data arrival
'''
self.runstrats_container = runstrats
self._init_run()
def _init_run(self):
datas = sorted(self.datas,
key=lambda x: (x._timeframe, x._compression))
datas1 = datas[1:]
data0 = datas[0]
d0ret = True
rsonly = [i for i, x in enumerate(datas)
if x.resampling and not x.replaying]
onlyresample = len(datas) == len(rsonly)
noresample = not rsonly
clonecount = sum(d._clone for d in datas)
ldatas = len(datas)
ldatas_noclones = ldatas - clonecount
dt0 = date2num(datetime.datetime.max) - 2 # default at max
self.bt_state_container = {"datas": datas,
"datas1": datas1,
"data0": data0,
"d0ret": d0ret,
"rsonly": rsonly,
"onlyresample": onlyresample,
"noresample": noresample,
"ldatas_noclones": ldatas_noclones,
"dt0": dt0,
}
二、提供一个step单步执行函数
这个函数的代码大多可以从cerebro里查到,这里就是展开描述。
def _step(self, runstrats, datas, datas1, data0, d0ret, rsonly,
onlyresample, noresample, ldatas_noclones, dt0):
# if any has live data in the buffer, no data will wait anything
newqcheck = not any(d.haslivedata() for d in datas)
if not newqcheck:
# If no data has reached the live status or all, wait for
# the next incoming data
livecount = sum(d._laststatus == d.LIVE for d in datas)
newqcheck = not livecount or livecount == ldatas_noclones
lastret = False
# Notify anything from the store even before moving datas
# because datas may not move due to an error reported by the store
self._storenotify()
if self._event_stop: # stop if requested
return True
self._datanotify()
if self._event_stop: # stop if requested
return True
# record starting time and tell feeds to discount the elapsed time
# from the qcheck value
drets = []
qstart = datetime.datetime.utcnow()
for d in datas:
qlapse = datetime.datetime.utcnow() - qstart
d.do_qcheck(newqcheck, qlapse.total_seconds())
drets.append(d.next(ticks=False))
d0ret = any((dret for dret in drets))
if not d0ret and any((dret is None for dret in drets)):
d0ret = None
if d0ret:
dts = []
for i, ret in enumerate(drets):
dts.append(datas[i].datetime[0] if ret else None)
# Get index to minimum datetime
if onlyresample or noresample:
dt0 = min((d for d in dts if d is not None))
else:
dt0 = min((d for i, d in enumerate(dts)
if d is not None and i not in rsonly))
dmaster = datas[dts.index(dt0)] # and timemaster
self._dtmaster = dmaster.num2date(dt0)
self._udtmaster = num2date(dt0)
# slen = len(runstrats[0])
# Try to get something for those that didn't return
for i, ret in enumerate(drets):
if ret: # dts already contains a valid datetime for this i
continue
# try to get a data by checking with a master
d = datas[i]
d._check(forcedata=dmaster) # check to force output
if d.next(datamaster=dmaster, ticks=False): # retry
dts[i] = d.datetime[0] # good -> store
# self._plotfillers2[i].append(slen) # mark as fill
else:
# self._plotfillers[i].append(slen) # mark as empty
pass
# make sure only those at dmaster level end up delivering
for i, dti in enumerate(dts):
if dti is not None:
di = datas[i]
rpi = False and di.replaying # to check behavior
if dti > dt0:
if not rpi: # must see all ticks ...
di.rewind() # cannot deliver yet
# self._plotfillers[i].append(slen)
elif not di.replaying:
# Replay forces tick fill, else force here
di._tick_fill(force=True)
# self._plotfillers2[i].append(slen) # mark as fill
elif d0ret is None:
# meant for things like live feeds which may not produce a bar
# at the moment but need the loop to run for notifications and
# getting resample and others to produce timely bars
for data in datas:
data._check()
else:
lastret = data0._last()
for data in datas1:
lastret += data._last(datamaster=data0)
if not lastret:
# Only go extra round if something was changed by "lasts"
return True # return somethin signaling the end
# Datas may have generated a new notification after next
self._datanotify()
if self._event_stop: # stop if requested
return True
if d0ret or lastret: # if any bar, check timers before broker
self._check_timers(runstrats, dt0, cheat=True)
if self.p.cheat_on_open:
for strat in runstrats:
strat._next_open()
if self._event_stop: # stop if requested
return True
self._brokernotify()
if self._event_stop: # stop if requested
return True
if d0ret or lastret: # bars produced by data or filters
self._check_timers(runstrats, dt0, cheat=False)
for strat in runstrats:
strat._next()
if self._event_stop: # stop if requested
return True
self._next_writers(runstrats)
self.bt_state_container = {"datas": datas,
"datas1": datas1,
"data0": data0,
"d0ret": d0ret,
"rsonly": rsonly,
"onlyresample": onlyresample,
"noresample": noresample,
"ldatas_noclones": ldatas_noclones,
"dt0": dt0,
}
return False
如此,我们的backtrader就可以实现单步循环,为我们的环境提供回测能力了。
一些思考:
疫情还在持续,不知要多久,不知以何种方式结束。
目前大家看到的,可预见到的,肯定不是大家所期待的。
我们很多时候决定不了什么,只能耐心等待,保护好自己,积蓄能力。
三年后,也许这些都是故事 ,也许一个超级疫苗就研发出来,从此人间皆安。
我们要做的事情是,如果再有类似的“黑天鹅”事件发生,我们是否不那么被动,有更多的选择的权利?!
小时候,受到不公正的待遇,私下默默努力。但现代毕竟不是武侠里的快意恩愁。我们能做的事情是升级自己的环境,有能力远离那些不喜欢的人,不喜欢的事。尽管不好的事情哪里都有,但越往上走,会越发文明,越发自由。
努力的意义是自由,财务自由就是你能离得开职场,进而不受地点的约束,而同样可以过上有品质的生活,你就是自由的。
创业九死一生,若成会带来大自由,不成则更加不自由。读书,写作也许是很好的一条路。读书,写作是在疫情封控这样的情境下都可以做的事情。
代码细节,请前往星球微信群交流。
强化学习框架stable-baseline3以及pandas datareader
ETF轮动+RSRS择时,加上卡曼滤波:年化48.41%,夏普比1.89

















