一个简单的airflow dolphindb Operator的示例
如下是一个简单的airflow+dolphindb operator的示例,使用airflow调度的话可以参考。初始版本约定:
1. 直接采用s.run调用dos脚本;
2. dos实现runDolScr函数;
3. 入参如果在dos脚本中是字符串(双引号)类型,需要在传参时手动为其加上双引号;
后续优化:仿照spark的operator实现submit等逻辑;负载均衡提交到不同节点的逻辑;
- 定义
from airflow.models.baseoperator import BaseOperator
from typing import Any, Callable, Dict, Iterable, List, Optional
import os
import dolphindb as ddb
class DolphinDBOperator(BaseOperator):
    """Airflow operator that runs a DolphinDB (.dos) script on a server.

    The script file is read, a trailing ``runDolScr(...)`` call built from the
    values of ``op_kwargs`` is appended, and the combined script is executed
    through ``dolphindb.session.run``.

    :param dos_file: path to the .dos script file to execute
    :param op_kwargs: mapping whose *values* become the positional arguments
        of ``runDolScr``; values that are DolphinDB strings must already carry
        their own double quotes (see the notes at the top of this document)
    :param host: DolphinDB server host (default preserves previous behavior)
    :param port: DolphinDB server port
    :param user: DolphinDB login user
    :param password: DolphinDB login password
    """

    # Render op_kwargs through Jinja so values like {{ next_ds_nodash }} work.
    template_fields = ["op_kwargs"]
    BLUE = '#ffefeb'
    ui_color = BLUE
    # FIX: must be a tuple — the original ('op_kwargs') was a plain string,
    # which Airflow would iterate character by character.
    shallow_copy_attrs = (
        'op_kwargs',
    )

    def __init__(
        self,
        dos_file: str,
        op_kwargs: Optional[Dict] = None,
        host: str = "127.0.0.1",
        port: int = 8711,
        user: str = "admin",
        password: str = "xxxxxxxx",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.op_kwargs = op_kwargs or {}
        self.dos_filepath = dos_file
        # Connection parameters — previously hard-coded inside execute();
        # defaults keep the original behavior for existing callers.
        self.host = host
        self.port = port
        self.user = user
        self.password = password

    def execute(self, context):
        """Run the .dos script plus a trailing runDolScr(...) call; return the result."""
        self.log.info("op_kwargs: %s", self.op_kwargs)
        run_call = """runDolScr({})""".format(",".join(self.op_kwargs.values()))
        with open(self.dos_filepath, 'r') as infile:
            script = "\n" + infile.read()
        script = script + '\n' + run_call + "\n"
        self.log.info(script)
        s = ddb.session()
        s.connect(self.host, self.port, self.user, self.password)
        try:
            # FIX: close the session even when s.run raises.
            msg = s.run(script)
        finally:
            s.close()
        self.log.info(msg)
        return msg
- 使用
from operators.dolphindb_operator import DolphinDBOperator
# xxxxxx
# Operator arguments for runDolScr, built separately for readability;
# the data_file value carries its own double quotes as the .dos side expects.
stg_task_kwargs = {
    "cDate": ddb_data_date,
    "data_file": "\"/xxx/{{next_ds_nodash}}/xele_{{next_ds_nodash}}.csv\"",
}
load_stg_data = DolphinDBOperator(
    task_id="load_stg_data",
    dos_file="/home/airflow/load_stg_comm_ctp.dos",
    op_kwargs=stg_task_kwargs,
)