Making Your Data Flow Resilient With Apache NiFi Clustering

Brent Segner
12 min read · Feb 4, 2021

Introduction

In a previous blog we covered the need to take a number of factors relevant to both the infrastructure and the application into account when evaluating workload placement and performance within an edge computing environment. These data points included standard measurements around network bandwidth, CPU and RAM utilization, and disk I/O performance, as well as more transient items such as adjacent services and resource availability. While each of these data points is a critical input toward operating an efficient edge computing cloud environment and ensuring the overall health of the applications, there are other analytical factors to consider. In this blog we’ll touch on some of the challenges encountered in collecting and transforming data into a format that is serviceable for analytics, as well as how to construct a resilient data flow that ensures data continuity.

Setting aside other challenges such as data storage and retention, there are four principal areas we’ll focus on in this blog. Each is associated with the collection and transformation of data required to build an effective analytical framework for infrastructure and applications running in an edge computing environment, along with the approach we took to solve it.

Data Sources

The variety of applications supported at the edge brings with it a variety of sources from which data, ranging from application performance metrics to infrastructure telemetry, can be consumed. In this type of environment, that means sourcing data from many different endpoints and technologies where data originates or is deposited. These endpoints can range from S3 object storage locations for logs to message bus technologies like Apache Kafka or Pulsar for streaming and time series data. This variety of endpoints requires an analytic framework that can integrate with many potential upstream data sources.

Data Format

With the large number of data points required for analytics to paint a complete picture, multiple source endpoints are oftentimes a necessary evil. Ingesting data from a variety of sources also frequently involves handling data in multiple formats and structures. While it would be nice if every source passed data in a common format such as JSON, the reality is far different. A critical component of an effective analytical framework is the ability not only to be agnostic to the format of ingested data, but also to transform that data into a standard structure that makes analysis and model building consistent and predictable.

Data Volume

Consistent with the number of locations, instances, and devices supported in a cloud environment is the number of data points generated. In simple terms, the larger the infrastructure and applications are in scale and geography, the more data points will be generated. Depending on the size of the infrastructure deployment and applications, it would not be unusual to see hundreds of thousands to millions of data points generated every hour. Having a framework that is optimized and scalable enough to handle significant volume is essential, as any downtime in the data flows represents potentially large holes in your datasets.

Data Targets

Similar to the number of source locations, and depending on what tools are being used for analysis and on internal policies around data security and retention, there may be multiple endpoints where the data ultimately resides after it has been ingested. For example, this could be a managed cloud database service like BigQuery, an internal service such as a data lake, or an analytical service like Spark Streaming. In future articles we will discuss these potential endpoints and the use cases for each in greater detail, but for the purposes of this discussion we will assume that most users will have multiple endpoints for the data after it has been collected and transformed, which requires an agile mechanism for reaching the destination endpoints of the data flows.

Approaches to the Challenge

In order to meet the needs around data ingestion, flow control, and mediation, there are a few different ways to approach the challenge, each with its pros and cons. The first would be to leverage a managed service within one of the cloud service providers (AWS, Google, Azure, etc.), which offer elegant and scalable solutions. The second approach would be to utilize a proprietary service like StreamSets, which simplifies the initial setup and offers additional paid support and tooling to accelerate the deployment. In our case, we turned to a third option, which was to leverage the open source community to help address the business need for a flexible and scalable solution.

The Apache NiFi project (formerly known as NiagaraFiles) originated and was developed at the NSA until it was transitioned to the open source community in 2014. The project offers a flexible solution to the problems of creating and tracking data flows from source to destination, while providing low latency and high throughput and supporting the need for dynamic prioritization. The Java-based application can be deployed on a number of different platforms, ranging from VMs or physical servers running Linux to a simple Windows or macOS desktop workstation. One of the principal benefits of the utility is the ease with which data flows can be established, due in part to the large number of flow processors supported natively. These open source processors allow NiFi to interface directly with a multitude of different platforms and services for data ingestion, manipulation, and aggregation, both as source and as target.

While use cases may vary with respect to which ETL (extract, transform, load) solution is most appropriate to address the complexity of data volume or of source and target locations, for the purposes of this article we are going to focus specifically on how we architected our NiFi deployment to provide the flexibility and resiliency needed to run at scale in a big data environment by clustering the service.

Deployment

Although it is entirely possible to deploy NiFi in a single-node configuration, this does not represent a best practice for an enterprise-grade deployment and would introduce unnecessary risk into a production environment where scaling to meet demand and resiliency are paramount. To get around this concern, as of release 1.0.0, NiFi provides the ability to cluster nodes together using either an embedded or an external Apache ZooKeeper instance as a highly reliable distributed coordinator. While a simple Google search shows there is plenty of debate around whether it is better to use an embedded or an external ZooKeeper service, and both sides have merit, for the sake of argument and this blog we will use the embedded flavor in the deployment.

Note: When clustering, it is best to stick with an odd number of instances to avoid quorum issues when ZooKeeper elects a leader. Per the Apache NiFi project…

“Generally, it is advisable to run ZooKeeper on either 3 or 5 nodes. Running on fewer than 3 nodes provides less durability in the face of failure. Running on more than 5 nodes generally produces more network traffic than is necessary. Additionally, running ZooKeeper on 4 nodes provides no more benefit than running on 3 nodes, as ZooKeeper requires a majority of nodes be active in order to function.”

In this example, we will build out a three-node NiFi cluster using Ubuntu virtual machines sized at 4 vCPU, 8GB RAM, and 20GB of disk, with Java 11.0.9.1 and common Linux utilities like nano already installed. Depending on the environment, it may be necessary to confirm that all three instances are able to connect to one another before proceeding with the rest of the steps in this guide; a quick connectivity check is shown after the hosts file example below.
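As a quick sanity check before going further, confirm that the expected Java version is on the path of each node:

| java -version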

Since the deployment will reference each NiFi node by hostname, the first step is to update the /etc/hosts file with the name and IP address of each node that will be in the cluster. This step will need to be completed on each of the nodes in the deployment.

| sudo nano /etc/hosts

Example (do not use underscores “_” in the names):

145.4.104.2 nifi-worker-2

145.4.104.3 nifi-worker-3

145.4.104.4 nifi-worker-4
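With the hosts file populated, a quick connectivity check from each node (here from nifi-worker-2, assuming ICMP is permitted in your environment) confirms that name resolution and reachability are in place:

| ping -c 3 nifi-worker-3

| ping -c 3 nifi-worker-4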

After the host file on each server is updated with the cluster information, it is time to download the binaries on each of the hosts. There are a number of different mirror sites available, but we used the first one on the list at https://www.apache.org/dyn/closer.lua?path=/nifi/1.12.1/nifi-1.12.1-bin.tar.gz for the purposes of this build.

| wget https://apache.claz.org/nifi/1.12.1/nifi-1.12.1-bin.tar.gz
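Optionally, the archive can be verified before extracting it. The Apache project publishes checksum files alongside each release; the exact file name and mirror path below are assumptions based on the usual Apache archive layout, so adjust them if your mirror differs and compare the two values manually:

| wget https://archive.apache.org/dist/nifi/1.12.1/nifi-1.12.1-bin.tar.gz.sha256

| sha256sum nifi-1.12.1-bin.tar.gz

| cat nifi-1.12.1-bin.tar.gz.sha256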

Once the binaries are downloaded into the home directory, the archive can be extracted on each of the servers, which will establish the NiFi directory and file structure.

| tar xzvf nifi-1.12.1-bin.tar.gz

With the directory structure in place, it is time to start editing nifi.properties with the cluster information.

| sudo nano nifi-1.12.1/conf/nifi.properties

The first step is to set nifi.state.management.embedded.zookeeper.start=true. This will start the embedded ZooKeeper service when the NiFi instance is started.

Next the nifi.remote.input.host and nifi.remote.input.port need to be updated. Since we are currently on the nifi-worker-2 server, the field was updated accordingly with nifi.remote.input.host=nifi-worker-2. As this process is repeated on the other nodes in the cluster, the appropriate host name should be added.

The other field in this section that needs to be updated from the default value is the nifi.remote.input.socket.port. Although the service is not incredibly particular about what port is used, make sure the port you assign has not been used by any other service on the VM. In our case, 9998 was available so we set the value to nifi.remote.input.socket.port=9998.

In the web properties section, the nifi.web.http.host value will need to be updated with the hostname of the server (nifi-worker-2 in our case). Setting this consistently on each node prevents confusion within the user interface about which node you are connected to.

In the cluster node section, we will start to provide some of the specific information needed for the host to connect to, and be properly identified within, the ZooKeeper cluster. The first setting that needs to be updated is nifi.cluster.is.node, which should be set to true. This identifies that the node should be included in the NiFi cluster.

Next, the nifi.cluster.node.address needs to be updated with the name used for the current node within the /etc/hosts file. In this particular case, we updated it to nifi-worker-2 to keep the terminology consistent across the deployment.

The final value that needs to be updated within the cluster node section is nifi.cluster.node.protocol.port. Similar to what we did in the previous section with the socket port, the service is not particular about exactly which port is used, so we set it to 9999 since that port was not in use anywhere else.
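Taken together, and before adding the ZooKeeper connect string covered next, the cluster-related entries in nifi.properties on nifi-worker-2 look roughly like the following. The property names come from the stock 1.12.1 configuration file, and the ports are simply the unused values chosen above; adjust the hostname as this is repeated on each node.

nifi.state.management.embedded.zookeeper.start=true
nifi.remote.input.host=nifi-worker-2
nifi.remote.input.socket.port=9998
nifi.web.http.host=nifi-worker-2
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi-worker-2
nifi.cluster.node.protocol.port=9999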

Since we are not introducing security or load balancing in this guide, the final section that needs to be updated is the ZooKeeper connection information in nifi.zookeeper.connect.string. This is a comma-delimited list of each of the nodes within the cluster using the default client port of 2181. In our example, we used the following setting:

nifi.zookeeper.connect.string=nifi-worker-2:2181,nifi-worker-3:2181,nifi-worker-4:2181

After the connection string is updated, the nifi.properties file can be saved and exited.

The next file to be updated is zookeeper.properties. In this file, we will add the server references that allow ZooKeeper to keep track of all of the servers within the cluster.

| sudo nano nifi-1.12.1/conf/zookeeper.properties

server.2=nifi-worker-2:2888:3888;2181

server.3=nifi-worker-3:2888:3888;2181

server.4=nifi-worker-4:2888:3888;2181

With multiple servers participating in the cluster, ZooKeeper needs a way to know which server ID belongs to each host. To provide this, a file named myid needs to be placed in the folder identified earlier in the zookeeper.properties file as dataDir.
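In the stock zookeeper.properties file that ships with NiFi, this dataDir points at a state directory relative to the NiFi install, which is why the directories below are created where they are; if your dataDir value differs, adjust the paths accordingly.

dataDir=./state/zookeeper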

Since nothing else in this file needs to be modified, you can now save and exit the zookeeper.properties file to perform the final step of the configuration by adding/updating the myid file.

| mkdir nifi-1.12.1/state

| mkdir nifi-1.12.1/state/zookeeper

| cd nifi-1.12.1/state/zookeeper

| nano myid

In this example, we are working on nifi-worker-2, so we will simply place a “2” in the file. As you iterate through the other nodes in your cluster, the number should be updated accordingly (i.e. nifi-worker-3 would be updated with a “3” and so forth).
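If you prefer a one-liner while repeating this step, the same file can be written from the directory where NiFi was extracted, replacing the ID to match each node:

| echo "2" > nifi-1.12.1/state/zookeeper/myid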

Once this update has been completed on all three servers within the NiFi cluster, it is time to go back to the home directory and start the service on each of the servers.

| cd

| nifi-1.12.1/bin/nifi.sh start

After the NiFi service is started, you can check to make sure that it is running using the “status” command.

| nifi-1.12.1/bin/nifi.sh status

If you are interested in tracking the status of the clustering, you can tail the nifi-app.log.

| tail -f nifi-1.12.1/logs/nifi-app.log

It can take up to four minutes for the nodes to negotiate which one will act as the cluster coordinator, but after that process is completed, you should see heartbeat information being passed between the different nodes.

If you run into any issues with either the cluster not forming correctly or the NiFi service dying, the nifi-bootstrap and nifi-user logs are helpful for troubleshooting.

| tail -f nifi-1.12.1/logs/nifi-bootstrap.log

| tail -f nifi-1.12.1/logs/nifi-user.log

Note: even if the logs look clean on one node, an issue may be reported on only a single node in the cluster, so make sure you tail the various log files on all nodes associated with the cluster.

Provided there are no issues either launching NiFi or clustering the service, the UI should now be available on any of the three servers within the cluster at http://hostname:8080/nifi/, which would be http://145.4.104.2:8080/nifi/ for our example cluster.
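As an additional check that all three nodes have joined, the cluster status can also be queried from the REST API of any node. This assumes the default unsecured HTTP port of 8080; the endpoint below is part of the standard NiFi REST API.

| curl -s http://nifi-worker-2:8080/nifi-api/controller/cluster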

When you navigate to the page, you will see the number of nodes in the cluster in the top left hand corner of the page.

You can see details on your cluster by clicking the three bars in the upper right-hand corner and then clicking ‘Cluster’.

With the cluster now up and running, you are ready to start adding processors and building flows, which gets your ETL pipeline established and your data flowing, supported by a scalable, resilient, clustered architecture.

Conclusion

In the end, given the variety of ways that 5G applications and services are and will be deployed into edge computing environments, the tools available for building ETL pipelines are as numerous as the opinions on which ones are best to use. The choice of ETL tool often comes down to personal preference coupled with the nuances of the applications being deployed and the infrastructure supporting them. While there are many aspects of a mature ETL pipeline, each with its own considerations, this blog was intended to illustrate one approach we have taken to deploy open source tools in a way that provides both a higher level of resilience and the ability to scale to meet the data processing needs of different analytical workloads.

Note: The information and perspectives held within this blog represent my personal opinion and not that of my employer or foundations that I am affiliated with.

Brent Segner

Distinguished Engineer @ Capital One| Focused on all things FinOps | Passionate about Data Science, Machine Learning, AI, Python & Open Source