没有成交时,怎么生成分钟k线

目前在拼湊OHLC上,由于有tick才会生成相相应的分钟k线。在一些流动性低的商品,前端需要把每一根OHLC都生成出來,也就是沒有成交的时段,也需要有成交量volume = 0的分钟k线。当没有tick,OHLC都为前一个的Close, 但Volume=0。
现在写的代码如下,效率比较低,问一下DolphinDB database有什么高效的写法?

def kline(code, date, freq,mkt):
    # s = ddb.session()
    # s.connect(
    #     hostdata["ddb"]["host"],
    #     hostdata["ddb"]["port"],
    #     hostdata["ddb"]["account"],
    #     hostdata["ddb"]["password"],
    # )
    bar = int(freq)
    # ttime = datetime.now().strftime("%H:%M:%S")
    today = datetime.now().strftime("%Y.%m.%d")
    dbidx = gettable.get("idxdb")
    tbidx = gettable.get("idxtb")
    if len(code) == 4:
        db = gettable.get("idxdb")
        tb = gettable.get("idxtb")
        m = "Amount"
        #mkt = "IDX"
        timerange = ">"
    else:
        db = gettable.get("tsedb")
        tb = gettable.get("tsetb")
        m = "Close*Volume*1000"
        #mkt = get_reference("mkt", code[1:], 1, "M", "MSTR")
        timerange = ">="
    startt = "09:00:00"
    endtime = "15:00:00" if mkt == "OES" else "13:30:00"
    alitime = "t+s" if mkt == "OES" else "t+s-afterend"  # 興櫃以外 13:30後的合併至13:30
    t = time.time()
    k = """
        SelectCode={};
        SelectDate={};
        msize = {};
        tb = loadTable("{}",{});
        today = {};
        starttime = {} ;
        endtime = {};
        minnum = (endtime - starttime)/60;  //一分線總根數
        if(minnum%msize==0){{bar = minnum/msize;}}
        else{{bar = minnum/msize+1;}}
        lasttime = select last(Time) from tb where Code = SelectCode;//目前最後一根的時間
        tbtime = loadTable("{}",{})
        everyday = select distinct(Date) as d from tbtime where Date<=SelectDate and Code = `001;
        all_date = select d from everyday where rowNo(d)<msize*8; //總共要撈的日期表
        count = 0;
        for(d in all_date.d){{
            tday = table(take(d,bar) as day) ; 
            ttime = table(take(starttime+60*msize*(1..(bar)),bar) as time) ;
            tt = tday<-ttime ;
            if(count==0){{
                if(today==d and lasttime.last_Time[0]<tt.time[count(tt)-1]){{
                    tb1 = select day,temporalFormat(time,"HHmmss") as time from tt where time<=(lasttime.last_Time[0]+60000*msize) order by time;
                }}
                else{{
                    tb1 = select day,temporalFormat(time,"HHmmss") as time from tt order by time;
                }}
            }}
            else{{
                if(today==d and lasttime.last_Time[0]<tt.time[count(tt)-1]){{
                    tb2 = select day,temporalFormat(time,"HHmmss") as time from tt where time<=(lasttime.last_Time[0]+60000*msize) order by time;
                }}
                else{{
                    tb2 = select day,temporalFormat(time,"HHmmss") as time from tt order by time;
                }}               
                tb1.append!(tb2)
            }}
            count+=1;
        }}

        if(count(all_date.d)==0){{
            table = table(0 as d,0 as t,0 as o,0 as h,0 as l,0 as c,0 as v,0 as m ); //沒有資料
        }}
        else{{
            tbzero = select temporalFormat(day,"yyyyMMdd") as d, time as t from tb1 order by day asc,time asc;

            def TimeBar(dt, sTime, barSize) {{
                dayBegin = dt.date().datetime() + sTime.int();
                edtime = {};
                t = dayBegin + bar(dt - dayBegin, barSize);
                s = int(t.time() < edtime) * barSize; //原本算出的t會向前靠齊 09:01:27算在 09:01,+1個bar則向後面的時間靠齊至09:02
                m = t.time()-13:30:00.000;//13:30~14:30都歸在13:30 
                afterend = int(m>0)*int(t.time()<14:30:00)*m/1000; //14:30定盤排除
                return {};
            }}
            size = msize*60;                                                                                                                                        //指數09:00整會傳昨收資料 要排除
            ohlc = select first(Close) as o ,max(Close) as h ,min(Close) as l , last(Close) as c , sum(Volume) as v ,sum({}) as m from tb where Code=SelectCode and Time{}09:00:00.000 and Date<=all_date.d[0] and Date>=all_date.d[count-1] group by Code,TimeBar(datetime(date) + time.second().int(),starttime,size) as T;
            tdb = select temporalFormat(date(T),"yyyyMMdd") as d,temporalFormat(time(T),"HHmmss") as t,o,h,l,c,v,m from ohlc context by date(T);

            if(msize*8>count(all_date)){{
                closenum = 0}}
            else{{
                preday = select d from everyday where rowNo(d)=msize*8;
                preclose = select last(Close) as c from tb where Code=SelectCode and Date=preday.d[0] and Time<=endtime ;
                closenum = preclose.c[count(preclose.c)-1] ; }}
            tbclose = table(0 as d,0 as t,0 as o,0 as h,0 as l,closenum as c,0 as v,0 as m ); //補最前面0 close

            tjoin = lj(tbzero, tdb, `t`d);
            table = select * from tjoin order by d,t ;
            update table set v = v.nullFill(0);
            update table set m = m.nullFill(0);
            table.append!(tbclose); 
        }}
        table
    """.format(
        code, date, bar, db, tb, today, startt, endtime,dbidx,tbidx,  endtime, alitime, m,timerange
    )
    k_line = s.run(k)
    prec = k_line.c[len(k_line.c) - 1]
    k_line = k_line.drop(len(k_line.c) - 1)
    if not (k_line.empty):
        close = k_line["c"].fillna(method="ffill")
        if str(close[len(close) - 1]) != "nan":
            k_line.c = k_line.c.fillna(close)
            k_line.o = k_line.o.fillna(close)
            k_line.l = k_line.l.fillna(close)
            k_line.h = k_line.h.fillna(close)
        if str(prec) == "None" or str(prec) == "nan":
            k_line = k_line.fillna(0)
            k_line.v = k_line.v.astype(int)
        else:
            k_line = k_line.fillna(prec)
            k_line.v = k_line.v.astype(int)
    print("timer_wholetable", time.time() - t)
    return k_line
请先 登录 后评论

2 个回答

Jason Tang - 时序数据库技术支持
/**
 *  Assume two sessions: 09:00 ~13:30, 15:00~17:00
 *  dburl : dfs://tickdb
 *  tableName: Tick
 */
def  getMinuteKline(inCode, inDate){
 t = select first(Date) as Date, firstNot(Close) as Open, min(Close) as Low, max(Close) as High, lastNot(close) as Close from loadTable("dfs://tickdb", "Tick")  where code = inCode, Date = inDate group by minute(Time) as Time order by Time
 leftT = table( join(09:01m .. 13:30m, 15:01m .. 17:00m) as Time)
 result = select leftT.Time as Time, t.Time as ActualTime, Date, Open, Low, High, Close from aj(leftT, t, `Time)
 update result set Open=Close, Low=Close, High=Close where Time != ActualTime
 return result.drop!("ActualTime")
}
请先 登录 后评论
谭华

解决方案:

利用interval函数,按照duration进行窗口滑动,并且可以选择遇到空值的处理方案,本问题是选取前一个有效k线来代替没有值的K线。

代码展示

//数据:
n = 1000000 date = take(2019.11.07 2019.11.08, n) time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!() timestamp = concatDateTime(date, time) price = 100+cumsum(rand(0.02, n)-0.01) volume = rand(1000, n) symbol = rand(`600519`000001`600000`601766, n) trade = table(symbol,date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)
SQL:
select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, interval(time,5m, 'prev')

结果展示:

attachments-2024-01-ChhYYk7U65b9b998caa0d.png

请先 登录 后评论