how to do real time stocks analysis with dolphinDB and python

Hi, I am building a code for real time stocks analysis (I am creating my personal algorithmic trading system). For database I use dolphinDB and to receive realtime datafeed I use python. I am from India and work on NSE stocks.  Currently doing experiment on 150 stocks simultaneously. But planning to do 3000 stocks simultaneously in final project. I have build some code to send the realtime data to dolphinDB and tried converting ticks to multiple OHLC. I have few questions as under. 

1) I am running multiple Time Series Aggregator in series. Is this a good idea? If any suggestion please give it.

attachments-2021-09-GLnaobj6613cefc26ea1d.png

2) Volume data I am getting is Cumulative volume. I am using   vol-prev(vol) to get volume for 1 Min OHLC. Is this optimal way to do this?

3) In India NSE market pre-market opens at 9:00 am and pre-market session closes at 9:08 am. Then at 9:15 am main market session starts. How can I align OHLC to start at 9:15 am ?

4) Now the most important question. I want to perform real time technical analysis on each time frame (1 min, 5min etc) and select stocks based on strategy. How can I do this please help me.

attachments-2021-09-cVyl50Rr613cf0ca2fe60.png

5) Send the selected stocks list back to python.

Please help me on the questions listed above. Thank you and please reply as soon as possible.


Rrelevant python code (using websocket streaming from broker, receiving time in unix seconds):- 
def event_handler_quote_update(message):
    try:  
        s.run("tableInsert(t, (convertTZ(timestamp({3}),'Asia/Kolkata','Asia/Kolkata')), `{0}, {1}, {2}, convertTZ(now(),'Asia/Kolkata','Asia/Kolkata'));".format(str(message['instrument'].symbol).replace("-","_").replace("&", "__"),float(message['ltp']),int(message['volume']),int(message['exchange_time_stamp'])*1000))
    except Exception as e:

DolphinDB code:-

ohlc_colNames=`ts`symbol`open`high`low`close`volume
ohlc_colTypes=`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
ohlc_tmp = table(10000:0,ohlc_colNames,ohlc_colTypes)
t_colNames=`ts`symbol`price`vol`time
t_colTypes=`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`TIMESTAMP
tmp = table(10000:0,t_colNames,t_colTypes)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm1, cacheSize=1200000, retentionMinutes=15)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm5, cacheSize=1200000, retentionMinutes=30)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm10, cacheSize=1200000, retentionMinutes=60)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm15, cacheSize=1200000, retentionMinutes=90)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm30, cacheSize=1200000, retentionMinutes=180)
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sh1, cacheSize=1200000, retentionMinutes=1440)
OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, (vol-prev(vol)) as volume]>, dummyTable=tmp, outputTable=sm1, timeColumn=`ts, useSystemTime=true, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=false);
ohlc=<[first(open) as open, max(high) as high, min(low) as low, last(close) as close, sum(volume) as volume]>
OHLC_sm5 = createTimeSeriesEngine(name="OHLC_sm5", windowSize=300000, step=300000, metrics=ohlc, dummyTable=ohlc_tmp, outputTable=sm5, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=true);
OHLC_sm10 = createTimeSeriesEngine(name="OHLC_sm10", windowSize=600000, step=600000, metrics=ohlc, dummyTable=ohlc_tmp, outputTable=sm10, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=true);
OHLC_sm15 = createTimeSeriesEngine(name="OHLC_sm15", windowSize=900000, step=900000, metrics=ohlc, dummyTable=ohlc_tmp, outputTable=sm15, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=true);
OHLC_sm30 = createTimeSeriesEngine(name="OHLC_sm30", windowSize=1800000, step=1800000, metrics=ohlc, dummyTable=ohlc_tmp, outputTable=sm30, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=true);
OHLC_sh1 = createTimeSeriesEngine(name="OHLC_sh1", windowSize=3600000, step=3600000, metrics=ohlc, dummyTable=ohlc_tmp, outputTable=sh1, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=true);
subscribeTable(tableName="t", actionName="OHLC_sm1", offset=0, handler=append!{OHLC_sm1}, msgAsTable=true)
subscribeTable(tableName="sm1", actionName="OHLC_sm5", offset=0, handler=append!{OHLC_sm5}, msgAsTable=true)
subscribeTable(tableName="sm5", actionName="OHLC_sm10", offset=0, handler=append!{OHLC_sm10}, msgAsTable=true);
subscribeTable(tableName="sm10", actionName="OHLC_sm15", offset=0, handler=append!{OHLC_sm15}, msgAsTable=true)
subscribeTable(tableName="sm15", actionName="OHLC_sm30", offset=0, handler=append!{OHLC_sm30}, msgAsTable=true);
subscribeTable(tableName="sm30", actionName="OHLC_sh1", offset=0, handler=append!{OHLC_sh1}, msgAsTable=true);
请先 登录 后评论

1 个回答

Boye

1)Yes,this is a good idea.

2)  You can use  deltas or eachPre.

3)  Using the createDailyTimeSeriesEngine engines can be aligned.

4)  You can subscribe to the stock list in the client using the Python Streaming API.

请先 登录 后评论
  • 1 关注
  • 0 收藏,1223 浏览
  • Vishvesh Upadhyay 提出于 2021-09-12 01:58

相似问题