On the Data Platform team at GoDaddy we use both Oozie and Airflow for scheduling jobs. We've found each tool useful for managing data pipelines, but we are migrating all of our jobs to Airflow for the reasons discussed below. In this article, I'll give an overview of the pros and cons of using Oozie and Airflow to manage your data pipeline jobs. To help you get started with pipeline scheduling tools, I've also included some sample plugin code to show how simple it is to modify or add functionality in Airflow.
These tools (Oozie and Airflow) provide far more built-in functionality than cron. These are some of the scenarios for which built-in support is available in the tools but not in cron:
With cron you have to write code for the above functionality yourself, whereas Oozie and Airflow provide it out of the box.
Apache Oozie is a workflow scheduler that uses Directed Acyclic Graphs (DAGs) to schedule Hadoop jobs (e.g. MapReduce, Pig, Hive, Sqoop, DistCp, Java actions). It's an open source project written in Java. When we develop Oozie jobs, we write a bundle file, a coordinator file, a workflow file, and a properties file. The workflow file is required; the others are optional.
At GoDaddy, we use the Hue UI for monitoring Oozie jobs.
Apache Airflow is another workflow scheduler that also uses DAGs. It's an open source project written in Python. Some of the features in Airflow are:
At GoDaddy, the Customer Knowledge Platform team is working on creating a Docker image for Airflow, so other teams can develop and maintain their own Airflow schedulers.
Airflow has many advantages, and many companies are moving to it. There is an active community working on enhancements and bug fixes for Airflow. A few things to remember when moving to Airflow:
We are using Airflow jobs for file transfers between filesystems, data transfers between databases, ETL jobs, and more, and we plan to move our existing Oozie jobs to Airflow.
Unlike Oozie, you can easily add new functionality in Airflow if you know Python. Below I've written an example plugin that checks whether a file exists on a remote server and that could be used as an operator in an Airflow job. Airflow polls for this file and, if it exists, sends the file name to the next task using xcom_push(). We often append the date to data file names, so here I've used glob() to check for a file pattern.
This Python file is added to the plugins folder in the Airflow home directory:
import os
import glob

from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class FileSensorOperator(BaseSensorOperator):
    @apply_defaults
    def __init__(self, file_path, file_pattern, *args, **kwargs):
        super(FileSensorOperator, self).__init__(*args, **kwargs)
        self.file_path = file_path
        self.file_pattern = file_pattern

    # poke is the standard method sensors implement; Airflow calls it repeatedly
    # (every poke_interval seconds) until it returns True
    def poke(self, context):
        file_location = self.file_path
        file_pattern = self.file_pattern
        for matched_file in glob.glob(file_location + file_pattern):
            if os.path.exists(matched_file):
                # push the matched file name so the next task can pick it up
                context['task_instance'].xcom_push(
                    'file_name', os.path.basename(matched_file))
                self.log.info('file exists')
                return True
        self.log.info('file does not exist')
        return False


class FilePlugin(AirflowPlugin):
    name = 'file_plugin'
    operators = [FileSensorOperator]
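Because the sensor's poke() method is plain Python, you can sanity-check it outside of Airflow. Here is a small sketch; the temp directory, file name, and the fake task-instance object are made up for the test, and FileSensorOperator is assumed to be importable from the plugin file above:

import os
import tempfile

# Stand-in for Airflow's task instance; only xcom_push is needed here
class FakeTaskInstance(object):
    def xcom_push(self, key, value):
        print('xcom_push(%r, %r)' % (key, value))

tmp_dir = tempfile.mkdtemp()
open(os.path.join(tmp_dir, 'orders_20190101.csv'), 'w').close()

sensor = FileSensorOperator(
    task_id='file_sensor_test',
    file_path=tmp_dir + '/',
    file_pattern='orders_*.csv')

print(sensor.poke({'task_instance': FakeTaskInstance()}))  # True, and the file name is pushed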
The code below uses an Airflow DAG (Directed Acyclic Graph) to demonstrate how we call the sample plugin implemented above. In this code the default arguments include details such as the start date and number of retries, while the schedule interval is set on the DAG itself. You can add additional arguments, for example to configure the DAG to send email on failure.
The DAG is divided into three tasks: sense the file, process it, and archive it.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators import PythonOperator, ArchiveFileOperator  # ArchiveFileOperator is another custom plugin operator (not shown in this post)
from airflow.operators.file_plugin import FileSensorOperator  # the file_plugin defined above
default_args = {
    'owner': 'dag_developer',
    'start_date': datetime(2019, 1, 1),  # use a static start date; a dynamic value like datetime.now() is discouraged
    'provide_context': True,
    'retries': 2,
    'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'check_file_and_write_to_file',
    default_args=default_args,
    schedule_interval='30 18 * * *',  # run every day at 6:30 PM
    max_active_runs=1)
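# The Variables read below are assumed to exist already, created through the
# Airflow UI (Admin -> Variables) or programmatically; the values shown are
# examples only:
#   Variable.set('source_path', '/data/incoming/')
#   Variable.set('file_pattern', 'data_*.csv')
#   Variable.set('archive_path', '/data/archive/')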
file_path = Variable.get('source_path')
file_pattern = Variable.get('file_pattern')
archive_path = Variable.get('archive_path')
# Check for the file pattern in the path every 5 seconds and send the matched
# file name to the next task (process_task)
sensor_task = FileSensorOperator(
    task_id='file_sensor',
    file_path=file_path,
    file_pattern=file_pattern,
    poke_interval=5,
    dag=dag)
# Read the file name pushed by the previous task (sensor_task) and write
# "Fun scheduling with Airflow" to the file
def process_file(**context):
    file_name = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor')
    with open(file_path + file_name, 'w') as data_file:
        data_file.write('Fun scheduling with Airflow')
# Call the Python function that writes to the file
process_task = PythonOperator(
    task_id='process_the_file',
    python_callable=process_file,
    dag=dag)
# Archive the file once the write is complete
archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=file_path,
    archivepath=archive_path,
    dag=dag)
# Set the order in which the tasks run; ">>" is the Airflow operator for declaring downstream dependencies
sensor_task >> process_task >> archive_task
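One optional convenience (a common Airflow pattern, not something this example depends on) is to expose Airflow's CLI for this DAG when the file is executed directly, which makes it easy to run individual tasks while developing:

# Optional: expose Airflow's CLI scoped to this DAG when running the file directly
if __name__ == "__main__":
    dag.cli()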
Our team has written similar plugins for data quality checks; a rough sketch of one is shown below. Unlike Oozie, Airflow gives you full code-level flexibility for tasks, which makes development easy. If you're thinking about scaling your data pipeline jobs, I'd recommend Airflow as a great place to get started.
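The data quality plugins themselves aren't included in this post, but a minimal sketch of one might look like the following (the operator name, plugin name, and threshold logic here are illustrative assumptions, not our production code):

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class DataQualityCheckOperator(BaseOperator):
    """Fails the task when a caller-supplied metric falls below a threshold."""

    @apply_defaults
    def __init__(self, metric_callable, min_value, *args, **kwargs):
        super(DataQualityCheckOperator, self).__init__(*args, **kwargs)
        self.metric_callable = metric_callable  # e.g. a function returning a row count
        self.min_value = min_value

    def execute(self, context):
        value = self.metric_callable()
        self.log.info('data quality metric = %s', value)
        if value < self.min_value:
            raise AirflowException(
                'Data quality check failed: %s < %s' % (value, self.min_value))
        return value


class DataQualityPlugin(AirflowPlugin):
    name = 'data_quality_plugin'
    operators = [DataQualityCheckOperator]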
Photo credit: 'Time' by Sean MacEntee on Flickr