一个简单的airflow dolphindb Operator的示例

一个简单的airflow dolphindb Operator的示例

如下是一个简单的airflow+dolphindb operator的示例,使用airflow调度的话可以参考。初始版本约定:

1. 直接采用s.run调用dos脚本;

2. dos实现runDolScr函数;

3. 入参如果dos是双引号的需要手动声明为双引号;


后续优化:仿造spark的operaotor实现submit等逻辑;负载均衡提交到不同节点的逻辑;

 - 定义

from airflow.models.baseoperator import BaseOperator
from typing import Any, Callable, Dict, Iterable, List, Optional
import os
import dolphindb as ddb

class DolphinDBOperator(BaseOperator):

    template_fields = ["op_kwargs"]
    BLUE = '#ffefeb'
    ui_color = BLUE

    shallow_copy_attrs = (
        'op_kwargs'
    )

    def __init__(
        self,
        dos_file: str,
        op_kwargs: Optional[Dict] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.op_kwargs = op_kwargs or {}
        self.dos_filepath = dos_file

    def execute(self, context):
        print(self.op_kwargs)
        runScr="""runDolScr({})""".format(",".join(list(self.op_kwargs.values())))
        with open(self.dos_filepath, 'r') as infile:
            scr_f = infile.readlines()
        script = "\n"
        for scr_i in scr_f:
            script = script + scr_i
        script = script + '\n'+runScr+"\n"
        print(script)
        s=ddb.session()
        s.connect("127.0.0.1", 8711, "admin", "xxxxxxxx")
        msg = s.run(script)
        print(msg)
        s.close()
        return msg
       
       

- 使用

 from operators.dolphindb_operator import DolphinDBOperator
# xxxxxx
load_stg_data = DolphinDBOperator(
        task_id = "load_stg_data",
        dos_file = "/home/airflow/load_stg_comm_ctp.dos",
        op_kwargs={
            "cDate":ddb_data_date,
"data_file":"\"/xxx/{{next_ds_nodash}}/xele_{{next_ds_nodash}}.csv\""
        }
    )  

  • 发表于 2022-06-16 17:28
  • 阅读 ( 3620 )
  • 分类:默认分类

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
dataz
dataz

1 篇文章

作家榜 »

  1. Junxi 73 文章
  2. wfHuang 6 文章
  3. liang.lin 5 文章
  4. mhxiang 4 文章
  5. admin 3 文章
  6. alex 2 文章
  7. 柏木 1 文章
  8. 丘坤威 1 文章