Understanding InfluxDB Native Collectors

Benniu Ji
8 min read · Aug 31, 2022


Get Data from EMQX Cloud with InfluxDB Native Collector

In my last post, I demonstrated how to use InfluxDB Native Collector to fetch MQTT data from EMQX Cloud. My first experience with Native Collector was great: it is much easier to use than other options such as the InfluxDB Client Libraries or Telegraf Agent, and it greatly accelerates the development of IoT applications.

However, a great user experience is not enough; we need to think more holistically before applying a new technology in production.

So after trying out this amazing new feature, I was curious about how it is implemented on the back end and how it works in terms of functionality, architecture, and performance. In this post, I’ll share my findings and my personal suggestions on using InfluxDB Native Collector to ingest data from third-party MQTT brokers. I hope it helps.

Understanding InfluxDB Cloud’s Native Collector

Learning from the official documentation is always a good place to start.

This is a new feature, and there isn’t much documentation describing its design and implementation. I could find only two blog posts announcing the availability of InfluxDB Native Collector and one page in the product documentation about setting up native MQTT subscriptions.

Diagram by Samantha Wang via the official blog

The diagram in Samantha Wang’s blog describes the differences between the two integration options at a very high level.

Telegraf Agent

The first option is to use Telegraf Agent to fetch data from the MQTT broker and persist it into InfluxDB. This requires installing and operating additional software, which can be inconvenient when the rest of your stack is fully managed, as with a managed MQTT service.

Native Ingestion

The second option is the new Native Ingestion. As the diagram shows, there is no need to install additional software, and messages from the MQTT broker appear to go directly into InfluxDB. However, MQTT uses the pub/sub model, so there is no way to push data into a database without a subscriber (the role Telegraf Agent plays in the first option). EMQX Cloud’s built-in data integration is a different story: it uses hooks in the broker to send data directly to a third-party database, and I will cover that approach in a future post. So InfluxDB Cloud must be running a “sidecar” service alongside the InfluxDB service that acts as the subscriber.

The documentation for Native Collector also describes how to set up MQTT subscriptions on InfluxDB Cloud. But there aren’t many details on its implementation.

Learn How Native Collector Works from EMQX Cloud Dashboard

In my previous post, we set up data ingestion for EMQX Cloud and InfluxDB Cloud. EMQX Cloud provides a dashboard to monitor the client subscriptions as well as the status of the broker. This may give us some hints on how Native Collector works.

If you remember, in the last demo we added a subscription called “influxdb” on InfluxDB Cloud. Let’s go back to EMQX Cloud and see what happened after we set up the bridge in InfluxDB Cloud.

The Connections from InfluxDB Cloud

Here’s the monitoring page from EMQX Cloud.

Monitoring clients via EMQX Cloud Dashboard

From this snapshot, we can see 5 connections. The client IDs indicate that these are the subscribers from InfluxData.

Apparently, each native subscription rule creates 5 clients by default to subscribe to the given topic. I’m not sure how the number of clients is decided, as it is not exposed as an option during the native collector setup. I suspect it may become configurable in the future, because it is an important setting that affects both availability and performance. For some high-throughput scenarios, 5 subscribers may not be enough. However, considering that this is a new feature that is still being improved, a fixed number of clients is perfectly acceptable.

Subscriptions

Let’s move on to the subscriptions part of the monitoring page.

Subscriptions created by InfluxDB Native Collector

Aha, the subscriptions list explains how InfluxDB’s native collector subscribes to topics. It uses shared subscriptions.

If you are familiar with the MQTT protocol, you know that when multiple subscribers subscribe to the same topic, every message is dispatched to each of them. This would be a big problem for data persistence, as duplicate messages would be inserted. Using a single subscriber, on the other hand, would create a single point of failure. That’s why InfluxDB Cloud uses shared subscriptions: they avoid data duplication while keeping the service highly available and performant.
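To make the difference concrete, here is a minimal sketch that simulates both delivery modes in plain Python, with no real broker involved. The client IDs, the group name, and the topic are made up for illustration, and round-robin is used only to keep the output deterministic; it is not necessarily how InfluxDB Cloud or EMQX Cloud dispatches messages.

```python
from collections import defaultdict
from itertools import cycle


def shared_filter(group: str, topic: str) -> str:
    """Build a shared-subscription topic filter: $share/<group>/<topic>."""
    return f"$share/{group}/{topic}"


def deliver_plain(messages, subscribers):
    """Normal subscriptions: every subscriber receives every message."""
    inbox = defaultdict(list)
    for msg in messages:
        for sub in subscribers:
            inbox[sub].append(msg)
    return dict(inbox)


def deliver_shared(messages, subscribers):
    """Shared subscription: each message goes to exactly one group member.

    Round-robin is used here only to keep the sketch deterministic.
    """
    inbox = {sub: [] for sub in subscribers}
    for msg, sub in zip(messages, cycle(subscribers)):
        inbox[sub].append(msg)
    return inbox


subscribers = [f"collector-{i}" for i in range(5)]  # hypothetical client IDs
messages = [f"reading-{n}" for n in range(10)]

plain = deliver_plain(messages, subscribers)
shared = deliver_shared(messages, subscribers)

print(shared_filter("influxdb", "sensors/temperature"))  # hypothetical group and topic
print(sum(len(v) for v in plain.values()))   # 50: each message would be stored 5 times
print(sum(len(v) for v in shared.values()))  # 10: each message is stored once
```

With five plain subscribers, ten published messages turn into fifty deliveries. With a shared group, the same ten messages are spread across the five clients and each one is delivered once.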

Client Details

Now, let’s go deeper into the client and see what else we can learn.

Client Details

From the client details page, we learn that the native collector connects using MQTT protocol version 3.1.1 with clean session set to false, and that it subscribes with QoS 1.

To be honest, I don’t quite understand why InfluxDB sets clean session to false. Normally this setting is used to avoid data loss for individual subscriptions when the network is unstable. My guess is that it avoids resubscribing after a reconnection, since the bridge runs over a public network, which is not always stable. Another possibility is to avoid data loss if all subscribers are disconnected at the same time for some reason. But that is very unlikely to happen, because the IP addresses show that these clients do not run on the same node.

It is worth mentioning that the subscriptions use QoS 1, which is also not configurable during setup, although I guess it will be eventually. QoS 1 is “at least once” delivery: the broker resends a message until it is acknowledged, so no data is lost, but this costs performance and can result in duplicate messages being persisted to the database. It’s a tradeoff between reliability and performance. Different scenarios call for different choices, so it is best to let the user decide.
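The following sketch simulates how “at least once” delivery produces a duplicate when an acknowledgement is lost, and how keying stored points on device and timestamp absorbs it. The failure pattern and the tuple layout are assumptions for illustration. The `persist` function only mimics the way InfluxDB treats a point with the same measurement, tag set, and timestamp as an overwrite, and that only helps when the timestamp comes from the message itself rather than from the arrival time.

```python
def deliver_qos1(messages, lost_acks):
    """Simulate QoS 1 delivery: resend a message whenever its PUBACK is lost.

    `lost_acks` is a set of message indexes whose first acknowledgement
    never reaches the broker (a made-up failure pattern for illustration).
    """
    received = []
    for index, msg in enumerate(messages):
        received.append(msg)          # the first delivery arrives at the subscriber
        if index in lost_acks:
            received.append(msg)      # the broker retransmits, causing a duplicate
    return received


def persist(received):
    """Store points keyed by (device, timestamp) so duplicates overwrite."""
    table = {}
    for device, ts, value in received:
        table[(device, ts)] = value
    return table


# (device, timestamp, value) tuples; all values are invented sample data
messages = [("dev-1", 1000, 21.5), ("dev-1", 1001, 21.7), ("dev-1", 1002, 21.6)]
received = deliver_qos1(messages, lost_acks={1})
stored = persist(received)

print(len(received))  # 4 deliveries for 3 published messages
print(len(stored))    # 3 distinct points after keying on device + timestamp
```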

Summarize How Native Collectors Work

Now we have a clear understanding of how the InfluxDB Cloud’s native MQTT collector works.

Architecture Overview for EMQX Cloud and InfluxDB Cloud Integration

This diagram illustrates the architecture at a high level. Devices continuously send data to the MQTT broker on EMQX Cloud. InfluxDB Cloud connects 5 clients to EMQX Cloud, all subscribed through a shared subscription with the same group ID, so each message from a device is routed to only one subscriber. Once a message reaches the subscriber, it is parsed into the target measurement defined in the native MQTT subscription rules and finally persisted to the target bucket. This is what I think of as the end-to-end workflow.
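The parsing step at the end of this workflow can be pictured roughly as follows. This is only a sketch of the idea, not InfluxDB Cloud’s actual parser: the payload shape, the field names, and the measurement name are assumptions, and escaping of special characters in line protocol is left out.

```python
import json


def to_line_protocol(payload: bytes, measurement: str) -> str:
    """Turn a JSON MQTT payload into one line of InfluxDB line protocol.

    Assumed payload shape (hypothetical):
    {"device": "dev-1", "temperature": 21.5, "ts": 1661904000000000000}
    """
    data = json.loads(payload)
    tags = f"device={data['device']}"                      # tag set
    fields = f"temperature={float(data['temperature'])}"   # field set
    return f"{measurement},{tags} {fields} {int(data['ts'])}"


payload = b'{"device": "dev-1", "temperature": 21.5, "ts": 1661904000000000000}'
print(to_line_protocol(payload, "room_climate"))
# room_climate,device=dev-1 temperature=21.5 1661904000000000000
```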

Suggestions

Based on the above findings, from an engineering perspective, I make the following suggestions:

Message Sequence

QoS 1 does not strictly preserve the order of messages, because a retransmitted message can arrive after newer ones. Even with QoS 0 or 2, ordering is only preserved on normal (non-shared) topics. With shared subscriptions, you can never be sure of the arrival order, because messages are delivered to different subscribers. In most cases this is not a big problem, but if the metrics in your MQTT messages are order-sensitive, the results may not be what you expect.

Suggestion 1: Do not use the default TIMESTAMP on InfluxDB. Place a timestamp in the MQTT message body and parse that value in the collector’s subscription rules.
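As a small illustration of Suggestion 1, the sketch below puts the event time into the payload on the device side and shows that sorting on it recovers the original order after an out-of-order arrival. The field names (`device`, `value`, `ts`) and the interleaving are assumptions chosen for the example.

```python
import json


def make_payload(device: str, value: float, event_ts_ns: int) -> str:
    """Device side: embed the event time in the message body."""
    return json.dumps({"device": device, "value": value, "ts": event_ts_ns})


# Messages published in order, one timestamp tick apart (invented values).
published = [make_payload("dev-1", v, 1_000 + i) for i, v in enumerate([10, 11, 12, 13])]

# A hypothetical arrival order after the messages pass through different subscribers.
arrived = [published[1], published[0], published[3], published[2]]

by_arrival = [json.loads(p)["value"] for p in arrived]
by_event_time = [
    m["value"] for m in sorted(map(json.loads, arrived), key=lambda m: m["ts"])
]

print(by_arrival)     # [11, 10, 13, 12]
print(by_event_time)  # [10, 11, 12, 13]
```

If the database assigned the timestamp on arrival, the stored series would follow the first list. With the embedded timestamp, it follows the second.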

Balancing Strategy of Shared Subscription

Some MQTT brokers support multiple balancing strategies for shared subscriptions. For example, EMQX Cloud supports five, including random, round_robin, sticky, and hash.

You need to be aware of the balancing strategy used in your MQTT broker. EMQX Cloud uses random by default, but it is configurable.

With “sticky” or “hash”, much of the traffic can end up on a single subscriber, which may cause problems if that subscriber cannot handle the load.

Suggestion 2: Use ‘random’ or ‘round_robin’ strategy to dispatch the workload to the 5 subscribers.
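To get a feel for why the strategy matters, here is a simplified simulation of four strategies dispatching to five subscribers. It is a toy model, not EMQX’s implementation: “sticky” is modeled as picking one subscriber and reusing it, “hash” as hashing the publisher’s client ID, and all client IDs are hypothetical.

```python
import random
import zlib
from collections import Counter
from itertools import count

SUBSCRIBERS = [f"collector-{i}" for i in range(5)]  # hypothetical client IDs


def make_picker(strategy, rng):
    """Return a function that maps a publisher ID to one subscriber."""
    if strategy == "random":
        return lambda publisher: rng.choice(SUBSCRIBERS)
    if strategy == "round_robin":
        turn = count()
        return lambda publisher: SUBSCRIBERS[next(turn) % len(SUBSCRIBERS)]
    if strategy == "sticky":
        chosen = rng.choice(SUBSCRIBERS)  # picked once, then reused
        return lambda publisher: chosen
    if strategy == "hash":
        # crc32 keeps the mapping stable across runs (unlike built-in hash()).
        return lambda publisher: SUBSCRIBERS[
            zlib.crc32(publisher.encode()) % len(SUBSCRIBERS)
        ]
    raise ValueError(f"unknown strategy: {strategy}")


def simulate(strategy, publishers, messages_per_publisher, seed=0):
    """Count how many messages each subscriber receives under a strategy."""
    rng = random.Random(seed)
    pick = make_picker(strategy, rng)
    load = Counter()
    for _ in range(messages_per_publisher):
        for publisher in publishers:
            load[pick(publisher)] += 1
    return load


publishers = ["gateway-a", "gateway-b"]  # few publishers: a hard case for "hash"
for strategy in ("random", "round_robin", "sticky", "hash"):
    load = simulate(strategy, publishers, messages_per_publisher=500)
    # strategy, busiest subscriber's message count, number of subscribers used
    print(strategy, max(load.values()), len(load))
```

In this model, round_robin spreads the 1,000 messages evenly, sticky sends all of them to one subscriber, and hash can use at most as many subscribers as there are publishers.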

Performance

Persistent sessions (clean session set to false) and QoS 1 shared subscriptions both increase resource consumption on the MQTT broker. Pay attention to client performance and message reception rates; otherwise, problems such as message backlog and client crashes can occur.

Suggestion 3: Evaluate throughput and monitor client performance. If necessary, contact the InfluxDB Cloud support team to see if it is possible to add more subscribers.

Security

At the time of writing, InfluxDB Cloud’s Native Collector only supports connecting to third-party MQTT brokers over the public network, and MQTT over TLS is not yet supported. This is not good news if you plan to use it in production, and security would be my main concern before adopting the new native collector. Since it is so important to enterprise customers, I believe the InfluxDB Cloud team is working hard to support MQTTS. Let’s wait and see.

Suggestion 4: For now, please add access control rules along with authentication to minimize the risk.
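What such an access control rule boils down to can be sketched as a default-deny check: the collector’s account may only subscribe to the topics it needs. This is a simplified model rather than EMQX Cloud’s actual ACL engine. The username and topic names are hypothetical, and the matcher ignores special cases such as topics starting with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against an MQTT filter with + and # wildcards."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":            # multi-level wildcard matches the rest
            return True
        if i >= len(t_levels):      # filter is longer than the topic
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


# (username, action, topic filter, allow?) -- hypothetical rule set
RULES = [
    ("influxdb_collector", "subscribe", "sensors/#", True),
]


def is_allowed(username: str, action: str, topic: str) -> bool:
    """First matching rule wins; anything not matched is denied."""
    for rule_user, rule_action, rule_filter, allow in RULES:
        if (
            rule_user == username
            and rule_action == action
            and topic_matches(rule_filter, topic)
        ):
            return allow
    return False


print(is_allowed("influxdb_collector", "subscribe", "sensors/room1/temp"))  # True
print(is_allowed("influxdb_collector", "publish", "sensors/room1/temp"))    # False
print(is_allowed("influxdb_collector", "subscribe", "admin/config"))        # False
```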

Suggestion 5: Another option is to contact the InfluxDB Cloud team to see if it is possible to set up VPC peering. I haven’t tried this, so I’m not sure it’s feasible on a usage-based plan.

Summary

In this post, I shared my thoughts on how InfluxDB implements the Native MQTT Collector. Judging from EMQX Cloud’s dashboard, it uses five subscribers in the same shared-subscription group on the target topic.

This approach may bring some challenges during adoption, so I offered five suggestions to make it more practical.

While InfluxDB Native Collector is still in its early stages, it is the easiest and fastest way to get MQTT data into InfluxDB, especially with a fully managed MQTT service like EMQX Cloud.

It’s worth a try!

Btw, EMQX Cloud supports pushing data directly to InfluxDB Cloud through its built-in data integration feature. In my next post, I will compare these two different approaches in terms of performance, security, and functionality. Stay tuned!
