Bitmexのチャートをデータベースにインポートする

以前、投稿したBinanceでデータベース作成と同じ要領ですがBitmexの場合はいくつか制約がありそれをクリアしなければなりません。

  • 1分足、5分足、1時間足、日足以外のチャートはほかのチャートを利用して作成する
  • 一回につき10080件以上のデータを取ることができない
  • 150回/5分の制限がある

この3つの制約をクリアしたプログラムを作成しました。Pandasのリサンプリングで不完全なOHLCVを作らないため最大公約数として日足を基準にしています。

get_bitmex_data.py

#!/usr/bin/env python3
import time, calendar, pytz, requests, os, csv, json, time
from datetime import datetime, timedelta, date
import pandas as pd

user_config = {
        'date_from'       : (2019,1,1),
        'date_to'         : ('now'),
        'selected_list'   : [
                             '1m',
                             '3m',
                             '5m',
                             '15m',
                             '30m',
                             '1h',
                             '2h',
                             '4h',
                             '1d',
                            ],
}

# bitmex attributes
mex_params = {
        'max_counter'     : 10000,       # max 10080
        'api_rate_limit'  : 25,          # max 30/min, 150/5min
        'pair_symbol'     : 'XBTUSD',
        #'boolean_reverse' : False,
        #'boolean_partial' : False,
        #'tstype'          : 'UTMS',
        'bitmex_url_req'  : 'https://www.bitmex.com/api/udf/history?',
}

timeframe_list = {
        '1m'  : {'resample': False, 'base': '1m', 'tframe' : 1},
        '3m'  : {'resample': True,  'base': '1m', 'tframe' : 3},
        '5m'  : {'resample': False, 'base': '5m', 'tframe' : 5},
        '15m' : {'resample': True,  'base': '5m', 'tframe' : 15},
        '30m' : {'resample': True,  'base': '5m', 'tframe' : 30},
        '1h'  : {'resample': False, 'base': '1h', 'tframe' : 60},
        '2h'  : {'resample': True,  'base': '1h', 'tframe' : 120},
        '4h'  : {'resample': True,  'base': '1h', 'tframe' : 240},
        '1d'  : {'resample': False, 'base': '1d', 'tframe' : 1440},
}

def main():
    bitmex = {}
    combo_df = {}
    #target = '1h'
    target = user_config['selected_list']
    utime_from, utime_to = set_time()

    for n in target:
        if timeframe_list[n]['resample'] == True:
             base = timeframe_list[n]['base']
             if base in bitmex:
                 combo_df.update({n : df_resampling(combo_df[base], n)})
             else:
                 bitmex[base] = BitmexGetData(utime_from, utime_to, base)
                 combo_df.update({base : bitmex[base].get_data()})
                 combo_df.update({n : df_resampling(combo_df[base], n)})
        elif timeframe_list[n]['resample'] == False:
            if n not in bitmex:
                bitmex[n] = BitmexGetData(utime_from, utime_to, n)
                combo_df.update({n : bitmex[n].get_data()})

    # output to csv
    for key in combo_df:
        df = combo_df[key]
        filename = 'chart_' + key + '.csv'
        df.to_csv(filename, index=False)
        # test
        #print(df)

def set_time():
    date_from, date_to = user_config['date_from'], user_config['date_to']
    one_day = 60 * 60 * 24
    if type(date_to) == str and date_to == 'now':
        #utc_now = datetime.now(pytz.utc)
        utime_to = calendar.timegm(datetime.utcnow().utctimetuple())
        utime_to = utime_to // one_day * one_day
    elif type(date_to) == tuple and len(date_to) == 3:
        utime_to = calendar.timegm(date(date_to[0], date_to[1], date_to[2]).timetuple())
        utime_to = utime_to // one_day * one_day
    else:
        print('ERROR set_time(): invalid data')
        exit(1)

    utime_from = calendar.timegm(date(date_from[0], date_from[1], date_from[2]).timetuple())

    if utime_to < utime_from:
        print('ERROR set_time(): invalid data')
        exit(1)

    return int(utime_from), int(utime_to)

class BitmexGetData:
    def __init__(self, utime_from, utime_to, target):
        self.utime_from = utime_from
        self.utime_to   = utime_to
        self.target     = target

    def get_ohlcv(self):
        timeframe = timeframe_list[self.target]
        limit = mex_params['api_rate_limit']
        params = mex_params

        for start in range(self.utime_from, self.utime_to, timeframe['tframe'] * 60 * mex_params['max_counter']):
            end = start + timeframe['tframe'] * 60 * mex_params['max_counter']

            if end > self.utime_to:
                end = self.utime_to

            params.update({'from' : start, 'to' : end, 'timeframe' : timeframe['tframe']})
            url = '{bitmex_url_req}symbol={pair_symbol}&resolution={timeframe}&from={from}&to={to}'.format(**params)
            res = requests.get(url)
            data = res.json()

            yield data['t'], data['o'], data['h'], data['l'], data['c'], data['v']
            limit += -1

            if limit == 0:
                print('Maximum connection limit reached. Wait 60 seconds...')
                time.sleep(60)
                limit = mex_params['api_rate_limit']

    def get_data(self):
        mex_timestamp, mex_open, mex_high, mex_low, mex_close, mex_volume = [], [], [], [], [], []

        for _timestamp, _open, _high, _low, _close, _volume in BitmexGetData.get_ohlcv(self):
            mex_timestamp.extend(_timestamp)
            mex_open.extend(_open)
            mex_high.extend(_high)
            mex_low.extend(_low)
            mex_close.extend(_close)
            mex_volume.extend(_volume)

        mex_datetime = pd.to_datetime(mex_timestamp, unit='s')
        _index = pd.Index(mex_datetime)
        df = pd.DataFrame({'timestamp' : mex_timestamp, 'open' : mex_open,
                           'high' : mex_high, 'low' : mex_low, 'close' : mex_close,
                           'volume' : mex_volume}, index=_index).sort_values('timestamp')
        # Remove first row
        df = df.drop(df.index[0])
        return df

def df_resampling(df, timeframe):
    tframe = timeframe_list[timeframe]['tframe']

    if tframe < 60:
        new_timeframe = str(tframe) + 'T'
    elif tframe >= 60 and tframe < 1440:
        new_timeframe = str(tframe / 60) + 'H'
    elif tframe >= 1440:
        new_timeframe = str(tframe / 1440) + 'D'

    def_ohlcv = {'timestamp' : 'first', 'open' : 'first', 'high' : 'max', 'low' : 'min',
                 'close' : 'last', 'volume' : 'sum',}
    df = df.resample(new_timeframe).agg(def_ohlcv)

    return df

if __name__ == "__main__":
    main()

10分〜15分程度で各時間足のcsvファイルを書き出します。これをmysqlimportでデータベースにインポートします。

import_csv.sh

#!/bin/sh

csvfiles='chart_15m.csv
chart_1d.csv
chart_1h.csv
chart_1m.csv
chart_2h.csv
chart_30m.csv
chart_3m.csv
chart_4h.csv
chart_5m.csv'

for n in $csvfiles; do
    mysqlimport --ignore-lines=1 \
                --fields-terminated-by=, \
                --local -u guest -p \
                bitmex \
                $n
done

このスクリプトを作成にあたり、次のサイトを参考にしました。大変感謝しています。

追記:

numpyとPythonのListとどちらが速いのか分かりませんがつぎのようにnumpyに置き換かえてみます。

...

import numpy as np

...

            yield ([np.array(data[x]) for x in ['t', 'o', 'h', 'l', 'c', 'v']])            

...

    def get_data(self):
        mex_timestamp, mex_open, mex_high, mex_low, mex_close, mex_volume =  [np.array([]) for x in range(6)]

        for _timestamp, _open, _high, _low, _close, _volume in BitmexGetData.get_ohlcv(self):
            mex_timestamp = np.append(mex_timestamp, _timestamp)
            mex_open      = np.append(mex_open, _open)
            mex_high      = np.append(mex_high, _high)
            mex_low       = np.append(mex_low, _low)
            mex_close     = np.append(mex_close, _close)
            mex_volume    = np.append(mex_volume, _volume)

調べてみると次のようにありました。

NumPyのndarrayは多次元配列を扱うことを目的としたクラスで、事前にメモリ確保しています。np.append関数を使用すると、元のshapeが破壊されてしまうため、要素のコピーが発生して遅くなることがあります。一方で、Pythonのlistは可変長の要素を保持するベクタ型です。少し大きめの要素を事前に確保するので、毎回のコピーは発生しません。

https://deepage.net/features/numpy-append.html

numpyは計算は速いが可変長のリスト処理は遅いということです。