Airflow and Cassandra: Writing to Cassandra from Airflow

In this article, we are going to build a simple Extract, Transform, and Load (ETL) data pipeline using Apache Airflow and Apache Cassandra. Airflow is the orchestration tool, and Cassandra is where we load our data. Apache Airflow is an open-source project that was created at Airbnb and open-sourced in 2015; Apache Cassandra is a database that was created at Facebook and later open-sourced. The focus is on writing to Cassandra with the Airflow Cassandra provider (apache-airflow-providers-apache-cassandra).

ETL Pipeline using Apache Airflow and Cassandra

Data engineering is becoming more important to businesses every day. Companies have had data engineering at their disposal for a long time, but many have recently seen the need to enhance and grow their data ecosystem to better drive business decisions and management. Many tools have helped grow this ecosystem, and they are game-changers for data processing, transformation, integration, ingestion, and other data management tasks. Two of them are Apache Airflow and Apache Cassandra. In this post, we look at writing data to Cassandra databases from Airflow using the Airflow Cassandra provider. If you already know how Airflow works, you can jump straight to the ETL implementation; the code for the ETL pipeline is available on GitHub.

A diagram showing an ETL data pipeline

In this article, we are going to look at:

  • Introduction to Apache Airflow
  • Important components of an Airflow Workflow
  • Running Airflow using Docker
  • Implement the ETL pipeline
    • Extract data from the live news API
    • Send the data to an S3 bucket
    • Insert data into Cassandra

Introduction to Apache Airflow

Apache Airflow is a popular orchestration tool for workflow management: anyone with Python skills can write Airflow DAGs and start solving problems with it. You can also bring in your existing Python code and use Airflow to schedule those workflows according to your own plan. Other workflow orchestration engines are available, such as Luigi, Argo, and Prefect.

However, if you want a full-featured, mature tool, Apache Airflow is well worth the time. The goal of this blog post is to extract live news data, process it, and load it into an Apache Cassandra database, using the Apache Airflow Cassandra provider to write the data.

The architecture of Apache Airflow

To start, let's look at the architecture of Airflow for a single-node deployment. This kind of deployment can work well in production until your business outgrows it. In a single-node architecture, the Airflow webserver, worker, scheduler, metadata database, and so on all sit on the same machine, so we must allocate enough resources to that machine. As your workload grows, you will need to move to a distributed deployment so that it is fault-tolerant, scalable, and performant. In the next section, we look at the important components of Airflow workflows.

A diagram showing the Apache Airflow architecture

Important components of an Airflow Workflow

DAGs: DAGs are the building blocks of Airflow workflows. DAG stands for Directed Acyclic Graph: a DAG collects Tasks together and organizes them with dependencies and relationships that define how they should run.
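To make "directed" and "acyclic" concrete, here is a plain-Python sketch (not Airflow code) that models this article's pipeline as a dependency graph using the standard library's graphlib module (Python 3.9+). The task names are hypothetical, and Airflow's scheduler does its own dependency resolution; the sketch only shows why a valid run order exists for a DAG and why a cycle makes one impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of upstream tasks it depends on.
# Task names are hypothetical and mirror the three steps of this pipeline.
pipeline = {
    "extract_news": set(),
    "upload_to_s3": {"extract_news"},
    "insert_into_cassandra": {"upload_to_s3"},
}


def run_order(graph):
    """Return one valid execution order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(run_order(pipeline))
# ['extract_news', 'upload_to_s3', 'insert_into_cassandra']

# Making the first task depend on the last one creates a cycle,
# so the graph is no longer a DAG and no run order exists.
cyclic = dict(pipeline, extract_news={"insert_into_cassandra"})
try:
    run_order(cyclic)
except CycleError as err:
    print("not a DAG:", err.args[0])
```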

Providers: Airflow ships with functionality for writing basic tasks, but its capabilities can be extended by installing additional packages called providers, which are created by the Airflow community. Examples include apache-airflow-providers-apache-cassandra, apache-airflow-providers-amazon, and apache-airflow-providers-apache-spark. Providers contain operators and hooks that make it easy to interact with external resources from Airflow, for example AWS, Azure, Cassandra, and Spark resources. There are currently more than 60 maintained provider packages for working with different engines; you can browse them in the apache-airflow-providers documentation.

Operators: An Operator is a template for a predefined unit of work. When an Operator is instantiated in a DAG, it becomes a task, that is, a node of the DAG. All Operators derive from BaseOperator and inherit many attributes and methods from it. Examples of Operators are BashOperator, SparkSubmitOperator, PythonOperator, EmailOperator, SimpleHttpOperator, ExternalTaskSensor, HivePartitionSensor, SparkSqlOperator, and S3KeySensor.

Airflow Operators fall into three categories:

  1. Transfer Operators: operators that move data from one system to another.
  2. Sensor Operators: operators that keep running until a condition is met, for example until a file lands in storage or some other event occurs. All sensors derive from BaseSensorOperator.
  3. Action Operators: operators that perform an action or instruct another system to perform an action. You can read more on this topic here.
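To illustrate what "keep running until a condition is met" means, here is a minimal plain-Python sketch of the poke loop a sensor runs. It is a simplification under stated assumptions: in Airflow you would subclass BaseSensorOperator and implement its poke() method, and Airflow itself handles the waiting, the timeout, and (in reschedule mode) freeing the worker slot between checks.

```python
import time


def wait_for(condition, poke_interval=1.0, timeout=10.0):
    """Poll `condition` until it returns True, in the spirit of a sensor's poke loop.

    Simplified illustration only: the parameter names echo BaseSensorOperator's
    poke_interval and timeout, but this is not Airflow's implementation.
    Returns True once the condition holds; raises TimeoutError when time runs out.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(poke_interval)


# Hypothetical condition: pretend a file lands in storage on the third check.
checks = iter([False, False, True])
print(wait_for(lambda: next(checks), poke_interval=0.01, timeout=5))  # True
```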
The relationships between Airflow DAGs, Providers, Operators, Tasks, and Hooks

Tasks: Tasks are the basic unit of execution in Airflow. They are arranged into DAGs, with upstream and downstream dependencies set between them. Airflow has three kinds of tasks: Operators, Sensors, and TaskFlow-decorated functions.

  • Operators: these are task templates that you can string together quickly to build most parts of your DAGs.
  • Sensors: a special subclass of Operator whose only job is to wait for an external event to happen; once it does, downstream tasks can start.
  • TaskFlow: a custom Python function decorated with @task and packaged as a Task.

Hooks: Hooks are high-level interfaces that make connecting to external resources as quick and easy as possible. Hooks are often the building blocks that Operators are built from, and they allow Operators to communicate with external resources. In our case, we use the CassandraHook.

Connections: Connections are sets of parameters that allow Airflow to connect to external resources and systems. The parameters are stored credentials, such as usernames, passwords, hostnames, secret keys, and access keys. In our case, we created connections to the S3 bucket and the Cassandra database. In the next section of this post, we implement the pipeline shown earlier.

Implement the ETL pipeline

With the core concepts of Airflow and its important components introduced, we are going to set up Apache Airflow in Docker containers. Airflow is customizable, so we will extend it by installing additional Python packages. For example, the Cassandra provider does not come with the standard Airflow setup, so before we can communicate with the Cassandra database we must install the apache-airflow-providers-apache-cassandra package provided by the Airflow community.

Most of the time, we need to customize our Airflow instance to suit our deployment, so we are going to install the Python libraries we need.

  • Installing Airflow using Docker

The Airflow documentation provides a docker-compose.yaml file for running Airflow with Docker. Note that the documentation presents this setup as a way to learn and explore Airflow, not as a production-ready deployment. In our case, we download the docker-compose.yaml for Airflow 2.2.5 from the Airflow website and then add more functionality to it. Here are the steps to achieve this.

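  • Make a project directory and move into it
mkdir Airflow-Project
cd Airflow-Project
  • Download the docker-compose.yaml, create the folders that Airflow mounts, and store your user ID in a .env file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.5/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env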

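If you would like to stick to the stock features of Airflow and use only the packages and providers that come preinstalled, initialize the metadata database and create the first user account, then start the containers:

docker-compose up airflow-init
docker-compose up -d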

In our case, we need to extend Airflow with more packages and customization, so we include a Dockerfile and point the docker-compose.yaml file to an image called airflowguy that we build from it. In your docker-compose.yaml file, change the image to "airflowguy" instead of ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.5}. (Alternatively, add AIRFLOW_IMAGE_NAME=airflowguy to your .env file, since that variable is what the default image line reads.)

The image section of the docker-compose.yaml file

Now create your Dockerfile. Add your file dependencies and all the libraries you would like to include in the deployment to it. In our case, we included just the few providers used in our workflow.

Our Dockerfile looks like this:

The section of the Dockerfile
  • Build the image with the command below
docker build . -f Dockerfile --tag airflowguy
  • Now initialize the metadata database, then start and run the containers with the commands below
docker-compose up airflow-init
docker-compose up -d
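Since the point of building the airflowguy image is to add providers, it can help to confirm that they actually landed in it. A small, hypothetical check script using only the standard library could look like the one below; the package list is an assumption based on the providers named in this article. It relies on importlib.metadata, which needs Python 3.8 or newer, so check which Python version your image uses.

```python
from importlib.metadata import PackageNotFoundError, version

# Provider distributions used in this article; add whatever else your Dockerfile installs.
PROVIDERS = (
    "apache-airflow-providers-apache-cassandra",
    "apache-airflow-providers-amazon",
)


def installed_versions(packages=PROVIDERS):
    """Map each distribution name to its installed version, or None if it is missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver if ver else 'NOT INSTALLED'}")
```

Run it with the Python interpreter inside one of the running Airflow containers (for example through docker-compose exec on the scheduler service). The Airflow CLI command `airflow providers list` gives similar information without a script.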

Let’s build our Airflow DAG

The code for the ETL pipeline is available on GitHub. At this stage, we design and implement our DAG. Below are the steps:

Airflow Graph view of data pipeline DAG
  • Extract data from the live news API

The API used in this article is https://mediastack.com/. This news API provides the latest information about what is happening around the world. To get access to the API, set up an account by following the documentation at https://mediastack.com/documentation.

Code used for extracting API data
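The extraction code appears only as a screenshot, so here is a hedged stand-in that uses only the standard library. It assumes the mediastack `v1/news` endpoint with the `access_key`, `languages`, and `limit` query parameters, and a JSON response whose articles sit under a `data` key, as described in the mediastack documentation; verify these against the current docs and your plan (whether HTTPS is available may depend on the plan). The environment variable MEDIASTACK_ACCESS_KEY is hypothetical.

```python
import json
import os
import urllib.request
from urllib.parse import urlencode

# Assumption: endpoint and parameter names as documented by mediastack; check them for your plan.
MEDIASTACK_URL = "http://api.mediastack.com/v1/news"


def build_news_url(access_key, languages="en", limit=25):
    """Build the request URL for the latest news in the given language(s)."""
    params = {"access_key": access_key, "languages": languages, "limit": limit}
    return f"{MEDIASTACK_URL}?{urlencode(params)}"


def parse_articles(payload):
    """Return the list of article dicts found under the response's `data` key."""
    return json.loads(payload).get("data", [])


def fetch_articles(access_key=None, timeout=30):
    """Call the API and return the articles; non-2xx responses raise urllib.error.HTTPError."""
    # MEDIASTACK_ACCESS_KEY is a hypothetical variable name; an Airflow Variable would also work.
    key = access_key or os.environ["MEDIASTACK_ACCESS_KEY"]
    with urllib.request.urlopen(build_news_url(key), timeout=timeout) as response:
        return parse_articles(response.read())
```

A function like fetch_articles could be the python_callable of a PythonOperator. Keeping URL building and parsing separate from the network call makes both easy to test without an API key.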
  • Send the JSON data to an S3 bucket

Create your S3 bucket on AWS and set up the necessary access permissions for it (for example, IAM and bucket policies). Also make sure your S3 connection is configured correctly in the Airflow UI. From the S3 bucket, the data is then inserted into our Cassandra database.

Data uploaded to S3 bucket from Airflow

Airflow S3 connection configuration page
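A minimal sketch of the upload step, assuming the Amazon provider's S3Hook with its load_string and read_key methods, an Airflow connection with the default id aws_default, and a hypothetical date-partitioned key layout. The hook is imported inside the functions, in line with Airflow's advice to keep top-level DAG code light; this also means the key helper can be tried without Airflow installed.

```python
import json
from datetime import datetime


def s3_key_for(logical_date, prefix="news/raw"):
    """Hypothetical date-partitioned layout, e.g. news/raw/2022/05/01/articles.json."""
    return f"{prefix}/{logical_date:%Y/%m/%d}/articles.json"


def upload_articles(articles, logical_date, bucket, aws_conn_id="aws_default"):
    """Write the articles to S3 as JSON and return the object key, not the data."""
    # Imported here so that s3_key_for can be used without Airflow installed.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    key = s3_key_for(logical_date)
    S3Hook(aws_conn_id=aws_conn_id).load_string(
        string_data=json.dumps(articles),
        key=key,
        bucket_name=bucket,
        replace=True,  # a retried or re-run task overwrites the object instead of failing
    )
    return key


def download_articles(key, bucket, aws_conn_id="aws_default"):
    """Read the JSON written by upload_articles back into a list of dicts."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    return json.loads(S3Hook(aws_conn_id=aws_conn_id).read_key(key=key, bucket_name=bucket))


print(s3_key_for(datetime(2022, 5, 1)))  # news/raw/2022/05/01/articles.json
```

Returning only the key means the next task receives a short string through XCom and reads the data itself. Inside a task, the run's date is available from the task context (as logical_date from Airflow 2.2 onward).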
  • Define the dependencies of Tasks

At this stage, all our Tasks are set, so we can define the order in which we want them to run.

Dependency of tasks
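Airflow sets dependencies with the bitshift operators: `a >> b` makes b downstream of a, and because the expression returns b, chains such as `a >> b >> c` read left to right. The toy class below is not Airflow code; it only models that behaviour, with hypothetical task ids matching the three steps of this pipeline.

```python
class Task:
    """Toy stand-in for an Airflow task; only models how `>>` records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b means "b runs after a". Returning `other` lets a >> b >> c chain.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other


extract = Task("extract_news")
upload = Task("upload_to_s3")
insert = Task("insert_into_cassandra")

extract >> upload >> insert
print(upload.upstream, upload.downstream)  # {'extract_news'} {'insert_into_cassandra'}
```

In a real DAG the same line works with operator instances, and Airflow also accepts lists on either side, such as extract >> [task_a, task_b], to fan out to several tasks.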
  • Insert data into Cassandra

Here we are going to:

  1. Create our keyspace and the table that houses our data in Cassandra. You will find the CQL commands that create them on the GitHub page.
  2. Set up the connection to Cassandra in the Airflow UI. In our case, we used localhost, but you should use your nodes' IP addresses; if there is more than one node in the cluster, separate the IP addresses with a comma (",").
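The actual CQL lives on the GitHub page, so the schema below is a hypothetical stand-in: a `news` keyspace with SimpleStrategy and a replication factor of 1 (suitable only for a single-node test cluster) and an `articles` table keyed by article URL, with columns assumed to match fields that mediastack returns. The insert function assumes the Cassandra provider's CassandraHook, whose get_conn() returns a cassandra-driver Session built from the connection set up in the UI; the hook splits that connection's host field on commas into contact points, which is why multiple node addresses are comma-separated.

```python
from datetime import datetime

KEYSPACE = "news"  # hypothetical keyspace name

# DDL to run once, e.g. in cqlsh, before the DAG runs (hypothetical schema).
CREATE_KEYSPACE = (
    f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE} "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"
)
CREATE_TABLE = f"""
CREATE TABLE IF NOT EXISTS {KEYSPACE}.articles (
    url text PRIMARY KEY,
    title text,
    description text,
    source text,
    category text,
    language text,
    country text,
    published_at timestamp
)"""

TEXT_COLUMNS = ("url", "title", "description", "source", "category", "language", "country")
INSERT_ARTICLE = (
    f"INSERT INTO {KEYSPACE}.articles ({', '.join(TEXT_COLUMNS)}, published_at) "
    f"VALUES ({', '.join('?' * (len(TEXT_COLUMNS) + 1))})"
)


def article_to_row(article):
    """Convert one article dict into a tuple in INSERT_ARTICLE's column order."""
    # Assumes an ISO 8601 timestamp with a +HH:MM offset, e.g. "2022-05-01T10:15:00+00:00".
    published = article.get("published_at")
    return tuple(article.get(column) for column in TEXT_COLUMNS) + (
        datetime.fromisoformat(published) if published else None,
    )


def insert_articles(articles, cassandra_conn_id="cassandra_default"):
    """Insert articles with one prepared statement; returns how many rows were written."""
    # Imported here so the helpers above can be used without Airflow installed.
    from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

    hook = CassandraHook(cassandra_conn_id=cassandra_conn_id)
    session = hook.get_conn()
    try:
        prepared = session.prepare(INSERT_ARTICLE)
        written = 0
        for article in articles:
            if article.get("url"):  # url is the primary key, so rows without one are skipped
                session.execute(prepared, article_to_row(article))
                written += 1
        return written
    finally:
        hook.shutdown_cluster()
```

Preparing the statement once and binding each row avoids re-parsing the CQL per insert. For larger batches, cassandra-driver's cassandra.concurrent.execute_concurrent_with_args can run the same prepared statement over many parameter tuples concurrently instead of one execute() call per row.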
Cassandra Cluster configuration page of Airflow

Result of data written to Cassandra database from Airflow

Final Note

For our deployment, we made sure to allocate more computing resources to the Airflow instance. For your deployment, always allocate enough resources to Airflow and never let it fight for resources in production. Also, don't pass large datasets between the Tasks of your DAGs with XCom; instead, save the data to storage such as S3 or Azure Blob Storage and let the next Task pick it up from there. Very importantly, make sure your Airflow instance and your Cassandra deployment can communicate with each other on whichever network they run on.
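A quick way to check the last point from the Airflow side is to open a TCP connection to each Cassandra node's native protocol port (9042 by default). The sketch below uses only the standard library; the host list is a placeholder for whatever you put in your Cassandra connection.

```python
import socket


def can_reach(host, port=9042, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or unresolvable host
        return False


if __name__ == "__main__":
    # Placeholder node list: use the same comma-separated hosts as your Cassandra connection.
    for node in "127.0.0.1".split(","):
        print(node, "reachable" if can_reach(node) else "NOT reachable")
```

Run it inside an Airflow container rather than on the host. With Docker's default networking, 127.0.0.1 inside a container refers to the container itself, so a connection configured with localhost will not reach a Cassandra node running on the host machine or elsewhere on the network.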

Cassandra.Link

Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was not only to fill the gap left by Planet Cassandra but also to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.

We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!


Join Anant's Newsletter

Subscribe to our monthly newsletter below and never miss the latest Cassandra and data engineering news!