Request sample code for true streaming analysis of stock data

Hi,

My goal is to build a Realtime stock analysis platform. There are many databases in my consideration. As of now I am trying dolphinDB since several months. But I found it lacking something.

I have searched on all forums and documents but I can not find anything similar for True Streaming Analysis of stock data.


Problem Statement:

I have stock data stream consists of ts (TIMESTAMP), symbol (SYMBOL), price (DOUBLE), volume (DOUBLE). For multiple symbols (approx. 200 symbols)

From that real time stream, I want to make OHLCV for multiple timeframe 1 Min, 5 Min, 10 Min, 15 Min, 30 Min, 1 Hr . Then Apply technical indicators live VWAP, EMA, MFI, MACD, ADX, slowK, RSI , StochRSI, Bollinger Bands etc. and many more.

The OHLCV and all indicator data SHOULD be update with each and every tick.

Please note that the latency of all such calculation cannot exceed 10 milliseconds.


I have tried many things (you can see my asked question on my profile) but nothing gives true Streaming Analysis.


I hope you understand my question. Please reply me as soon as possible. Kindly solve the problem as soon as possible.


Note:-

If my problem is solved then my plan is to scale my platform to 50000 symbols and more with dolphinDB commercial license. But for now I found support from your side very very poor. If you don't support me then how will it work ?


You can run this python code to get the stream from Binance and directly compare your results of OHLCV and all indicators from Bincance. (This code snippet is fetched directly from their documents)

from binance import ThreadedWebsocketManager
import dolphindb as ddb

api_key = 'mQC5MGZtgN0Ta323WIcHAwrKiCCYTSHGwi1drKR2XaxPH26cT3Qp0C04mgIFvdmO'
api_secret = '3KY4qmsnzLYmv3IXlg72jeW5MH597bDHkSE6oQ13o5nZpaEnCSDQLGG8gib5Tm5E'

s = ddb.session();
s.connect("127.0.0.1",8080,"admin","123456")
t_colNames="`ts`symbol`price`ltq`vol`time"
t_colTypes="`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`TIMESTAMP"

try:
    s.run("""
enableTableShareAndPersistence(table=streamTable(25000:0,{0},{1}), tableName=`btc1, cacheSize=1200000, retentionMinutes=10)
    """.format(t_colNames,t_colTypes))
    print("created streamtable")
except:
    print("loaded streamtable")

def main():

    twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
    twm.start()

    def handle_socket_message(msg):
        s.run("tableInsert(btc, (convertTZ(timestamp({3}),'UTC','Asia/Kolkata')), `{0}, {1}, {2}, {2}, convertTZ(now(),'Asia/Kolkata','Asia/Kolkata'));".format(str( msg['s']),float( msg['p']),float( msg['q']),int( msg['T'])))
        print("tableInsert(btc, (convertTZ(timestamp({3}),'UTC','Asia/Kolkata')), `{0}, {1}, {2}, {2}, convertTZ(now(),'Asia/Kolkata','Asia/Kolkata'));".format(str( msg['s']),float( msg['p']),float( msg['q']),int( msg['T'])))

    twm.start_trade_socket(callback=handle_socket_message, symbol='BTCUSDT')
    streams = ['btcusdt@trades']
    twm.start_multiplex_socket(callback=handle_socket_message, streams=streams)
    twm.join()

if __name__ == "__main__":
   main()

Thanks & Regards,

Vishvesh Upadhyay.

请先 登录 后评论
  • 0 关注
  • 0 收藏,1154 浏览
  • Vishvesh Upadhyay 提出于 2022-01-30 18:36

相似问题