189 8069 5689

如何使用fastcache-创新互联

小编给大家分享一下如何使用fastcache,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

创新互联公司服务项目包括庐阳网站建设、庐阳网站制作、庐阳网页制作以及庐阳网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,庐阳网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到庐阳省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

VnTrader 2.0版本有不少提速措施,其中lru_cache是提高回测速度一个利器,让我用1.92为主的我很是羡慕。看说这个是python 3.5.2提供的功能,也就没多想。

最近才发现其实有第三方在也支持python 2.7的版本,比如 functools32。还有一个用 C 语言实现的,更快的,同时兼容 Python2 和 Python3 的第三方模块 fastcache 能够实现同样的功能,这里就用fastcache。

安装

很简单,pip直接安装就可以。

pip install fastcache --upgrade

测试是否正确安装

import fastcache
fastcache.test()

简单使用

  • 不用cache时候,运行时间 0.7994769 秒

    from fastcache import clru_cache
    def fib(n):
      if n < 2:
          return n
      return fib(n - 2) + fib(n - 1)
    import time
    dt0 = time.clock()
    for i in range(30):
      fib(i)
    spreadtime = (time.clock() - dt0)
    print ("运行时间 %s 秒" %spreadtime)
  • 使用后,运行时间 0.000185200000004 秒

    @clru_cache(maxsize=999)
    def fib_cache(n):
      if n < 2:
          return n
      return fib_cache(n - 2) + fib_cache(n - 1)
    import time
    dt1 = time.clock()
    for i in range(30):
      fib_cache(i)
    spreadtime = (time.clock() - dt1)
    print ("运行时间 %s 秒" %spreadtime)

使用clru_cache, 更新trader/app/ctaStrategy/ctaBacktesting.py, 让回测历史数据缓存到内存,不用反复读取

  • 通常只在静态方法使用lru_cache,这样才具有意义, 虽然在类方法中也可以使用,但是这样一个是有不同实例一般无法复用,而且会使得过期实例无法垃圾回收。这里新建一个静态方法load_data,负责读取数据库数据。

@clru_cache(maxsize=9999)
def load_data(dbName, symbol, dataStartDate, strategyStartDate, dataEndDate, dataClass):
    dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
    collection = dbClient[dbName][symbol]
    # 载入初始化需要用的数据
    flt = {'datetime': {'$gte': dataStartDate,
                        '$lt': strategyStartDate}}
    initCursor = collection.find(flt).sort('datetime')
    initData = []  # 清空initData列表
    for d in initCursor:
        data = dataClass()
        data.__dict__ = d
        initData.append(data)
    # 载入回测数据
    if not dataEndDate:
        flt = {'datetime': {'$gte': strategyStartDate}}  # 数据过滤条件
    else:
        flt = {'datetime': {'$gte': strategyStartDate,
                            '$lte': dataEndDate}}
    BackData = []
    dbCursor = collection.find(flt).sort('datetime')
    for dc in dbCursor:
        data = dataClass()
        data.__dict__ = dc
        BackData.append(data)
    count = len(initData) + len(BackData)
    return initData, BackData, count
  • 修改已有方法BacktestingEngine.loadHistoryData; 改为使用刚刚创建静态方法

    def loadHistoryData(self):
        """载入历史数据"""
        self.output(u'开始载入数据')
        # 首先根据回测模式,确认要使用的数据类
        # load_data(dbName, symbol, dataStartDate, strategyStartDate, dataEndDate, dataClass)
        if self.mode == self.BAR_MODE:
            dataClass = VtBarData
            func = self.newBar
            self.initData,self.BackTestData, count = load_data(self.dbName,self.symbol, self.dataStartDate, self.strategyStartDate, self.dataEndDate, dataClass)
        else:
            dataClass = VtTickData
            func = self.newTick
            self.initData, self.BackTestData, count = load_data(self.dbName, self.symbol, self.dataStartDate, self.strategyStartDate, self.dataEndDate, dataClass)
        # 载入初始化需要用的数据
        if self.hdsClient:
            initCursor = self.hdsClient.loadHistoryData(self.dbName,
                                                        self.symbol,
                                                        self.dataStartDate,
                                                        self.strategyStartDate)
            # 将数据从查询指针中读取出,并生成列表
            self.initData = []  # 清空initData列表
            for d in initCursor:
                data = dataClass()
                data.__dict__ = d
                self.initData.append(data)
            # 载入回测数据
            self.dbCursor = self.hdsClient.loadHistoryData(self.dbName,
                                                           self.symbol,
                                                           self.strategyStartDate,
                                                           self.dataEndDate)
            for dc in self.dbCursor:
                data = dataClass()
                data.__dict__ = dc
                self.BackTestData.append(data)
        self.output(u'载入完成,数据量:%s' % count)
  • 修改 BacktestingEngine.runBacktesting; 改为使用换成的BackTestData 队列,而不是数据库指针。

      def runBacktesting(self):
          """运行回测"""
          # 载入历史数据
          self.loadHistoryData()
          # 首先根据回测模式,确认要使用的数据类
          if self.mode == self.BAR_MODE:
              dataClass = VtBarData
              func = self.newBar
          else:
              dataClass = VtTickData
              func = self.newTick
          self.output(u'开始回测')
          self.strategy.onInit()
          self.strategy.inited = True
          self.output(u'策略初始化完成')
          self.strategy.trading = True
          self.strategy.onStart()
          self.output(u'策略启动完成')
          self.output(u'开始回放数据')
          for d in self.BackTestData:
              func(d)
          self.output(u'数据回放结束')

没有使用之前,优化约为100组参数约为运行时间 323.0888239 秒,使用cache优化后运行时间 190.762839 秒

以上是“如何使用fastcache”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联-成都网站建设公司行业资讯频道!


当前文章:如何使用fastcache-创新互联
网站URL:http://gzruizhi.cn/article/ceejjg.html

其他资讯