DolphinDB实现EMQ X的数据接入

导语 DolphinDB 是由浙江智臾科技有限公司研发的一款高性能分布式时序数据库,融合了分布式存储、分布式计算、流计算和编程语言,为客户提供了轻量级、一站式的大数据解决方案,特别适合量化金...

导语

DolphinDB 是由浙江智臾科技有限公司研发的一款高性能分布式时序数据库,融合了分布式存储、分布式计算、流计算和编程语言,为客户提供了轻量级、一站式的大数据解决方案,特别适合量化金融及工业物联网等领域。DolphinDB为用户提供了多种API接入方式,同时还支持MQTT/OPC/KAFKA等多种数据注入方式。随着DolphinDB的发展,更多厂商开展了与DolphinDB的深度合作。

杭州映云科技有限公司是面向 5G 和物联网市场的消息与流处理领域的领先企业。日前,映云科技技术团队使用自有的Erlang编程语言和DolphinDB开发的数据接入协议,自主开发了 DolphinDB API,可以通过TCP 协议与 DolphinDB进行双向数据传输,为使用DolphinDB的开发者和用户提供了新的交互方式。

v2-4232a456bf74194581debe4ffd164f7b_720w.jpg


EMQ X (简称 EMQ) 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,同时也支持 CoAP/LwM2M 一站式 IoT 协议接入。EMQ 是 5G 时代万物互联的消息引擎,适用于 IoT、M2M 和移动应用程序,可处理千万级别的并发客户端。

#搭建与配置 DolphinDB

目前,EMQ X 仅适配 DolphinDB 1.20.7 及以上的版本。

以 Linux 版本为例,前往官网下载社区最新版本的 Linux64 安装包:https://www.dolphindb.cn/downloads.html

将安装包的 server 目录上传至服务器目录 /opts/app/dolphindb,并测试启动是否正常:

chmod +x ./dolphindb
./dolphindb

## 启动成功后,会进入到 dolphindb 命令行,执行 1+1
>1+1
2

启动成功,并得到正确输出,表示成功安装 DolphinDB。然后使用<CRTL+D> 关闭 DolphinDB。

现在,我们需要打开 DolphinDB 的 StreamTable 的发布/订阅的功能,并创建相关数据表,以实现 EMQ X 消息存储并持久化的功能:

  1. 修改 DolphinDB 的配置文件 vim dolphindb.cfg 加入以下配置项,以打开 发布/订阅 的功能:
## Publisher for streaming
maxPubConnections=10
persistenceDir=/ddb/pubdata/
#persistenceWorkerNum=
#maxPersistenceQueueDepth=
#maxMsgNumPerBlock=
#maxPubQueueDepthPerSite=

## Subscriber for streaming
subPort=8000
#subExecutors=
#maxSubConnections=
#subExecutorPooling=
#maxSubQueueDepth=

2.后台启动DolphinDB服务:

## 启动完成后,DolphinDB 会监听 8848 端口供客户端使用。
nohup ./dolphindb -console 0 &

3.前往 DolphinDB 官网,下载合适的 GUI 客户端连接 DolphinDB 服务:

    • 前往 下载页 (opens new window)下载 DolphinDB GUI
    • DolphinDB GUI 客户端依赖 Java 环境,先确保已经安装 Java
    • 前往 DolphinDB GUI 目录中执行 sh gui.sh 启动客户端
    • 在客户端中添加 Server 并创建一个 Project,和脚本文件。

4.创建分布式数据库,和 StreamTable 表;并将 StreamTable 的数据持久化到分布式表中:

// 创建一个名为 emqx 的 分布式文件数据库
// 并创建一张名为 `msg` 表,按 `clientid` 和 `topic` 的 HASH 值进行分区:
schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING])
db1 = database("", HASH, [STRING, 8])
db2 = database("", HASH, [STRING, 8])
db = database("dfs://emqx", COMPO, [db1, db2])
db.createPartitionedTable(schema, "msg",`clientid`topic)

// 创建名为 `st_msg` 的 StreamTable 表,并将数据持久化到 `msg` 表。
share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg
msg_ref= loadTable("dfs://emqx", "msg")
subscribeTable(, "st_msg", "save_msg_to_dfs", 0, msg_ref, true)

// 查询 msg_ref;检查是否创建成功
select * from msg_ref;

完成后,可以看到一张空的 msg_ref 已创建成功:

v2-c505fe05c8fd25ed8b3d08a9b21613c6_720w.jpg


至此,DolphinDB 的配置已经完成了。

详细的 DolphinDB 使用文档请参考:

#配置规则引擎

创建规则:

打开 EMQ X Dashboard (opens new window),选择左侧的 “规则” 选项卡。

填写规则 SQL:

SELECT * FROM "t/#"


v2-91dfbea9c6b4e9b868eae8e3eff7a538_720w.jpg


关联动作:

在 “响应动作” 界面选择 “添加”,然后在 “动作” 下拉框里选择 “保存数据到 DolphinDB”。


v2-1b990a48e3fd25b736af70197de8c956_720w.jpg


填写动作参数:

“保存数据到 DolphinDB” 动作需要两个参数:

1). SQL 模板。这个例子里我们向流表 st_msg 中插入一条数据,SQL 模板为:

insert into st_msg values('${clientid}', '${topic}', ${qos}, '${payload}')

注:直接复制上述SQL语句可能会出现换行符,会导致插入失败,请确认在SQL模板中的行数是1


2). 关联资源的 ID。现在资源下拉框为空,可以点击右上角的 “新建资源” 来创建一个DolphinDB 资源:

填写资源配置:

服务器地址填写对应上文部署的 DolphinDB 的服务器,用户名为 admin 密码为 123456

attachments-2022-03-np1xatM262315041da498.png


v2-5c7a109c76a795c0e8ca6aa2927220f8_720w.jpg


点击 “确定” 按钮。

返回响应动作界面,点击 “确定”。


v2-b1bc0d25aacefc6de0b2b2b659377ede_720w.jpg


返回规则创建界面,点击 “创建”。


v2-dbf52132b1210de73cc087045a32c30a_720w.jpg


在规则列表里,点击 “查看” 按钮或规则 ID 连接,可以预览刚才创建的规则:


v2-1ad1531d281883de2c9414b3af02336e_720w.jpg


规则已经创建完成,现在发一条数据:

Topic: "t/a"
QoS: 1
Payload: "hello"

然后检查持久化的 msg_dfs 表,新的数据是否添加成功:

v2-ca3a74b64b93b7abf5ec546259766f53_720w.jpg


声明:此文章转载自EMQ,原文链接:EMQ Docs

0 条评论

请先 登录 后评论
Junxi
Junxi

73 篇文章

作家榜 »

  1. Junxi 73 文章
  2. liang.lin 5 文章
  3. mhxiang 4 文章
  4. wfHuang 3 文章
  5. admin 3 文章
  6. chris 2 文章
  7. 丘坤威 1 文章
  8. 陈枢之 1 文章