EMQX+HStreamDB 实现物联网流数据高效持久化
创始人
2025-05-29 18:46:44
0

在这里插入图片描述

在 IoT 场景中,通常面临设备数量庞大、数据产生速率高、累积数据量巨大等挑战。因此,如何接入、存储和处理这些海量设备数据就成为了一个关键的问题。

EMQX 作为一款强大的物联网 MQTT 消息服务器,单个集群可处理上亿设备连接,同时提供了丰富的数据集成功能。HStreamDB 作为一款分布式流数据库,不仅可以高效存储来自 EMQX 的海量设备数据,而且提供实时处理分析能力。EMQX 与 HStreamDB 都具备高可扩展性和可靠性,两者结合不仅能够满足大规模 IoT 应用的性能和稳定性需求,同时能够提升应用的实时性。

在这里插入图片描述

近期 EMQX Enterprise 4.4.15 发布,更新了对 HStreamDB 最新版本的支持,本文将具体介绍如何通过 EMQX 规则引擎将数据持久化到 HStreamDB,实现 MQTT 数据流的存储与实时处理。

:本文介绍的集成步骤基于 EMQX 4.4.15 和 HStreamDB 0.14.0 以上版本。

连接到 HStreamDB 集群

在下面的教程中,我们假设有一个正在运行的 EMQX Enterprise 集群和正在运行的 HStreamDB 集群。如需部署 EMQX Enterprise 集群,请参考 EMQX Enterprise docs。如需部署 HStreamDB 集群,请参考 HStreamDB docs,其中包含关于如何用 Docker 快速部署的说明。

我们可以通过 Docker 来部署 HStreamDB 客户端并连接到 HStreamDB 集群:

# 获取帮助信息
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help

我们在此使用 hstream stream 命令创建一个 stream,供接下来的示例使用:

# 使用 hstream stream 命令创建 streams
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))

接下来,连接到 HStreamDB 集群,启动交互式 HStream SQL shell:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<>"
# 如果要使用安全连接,还需要填写 --tls-ca, --tls-key, --tls-cert 参数

如果连接成功,将会出现

      __  _________________  _________    __  ___/ / / / ___/_  __/ __ \/ ____/   |  /  |/  // /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ // __  /___/ // / / _, _/ /___/ ___ |/ /  / //_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/Command:h                           To show these help info:q                           To exit command line interface:help [sql_operation]        To show full usage of sql statementSQL STATEMENTS:To create a simplest stream:CREATE STREAM stream_name;To create a query select all fields from a stream:SELECT * FROM stream_name EMIT CHANGES;To insert values to a stream:INSERT INTO stream_name (field1, field2) VALUES (1, 2);

可以使用 show streams; 来查看已经创建的 streams 的信息:

> show streams;
+-------------------------------------------+---------+----------------+-------------+
| Stream Name                               | Replica | Retention Time | Shard Count |
+-------------------------------------------+---------+----------------+-------------+
| basic_condition_info_0                    | 3       | 604800 seconds | 1           |
+-------------------------------------------+---------+----------------+-------------+

创建 HStreamDB 资源

在利用 EMQX 规则引擎将数据持久化到 HStreamDB 之前,需要创建一个 HStreamDB 资源。

为此,请访问 EMQX Dashboard,单击 规则引擎 -> 资源创建 ,选择 HStreamDB 资源,输入 HStreamDB 地址并填写必要的选项。可用选项如下表:

在这里插入图片描述

在选择开启 SSL 时,会出现额外的 SSL 配置界面,可以粘贴所需配置内容或上传文件。

在这里插入图片描述
在这里插入图片描述

创建数据持久化到 HStreamDB 的规则

点击 规则引擎 -> 规则 -> 创建

在这里插入图片描述

编辑 SQL 规则并添加操作,您可以在字符串模板中使用 SQL 变量。

请注意,本文档中介绍的 SQL 规则仅供演示,实际的 SQL 应根据业务设计进行编写。

单击 添加操作,选择「数据持久化」以将数据保存到 HStreamDB 中。选择上一步创建的资源并输入参数。可用参数如下表:

在这里插入图片描述
在这里插入图片描述

点击 确定 来确认添加行为。

在这里插入图片描述

在 HStream SQL Shell 中获取实时的数据更新

从 EMQX 规则引擎持久化到 HStreamDB 的数据可以使用 HStream SQL Shell 实时读出新写入 stream 的内容。现在,数据已经被写入 HStreamDB,可以使用任何消费方式来消费消息。文档使用了一个简单的消费方法:使用 HStream SQL shell 进行查询。此外,读者可以自由选择使用自己喜欢的编程语言 SDK 编写消费端。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;

当前的 select 查询没有结果可供打印出,这是因为还没有数据通过 EMQX 的规则引擎向 HStreamDB 写入。一旦有数据写入,便可以在 HStream SQL shell 观察到数据的即时更新。目前在 HStreamDB 使用 SQL 对 streams 做查询,只会打印出创建查询后的结果。如果在 EMQX 停止向 HStreamDB 写入后创建查询,可能观察不到产生的结果。

向 EMQX 写入消息测试规则引擎

可以使用跨平台的桌面客户端 MQTT X 来连接到 EMQX 并发送消息:

在这里插入图片描述

从 EMQX Dashboard 获取规则引擎的运行数据指标

访问对应的规则引擎界面:

在这里插入图片描述

如果规则引擎运行数据指标正常,则代表 EMQX 会将数据持久化到 HStreamDB。一旦写入成功,便可以在前面步骤启动的 HStream SQL Shell 中看到实时的数据更新。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}

结语

至此,我们就完成了通过 EMQX 规则引擎将数据持久化到 HStreamDB 的主要流程。

将 EMQX 采集到的数据存储到 HStreamDB 后,可以对这些数据进行实时处理与分析,为上层 AI、大数据等应用提供支撑,进一步发掘和利用数据价值。作为首个专为流数据设计的云原生流数据库,HStreamDB 与 EMQX 结合可以实现一站式存储和实时处理海量物联网数据,精简物联网应用数据栈,加速企业的物联网应用开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb

相关内容

热门资讯

领益智造回应折叠屏业务进展:已... 4月7日消息,领益智造在投资者互动平台表示,公司已为国内外头部客户供应折叠屏终端硬件,核心产品涵盖不...
震安科技:目前暂未与华为、比亚... 4月7日消息,震安科技在互动平台表示,公司目前暂未与华为、比亚迪、大疆等企业开展深度合作。(科股宝播...
沪深两市成交额突破1万亿,较上... 4月7日消息,数据显示,沪深两市成交额突破1万亿,较上一日此时缩量超60亿,预计全天成交金额超1.6...
保供应,印度部分炼油厂推迟停产... 4月7日消息,印度石油部官员6日说,该国部分炼油企业推迟停产检修生产设备,以保障国内的燃料需求。印度...
多家品牌金饰价格较上个交易日下... 4月7日消息,今日国内黄金饰品价格对比显示,多家品牌金饰价格较上个交易日下跌约12元,报价在1431...
北部湾港:平陆运河通航后,预计... 4月7日消息,北部湾港在互动平台表示,平陆运河通航后,将有力提升西部陆海新通道运输能力和效率,有助于...
WTI原油期货涨超3% 4月7日消息,WTI原油期货涨超3%,报116美元/桶。(广角观察)
猪肉概念再度活跃,华统股份触及... 4月7日消息,猪肉概念盘中再度活跃,华统股份触及涨停,巨星农牧、龙大美食、立华股份、大禹生物跟涨。消...
英海事分析公司:霍尔木兹海峡通... 4月7日消息,总部位于英国的海事分析公司温沃德6日说,霍尔木兹海峡通行转变为“双通道系统”,分别是伊...
清明假期四川A级景区接待游客近... 4月7日消息,2026年清明假期,四川全省纳入统计的899家A级旅游景区,累计接待游客1499.1万...