boundary-layer : Declarative Airflow Workflows

Posted on November 14, 2018

When Etsy decided last year to migrate our operations to Google Cloud Platform (GCP), one of our primary motivations was to enable our machine learning teams with scalable resources and the latest big-data and ML technologies. Early in the cloud migration process, we convened a cross-functional team between the Data Engineering and Machine Learning Infrastructure groups in order to design and build a new data platform focused on this goal.

One of the first choices our team faced was how to coordinate and schedule jobs across a menagerie of new technologies. Apache Airflow (incubating) was the obvious choice due to its existing integrations with GCP, its customizability, and its strong open-source community; however, we faced a number of open questions that had to be addressed in order to give us confidence in Airflow as a long-term solution.

First, Etsy had well over 100 existing Hadoop workflows, all written for the Apache Oozie scheduler. How would we migrate these to Airflow? Furthermore, how would we maintain equivalent copies of Oozie and Airflow workflows in parallel during the development and validation phases of the migration, without requiring our data scientists to pause their development work?

Second, writing workflows in Airflow (expressed in python as directed acyclic graphs, or DAGs) is non-trivial, requiring new and specialized knowledge. How would we train our dozens of internal data platform users to write Airflow DAGs? How would we provide automated testing capabilities to ensure that DAGs are valid before pushing them to our Airflow instances? How would we ensure that common best-practices are used by all team members? And how would we maintain and update those DAGs as new practices are adopted and new features made available?

Today we are pleased to introduce boundary-layer, the tool that we conceived and built to address these challenges, and that we have released to open-source to share with the Airflow community.

Introduction: Declarative Workflows

Boundary-layer is a tool that enables data scientists and engineers to write Airflow workflows in a declarative fashion, as YAML files rather than as python. Boundary-layer validates workflows by checking that all of the operators are properly parameterized, all of the parameters have the proper names and types, there are no cyclic dependencies, etc. It then translates the workflows into DAGs in python, for native consumption by Airflow.

Here is an example of a very simple boundary-layer workflow:

name: my-dag-1

default_task_args:
  start_date: '2018-10-01'

operators:
- name: print-hello
  type: bash
  properties:
    bash_command: "echo hello"
- name: print-world
  type: bash
  upstream_dependencies:
  - print-hello
  properties:
    bash_command: "echo world"

Boundary-layer translates this into python as a DAG with 2 nodes, each consisting of a BashOperator configured with the provided properties, as well as some auto-inserted parameters:

# Auto-generated by boundary-layer

import os
from airflow import DAG

import datetime

from airflow.operators.bash_operator import BashOperator

DEFAULT_TASK_ARGS = {
        'start_date': '2018-10-01',
    }

dag = DAG(
        dag_id = 'my_dag_1',
        default_args = DEFAULT_TASK_ARGS,
    )

print_hello = BashOperator(
        dag = (dag),
        bash_command = 'echo hello',
        start_date = (datetime.datetime(2018, 10, 1, 0, 0)),
        task_id = 'print_hello',
    )


print_world = BashOperator(
        dag = (dag),
        bash_command = 'echo world',
        start_date = (datetime.datetime(2018, 10, 1, 0, 0)),
        task_id = 'print_world',
    )

print_world.set_upstream(print_hello)

Note that boundary-layer inserted all of the boilerplate of python class imports and basic DAG and operator configuration. Additionally, it validated parameter names and types according to schemas, and applied type conversions when applicable (in this case, it converted date strings to datetime objects).
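For illustration, consider a (hypothetical) workflow that misconfigures its bash operator. Because the bash operator's schema, shown later in this post, requires a bash_command parameter and forbids unrecognized properties, boundary-layer would reject a file like this at build time, before any python is generated:

# Hypothetical invalid workflow: schema validation rejects it because
# bash_comand is not a recognized property and the required
# bash_command parameter is missing.
name: my-invalid-dag

default_task_args:
  start_date: '2018-10-01'

operators:
- name: print-hello
  type: bash
  properties:
    bash_comand: "echo hello"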

Generators

Moving from python-based to configuration-based workflows naturally imposes a functionality penalty. One particularly valuable feature of python-based DAGs is the ability to construct them dynamically: for example, nodes can be added and customized by iterating over a list of values. We make extensive use of this functionality ourselves, so it was important to build a mechanism into boundary-layer to enable it.

Boundary-layer generators are the mechanism we designed for dynamic workflow construction. Generators are complete, distinct sub-workflows that take a single, flexibly-typed parameter as input. Each generator must prescribe a mechanism for generating a list of values: for example, lists of items can be retrieved from an API via an HTTP GET request. The python code written by boundary-layer will iterate over the list of generator parameter values and create one instance of the generator sub-workflow for each value. Below is an example of a workflow that incorporates a generator:

name: my-dag-2

default_task_args:
  start_date: '2018-10-01'

generators:
- name: retrieve-and-copy-items
  type: requests_json_generator
  target: sense-and-run
  properties:
    url: http://my-url.com/my/file/list.json
    list_json_key: items

operators:
- name: print-message
  type: bash
  upstream_dependencies:
  - retrieve-and-copy-items
  properties:
    bash_command: echo "all done"
---
name: sense-and-run

operators:
- name: sensor
  type: gcs_object_sensor
  properties:
    bucket: <<item['bucket']>>
    object: <<item['name']>>
- name: my-job
  type: dataproc_hadoop
  properties:
    cluster_name: my-cluster
    region: us-central1
    main_class: com.etsy.jobs.MyJob
    arguments:
    - <<item['name']>>

This workflow retrieves the content of the specified JSON file, extracts the items field from it, and then iterates over the objects in that list, creating one instance of all of the operators in the sense-and-run sub-graph per object.

Note the inclusion of several strings of the form  << ... >>.  These are boundary-layer verbatim strings, which allow us to insert inline snippets of python into the rendered DAG. The item value is the sub-workflow’s parameter, which is automatically supplied by boundary-layer to each instance of the sub-workflow.

Also note that generators can be used in dependency specifications, as indicated by the print-message operator’s upstream_dependencies block. Generators can even be set to depend on other generators, which boundary-layer will encode efficiently, without creating a combinatorially-exploding set of edges in the DAG.
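As a rough sketch building on the example above (the second generator, its target sub-workflow, and its URL are hypothetical), a generator declares such a dependency with the same upstream_dependencies block that operators use:

# Fragment of a workflow's generators section (hypothetical names): a
# second generator that depends on the first.  boundary-layer encodes
# this dependency compactly rather than drawing an edge between every
# pair of generated sub-workflow instances.
generators:
- name: retrieve-and-copy-items
  type: requests_json_generator
  target: sense-and-run
  properties:
    url: http://my-url.com/my/file/list.json
    list_json_key: items
- name: post-process-items
  type: requests_json_generator
  target: post-process   # hypothetical sub-workflow, defined elsewhere
  upstream_dependencies:
  - retrieve-and-copy-items
  properties:
    url: http://my-url.com/my/other/list.json
    list_json_key: items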

Advanced features

Under the hood, boundary-layer represents its workflows using the powerful networkx library. This enables a variety of features that computationally modify the graph, adding usability enhancements that go well beyond the core functionality of Airflow itself.

A few of the simpler features that modify the graph include before and after sections of the workflow, which allow us to specify a set of operators that should always be run upstream or downstream of the primary list of operators. For example, one of our most common patterns in workflow construction is to put various sensors in the before block, so that it is not necessary to specify and maintain explicit upstream dependencies between the sensors and the primary operators. Boundary-layer automatically attaches these sensors and adds the necessary dependency rules to make sure that no primary operators execute until all of the sensors have completed.
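Here is a minimal sketch of that pattern, with hypothetical operator names; the after section mirrors the before syntax shown in the resource example further below:

# Minimal sketch of the before/after sections.  Everything in before is
# wired upstream of every primary operator, and everything in after is
# wired downstream of them, with no explicit dependencies declared.
name: my-dag-with-brackets

default_task_args:
  start_date: '2018-10-01'

before:
- name: wait-for-input
  type: gcs_object_sensor
  properties:
    bucket: my-bucket
    object: my-input

operators:
- name: process-input
  type: bash
  properties:
    bash_command: "echo processing"

after:
- name: announce-completion
  type: bash
  properties:
    bash_command: "echo all done"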

Another feature of boundary-layer is the ability to prune nodes out of workflows while maintaining all dependency relationships between the nodes that remain. This was especially useful during the migration of our Oozie workflows: it allowed us to isolate portions of those workflows to run in Airflow and gradually add more portions in stages, until the workflows were fully migrated, without ever having to maintain the pruned-down workflows as separate entities.

One of the most useful advanced features of boundary-layer is its treatment of managed resources. We make extensive use of ephemeral, workflow-scoped Dataproc clusters on the Etsy data platform. These clusters are created by Airflow, shared by various jobs that Airflow schedules, and then deleted by Airflow once those jobs are complete. Airflow itself provides no first-class support for managed resources, which can be tricky to configure properly: we must make sure that the resources are not created before they are needed, and that they are deleted as soon as they are not needed anymore, in order to avoid accruing costs for idle clusters. Boundary-layer handles this automatically, computing the appropriate places in the DAG into which to splice the resource-create and resource-destroy operations. This makes it simple to add new jobs or remove old ones, without having to worry about keeping the cluster-create and cluster-destroy steps always installed in the proper locations in the workflow.

Below is an example of a boundary-layer workflow that uses Dataproc resources:

name: my-dag-3

default_task_args:
  start_date: '2018-10-01'
  project_id: my-gcp-project

resources:
- name: dataproc-cluster
  type: dataproc_cluster
  properties:
    cluster_name: my-cluster
    region: us-east1
    num_workers: 128

before:
- name: sensor
  type: gcs_object_sensor
  properties:
    bucket: my-bucket
    object: my-object

operators:
- name: my-job-1
  type: dataproc_hadoop
  requires_resources:
  - dataproc-cluster
  properties:
    main_class: com.etsy.foo.FooJob
- name: my-job-2
  type: dataproc_hadoop
  requires_resources:
  - dataproc-cluster
  upstream_dependencies:
  - my-job-1
  properties:
    main_class: com.etsy.bar.BarJob
- name: copy-data
  type: gcs_to_gcs
  upstream_dependencies:
  - my-job-2
  properties:
    source_bucket: my-bucket
    source_object: my-object
    dest_bucket: your-bucket

In this DAG, the gcs_object_sensor runs first, then the cluster is created, then the two Hadoop jobs run in sequence, and then the job’s output is copied while the cluster is simultaneously deleted.

Of course, this is just a simple example; we have some complex workflows that manage multiple ephemeral clusters, with rich dependency relationships, all of which are automatically configured by boundary-layer. For example, see the figure below: this is a real workflow that runs some Hadoop jobs on one cluster while running some ML training jobs in parallel on an external service, and then finally runs more Hadoop jobs on a second cluster. The complexity of the dependencies between the training jobs and downstream jobs required boundary-layer to insert several flow-control operators in order to ensure that the downstream jobs start only once all of the upstream dependencies are met.

Conversion from Oozie

One of our primary initial concerns was the need to be able to migrate our Oozie workflows to Airflow. This had to be an automated process, because we knew we would have to repeatedly convert workflows in order to keep them in sync between our on-premise cluster and our GCP resources while we developed and built confidence in the new platform. The boundary-layer workflow format is not difficult to reconcile with Oozie’s native configuration formats, so boundary-layer is distributed with a parser that does this conversion automatically. We built tooling to incorporate the converter into our CI/CD processes, and for the duration of our cloud validation and migration period, we maintained perfect fidelity between on-premise Oozie and cloud-based Airflow DAGs.

Extensibility

A final requirement that we targeted in the development of boundary-layer is that it must be easy to add new types of operators, generators, or resources. It must not be difficult to modify or add to the operator schemas or the configuration settings for the resource and generator abstractions. After all, Airflow’s huge open-source community (including several Etsy engineers!) ensures that its list of supported operators is growing practically every day. In addition, we have our own proprietary set of operators for Etsy-specific purposes, and we must keep the configurations for these out of the public boundary-layer distribution. We satisfied these requirements via two design choices.

First, every operator, generator, or resource is represented by a single configuration file, and these files get packaged up with boundary-layer. Adding a new operator/generator/resource is accomplished simply by adding a new configuration file. Here is an example configuration, in this case for the Airflow BashOperator:

name: bash
operator_class: BashOperator
operator_class_module: airflow.operators.bash_operator
schema_extends: base

parameters_jsonschema:
  properties:
    bash_command:
      type: string
    
    xcom_push:
      type: boolean
    
    env:
      type: object
      additionalProperties:
        type: string
    
    output_encoding:
      type: string
  
  required:
  - bash_command
  
  additionalProperties: false

We use standard JSON Schemas to specify the parameters to the operator, and we use a basic single-inheritance model to centralize the specification of common parameters in the BaseOperator, as is done in the Airflow code itself.
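For illustration only, an abbreviated base configuration might look something like this (the actual base schema packaged with boundary-layer may differ); the bash configuration above inherits these common parameters via its schema_extends: base line:

# Hypothetical, abbreviated sketch of a base operator configuration; it
# centralizes parameters that every Airflow operator accepts, which
# concrete configurations like the bash one above extend.
name: base

parameters_jsonschema:
  properties:
    owner:
      type: string
    retries:
      type: integer
    start_date:
      type: string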

Second, we implemented a plugin mechanism based on python’s setuptools entry points. All of our internal configurations are integrated into boundary-layer via plugins. We package a single default plugin with boundary-layer that contains configurations for common open-source Airflow operators. Other plugins can be added by packaging them into separate python packages, as we have done internally with our Etsy-customized plugin. The plugin mechanism has grown to enable quite extensive workflow customizations, which we use at Etsy in order to enable the full suite of proprietary modifications used on our platform.

Conclusion

The boundary-layer project has been a big success for us. All of the nearly 100 workflows that we deploy to our production Airflow instances are written as boundary-layer configurations, and our deployment tools no longer even support python-based DAGs. Boundary-layer’s ability to validate workflow configurations and abstract away implementation details has enabled us to provide a self-service Airflow solution to our data scientists and engineers, without requiring much specialized knowledge of Airflow itself. Over 30 people have contributed to our internal Airflow workflow repository, with minimal process overhead (Jenkins is the only “person” who must approve pull requests), and without having deployed a single invalid DAG.

We are excited to release boundary-layer to the public, in hopes that other teams find it similarly useful. We are committed to supporting it and continuing to add new functionality, so drop us a github issue if you have any requests. And of course, we welcome community contributions as well!
