Introducing Arbiter: A Utility for Generating Oozie Workflows

Posted on December 16, 2015

At Etsy we have been using Apache Oozie for managing our production workflows on Hadoop for several years. We’ve even recently started using Oozie for managing our ad hoc Hadoop jobs as well. Oozie has worked very well for us and we currently have several dozen distinct workflows running in production. However, writing these workflows by hand has been a pain point for us. To address this, we have created Arbiter, a utility for generating Oozie workflows.

The Problem

Oozie workflows are specified in XML. The Oozie documentation has an extensive overview of writing workflows, but there are a few things that are helpful to know. A workflow begins with the start node:

<start to="fork-2"/>

Each job or other task in the workflow is represented by an action node. There are built-in actions for running MapReduce jobs, standard Java main classes, and so on, and you can also define custom action types. This is an example action:

<action name="transactional_lifecycle_email_stats">
  <java>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <configuration>
      <property>
        <name>mapreduce.job.queuename</name>
        <value>rollups</value>
      </property>
    </configuration>
    <main-class>com.etsy.db.VerticaRollupRunner</main-class>
    <arg>--file</arg>
    <arg>transactional_lifecycle_email_stats.sql</arg>
    <arg>--frequency</arg>
    <arg>daily</arg>
    <arg>--category</arg>
    <arg>regular</arg>
    <arg>--env</arg>
    <arg>${cluster_env}</arg>
  </java>
  <ok to="join-2"/>
  <error to="join-2"/>
</action>

Each action defines a transition to take upon success and a (possibly) different transition to take upon failure:

<ok to="join-2"/>
<error to="join-2"/>

To have actions run in parallel, a fork node can be used. All the actions specified in the fork will be run in parallel:

<fork name="fork-2">
    <path start="fork-0"/>
    <path start="transactional_lifecycle_email_stats"/>
</fork>

After these actions there must be a join node to wait for all the forked actions to finish:

<join name="join-2" to="screamapillar"/>

Finally, a workflow ends by transitioning to either the end node or the kill node, for a successful or unsuccessful result, respectively:

<kill name="kill">
    <message>Workflow email-rollups has failed with msg: [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>

Here is a complete example of one of our shorter workflows:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="email-rollups">
  <start to="fork-2"/>
  <fork name="fork-2">
    <path start="fork-0"/>
    <path start="transactional_lifecycle_email_stats"/>
  </fork>
  <action name="transactional_lifecycle_email_stats">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>rollups</value>
        </property>
      </configuration>
      <main-class>com.etsy.db.VerticaRollupRunner</main-class>
      <arg>--file</arg>
      <arg>transactional_lifecycle_email_stats.sql</arg>
      <arg>--frequency</arg>
      <arg>daily</arg>
      <arg>--category</arg>
      <arg>regular</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="join-2"/>
    <error to="join-2"/>
  </action>
  <join name="join-2" to="screamapillar"/>
  <action name="screamapillar">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>${queueName}</value>
        </property>
        <property>
          <name>mapreduce.map.output.compress</name>
          <value>true</value>
        </property>
      </configuration>
      <main-class>com.etsy.oozie.Screamapillar</main-class>
      <arg>--workflow-id</arg>
      <arg>${wf:id()}</arg>
      <arg>--recipient</arg>
      <arg>fake_email</arg>
      <arg>--sender</arg>
      <arg>fake_email</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="end"/>
    <error to="kill"/>
  </action>
  <fork name="fork-0">
    <path start="email_campaign_stats"/>
    <path start="user_language"/>
  </fork>
  <action name="user_language">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>rollups</value>
        </property>
      </configuration>
      <main-class>com.etsy.db.VerticaRollupRunner</main-class>
      <arg>--file</arg>
      <arg>user_language.sql</arg>
      <arg>--frequency</arg>
      <arg>daily</arg>
      <arg>--category</arg>
      <arg>regular</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="join-0"/>
    <error to="join-0"/>
  </action>
  <join name="join-0" to="fork-1"/>
  <fork name="fork-1">
    <path start="email_overview"/>
    <path start="trans_email_overview"/>
  </fork>
  <action name="trans_email_overview">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>rollups</value>
        </property>
      </configuration>
      <main-class>com.etsy.db.VerticaRollupRunner</main-class>
      <arg>--file</arg>
      <arg>trans_email_overview.sql</arg>
      <arg>--frequency</arg>
      <arg>daily</arg>
      <arg>--category</arg>
      <arg>regular</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="join-1"/>
    <error to="join-1"/>
  </action>
  <join name="join-1" to="join-2"/>
  <action name="email_overview">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>rollups</value>
        </property>
      </configuration>
      <main-class>com.etsy.db.VerticaRollupRunner</main-class>
      <arg>--file</arg>
      <arg>zz_email_overview.sql</arg>
      <arg>--frequency</arg>
      <arg>daily</arg>
      <arg>--category</arg>
      <arg>regular</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="join-1"/>
    <error to="join-1"/>
  </action>
  <action name="email_campaign_stats">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.job.queuename</name>
          <value>rollups</value>
        </property>
      </configuration>
      <main-class>com.etsy.db.VerticaRollupRunner</main-class>
      <arg>--file</arg>
      <arg>zz_email_campaign_stats.sql</arg>
      <arg>--frequency</arg>
      <arg>daily</arg>
      <arg>--category</arg>
      <arg>regular</arg>
      <arg>--env</arg>
      <arg>${cluster_env}</arg>
    </java>
    <ok to="join-0"/>
    <error to="join-0"/>
  </action>
  <kill name="kill">
    <message>Workflow email-rollups has failed with msg: [${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>

Having the workflows be defined in XML has been very helpful. We have several validation and visualization tools in multiple languages that can parse the XML and produce useful results without being tightly coupled to Oozie itself. However, the XML is not as useful for the people who work with it. First, it is very verbose. Each new action adds about 20 lines of XML to the workflow, much of which is boilerplate. As a result, our workflows average around 200 lines and the largest is almost 1800 lines long. This verbosity also makes it hard for someone reading a workflow to understand what it does and how execution flows.

Next, defining the flow of execution can be tricky. It is natural to think about the dependencies between actions. Oozie workflows, however, are not specified in terms of these dependencies. The workflow author must satisfy these dependencies by configuring the workflow to run the actions in the proper order. For simple workflows this may not be a problem, but it can quickly become complex as a workflow grows. Moreover, the author must manually manage parallelism by inserting forks and joins, which makes modifying the workflow more complex. We have found that it’s easy to miss adding an action to a fork, resulting in an orphaned action that doesn’t get run. Another common problem we’ve had with forks is that Oozie considers a single-action fork invalid, which means that removing the second-to-last action from a fork requires removing the fork and join entirely.
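To make the two fork problems concrete, here is a minimal Python sketch of the kind of check a validation tool could run against a hand-written workflow. It is an illustration only, not one of the tools mentioned above: the function name lint_workflow is hypothetical, the namespace is the one used by the example workflow, and "orphaned" is approximated as "no other node transitions to this action", which is weaker than a full reachability check from the start node.

```python
import xml.etree.ElementTree as ET

# Namespace taken from the example workflow above (an assumption for other workflows).
NS = {"wf": "uri:oozie:workflow:0.2"}


def lint_workflow(xml_text):
    """Return (orphaned_actions, single_path_forks) for a workflow XML string."""
    root = ET.fromstring(xml_text)

    # Every node name that some element transitions to: start/ok/error/join use
    # the "to" attribute, and fork paths use the "start" attribute.
    targets = set()
    for elem in root.iter():
        for attr in ("to", "start"):
            if attr in elem.attrib:
                targets.add(elem.attrib[attr])

    # An action that nothing points at will never run.
    orphaned = [
        action.attrib["name"]
        for action in root.findall("wf:action", NS)
        if action.attrib["name"] not in targets
    ]

    # A fork with fewer than two paths is the single-action fork described above.
    single_path_forks = [
        fork.attrib["name"]
        for fork in root.findall("wf:fork", NS)
        if len(fork.findall("wf:path", NS)) < 2
    ]
    return orphaned, single_path_forks
```

Running a check like this on every change catches both mistakes, but the author still has to repair the forks and joins by hand.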

Introducing Arbiter

Arbiter was created to solve these problems. XML is very amenable to being produced automatically, so there is the opportunity to write the workflows in another format and produce the final workflow definition. We considered several options, but ultimately settled on YAML. There are robust YAML parsers in many languages and we considered it easier for people to read than JSON. We also considered a Scala-based DSL, but we wanted to stick with a markup language for language-agnostic parsing.

Writing Workflows

Here is the same example workflow from above written in Arbiter’s YAML format:

---
name: email-rollups
errorHandler:
  name: screamapillar
  type: screamapillar
  recipients: fake_email
  sender: fake_email
actions:
  - name: email_campaign_stats
    type: rollup
    rollup_file: zz_email_campaign_stats.sql
    category: regular
    dependencies: []
  - name: trans_email_overview
    type: rollup
    rollup_file: trans_email_overview.sql
    category: regular
    dependencies: [email_campaign_stats, user_language]
  - name: email_overview
    type: rollup
    rollup_file: zz_email_overview.sql
    category: regular
    dependencies: [email_campaign_stats, user_language]
  - name: user_language
    type: rollup
    rollup_file: user_language.sql
    category: regular
    dependencies: []
  - name: transactional_lifecycle_email_stats
    type: rollup
    rollup_file: transactional_lifecycle_email_stats.sql
    category: regular
    dependencies: []

The translation of the YAML to XML is highly dependent on the configuration given to Arbiter, which we will cover in the next section. However, there are several points to consider now. First, the YAML definition is only about 20% of the length of the XML. Since the workflow definition is much shorter, it’s easier for someone to read it and understand what the workflow does. In addition, none of the flow control nodes need to be manually specified. Arbiter will insert the start, end, and kill nodes in the correct locations. Forks and joins will also be inserted when actions can be run in parallel.

Most importantly, however, the workflow author can directly specify the dependencies between actions, instead of the order of execution. Arbiter will handle ordering the actions in such a way as to satisfy all the given dependencies.
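As a rough illustration of why dependencies are enough to derive an order, the sketch below groups the example actions into stages using Python's standard graphlib module (Python 3.9+). This is a generic topological sort, not Arbiter's own algorithm, and the function name execution_stages is hypothetical.

```python
from graphlib import TopologicalSorter

# Dependencies copied from the example workflow definition above:
# each action maps to the list of actions it depends on.
dependencies = {
    "email_campaign_stats": [],
    "trans_email_overview": ["email_campaign_stats", "user_language"],
    "email_overview": ["email_campaign_stats", "user_language"],
    "user_language": [],
    "transactional_lifecycle_email_stats": [],
}


def execution_stages(deps):
    """Group actions into stages whose dependencies all sit in earlier stages."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the dependencies are cyclic
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        stages.append(ready)
        sorter.done(*ready)
    return stages


for number, stage in enumerate(execution_stages(dependencies), start=1):
    print(number, stage)
```

Every action in a stage could run in parallel, which is the information a hand-written workflow has to encode with forks and joins.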

In addition to the standard workflow actions, Arbiter allows you to define an “error handler” action. Arbiter will automatically insert this action before any transitions to the end or kill nodes in the workflow. We use this to send an email alert with details about the success and failure of the workflow actions. If the error handler is omitted, the workflow will transition directly to the end or kill nodes as appropriate.

Configuration

The mapping between a YAML workflow definition and the final XML is controlled by configuration files. These are also specified in YAML. Here is an example configuration file to accompany the example workflow given above:

---
killName: kill
killMessage: "Workflow $$name$$ has failed with msg: [${wf:errorMessage(wf:lastErrorNode())}]"
actionTypes:
  - tag: java
    name: rollup
    configurationPosition: 2
    properties: {"mapreduce.job.queuename": "rollups"}
    defaultArgs: {
      job-tracker: ["${jobTracker}"],
      name-node: ["${nameNode}"],
      main-class: ["com.etsy.db.VerticaRollupRunner"],
      arg: ["--file", "$$rollup_file$$", "--frequency", "daily", "--category", "$$category$$", "--env", "${cluster_env}"]
    }
  - tag: sub-workflow
    name: sub-workflow
    defaultArgs: {
      app-path: ["$$workflowPath$$"],
      propagate-configuration: []
    }
  - tag: java
    name: screamapillar
    configurationPosition: 2
    properties: {"mapreduce.job.queuename": "${queueName}", "mapreduce.map.output.compress": "true"}
    defaultArgs: {
      job-tracker: ["${jobTracker}"],
      name-node: ["${nameNode}"],
      main-class: ["com.etsy.oozie.Screamapillar"],
      arg: ["--workflow-id", "${wf:id()}", "--recipient", "$$recipients$$", "--sender", "$$sender$$", "--env", "${cluster_env}"]
    }

The key part of the configuration file is the actionTypes setting. Each action type will map to a certain action type in the XML workflow. However, multiple Arbiter action types can map to the same Oozie action type, such as the screamapillar and rollup action types both mapping to the Oozie java action type. This allows you to have meaningful action types in the YAML workflow definitions without the overhead of actually creating custom Oozie action types. Let’s review the parts of an action type definition:

  - tag: java
    name: rollup
    configurationPosition: 2
    properties: {"mapreduce.job.queuename": "rollups"}
    defaultArgs: {
      job-tracker: ["${jobTracker}"],
      name-node: ["${nameNode}"],
      main-class: ["com.etsy.db.VerticaRollupRunner"],
      arg: ["--file", "$$rollup_file$$", "--frequency", "daily", "--category", "$$category$$", "--env", "${cluster_env}"]
    }

The tag key defines the action type tag in the workflow XML. This can be one of the built-in action types like java, or a custom Oozie action type. Arbiter does not need to be made aware of custom Oozie action types. The name key defines the name of this action type, which will be used to set the type of actions in the workflow definition. If the Oozie action type accepts configuration properties from the workflow XML, these are controlled by the configurationPosition and properties keys. properties defines the actual configuration properties that will be applied to every action of this type, and configurationPosition defines where in the generated XML for the action the configuration tag should be placed. The defaultArgs key defines the default elements of the generated XML for actions of this type. The keys are the names of the XML tags, and the values are lists of the values for that tag. Even tags that can appear only once must be specified as a list.

You can also define properties to be populated from values set in the workflow definition. Any string surrounded by $$ will be interpolated in this way. $$rollup_file$$ and $$category$$ are examples of doing so in this configuration file. These will be populated with the values of the rollup_file and category keys from a rollup action in the workflow definition.
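The interpolation rule can be sketched in a few lines of Python. This is an illustration of the behavior described above, not Arbiter's code: the function name interpolate is hypothetical, and raising an error for a missing key is an assumption, since the post does not say what Arbiter does in that case.

```python
import re

# Matches $$key$$ but not Oozie expressions such as ${cluster_env}.
PLACEHOLDER = re.compile(r"\$\$(\w+)\$\$")


def interpolate(template, action):
    """Replace each $$key$$ in template with the matching value from the action."""

    def lookup(match):
        key = match.group(1)
        if key not in action:
            # Assumption: treat a missing key as an error rather than leaving it as-is.
            raise KeyError(f"action {action.get('name')!r} has no value for {key!r}")
        return str(action[key])

    return PLACEHOLDER.sub(lookup, template)


action = {
    "name": "email_campaign_stats",
    "type": "rollup",
    "rollup_file": "zz_email_campaign_stats.sql",
    "category": "regular",
}
print(interpolate("$$rollup_file$$", action))
print(interpolate("${cluster_env}", action))
```

Because Oozie's own ${...} expressions use a single dollar sign, they pass through untouched and are resolved later by Oozie.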

Using this configuration file, we could write an action like the following in the YAML workflow definition:

  - name: email_campaign_stats
    type: rollup
    rollup_file: zz_email_campaign_stats.sql
    category: regular
    dependencies: []

Arbiter would then translate this action to the following XML:

<action name="email_campaign_stats">
  <java>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <configuration>
      <property>
        <name>mapreduce.job.queuename</name>
        <value>rollups</value>
      </property>
    </configuration>
    <main-class>com.etsy.db.VerticaRollupRunner</main-class>
    <arg>--file</arg>
    <arg>zz_email_campaign_stats.sql</arg>
    <arg>--frequency</arg>
    <arg>daily</arg>
    <arg>--category</arg>
    <arg>regular</arg>
    <arg>--env</arg>
    <arg>${cluster_env}</arg>
  </java>
  <ok to="join-0"/>
  <error to="join-0"/>
</action>
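The translation above can be approximated with a short Python sketch. It is a simplified model, not Arbiter's implementation: render_action is a hypothetical name, configurationPosition is read as a zero-based child index (which matches the example output), emitting an empty element for an empty value list is a guess, and the transition targets are passed in by hand because they come from the graph processing described in the next section.

```python
import re
import xml.etree.ElementTree as ET


def render_action(action, action_type, ok_to, error_to):
    """Build the <action> element for one YAML action using its action type."""

    def fill(text):
        # Substitute $$key$$ placeholders with values from the action.
        return re.sub(r"\$\$(\w+)\$\$", lambda m: str(action[m.group(1)]), text)

    action_el = ET.Element("action", name=action["name"])
    body = ET.SubElement(action_el, action_type["tag"])

    # defaultArgs: one child element per value, in the configured order.
    for tag, values in action_type["defaultArgs"].items():
        if not values:
            ET.SubElement(body, tag)  # assumption: an empty list means an empty tag
        for value in values:
            ET.SubElement(body, tag).text = fill(value)

    # properties: a <configuration> block placed at configurationPosition.
    if action_type.get("properties"):
        config = ET.Element("configuration")
        for name, value in action_type["properties"].items():
            prop = ET.SubElement(config, "property")
            ET.SubElement(prop, "name").text = name
            ET.SubElement(prop, "value").text = fill(value)
        body.insert(action_type["configurationPosition"], config)

    ET.SubElement(action_el, "ok", to=ok_to)
    ET.SubElement(action_el, "error", to=error_to)
    return action_el


rollup_type = {
    "tag": "java",
    "name": "rollup",
    "configurationPosition": 2,
    "properties": {"mapreduce.job.queuename": "rollups"},
    "defaultArgs": {
        "job-tracker": ["${jobTracker}"],
        "name-node": ["${nameNode}"],
        "main-class": ["com.etsy.db.VerticaRollupRunner"],
        "arg": ["--file", "$$rollup_file$$", "--frequency", "daily",
                "--category", "$$category$$", "--env", "${cluster_env}"],
    },
}
action = {"name": "email_campaign_stats", "type": "rollup",
          "rollup_file": "zz_email_campaign_stats.sql", "category": "regular"}

element = render_action(action, rollup_type, ok_to="join-0", error_to="join-0")
print(ET.tostring(element, encoding="unicode"))
```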

Arbiter also allows you to specify the name of the kill node and the message it logs with the killName and killMessage properties.

How Arbiter Generates Workflows

Arbiter builds a directed graph of all the actions from the workflow definition it is processing. The vertices of the graph are the actions and the edges are dependencies. The direction of the edge is from the dependency to the dependent action to represent the desired flow of execution. Oozie workflows are required to be acyclic, so if a cycle is detected Arbiter will throw an exception.
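A minimal Python sketch of this step, under the assumption that actions are plain dictionaries shaped like the YAML entries above, might look like the following. The names build_graph and has_cycle are hypothetical, and the depth-first search is one common way to detect a cycle, not necessarily the one Arbiter uses.

```python
def build_graph(actions):
    """Map each action name to the set of actions that depend on it."""
    edges = {action["name"]: set() for action in actions}
    for action in actions:
        for dependency in action.get("dependencies", []):
            if dependency not in edges:
                raise ValueError(f"unknown dependency {dependency!r}")
            # Edge direction: dependency -> dependent action.
            edges[dependency].add(action["name"])
    return edges


def has_cycle(edges):
    """Depth-first search that reports whether the directed graph has a cycle."""
    unvisited, in_progress, finished = 0, 1, 2
    state = {vertex: unvisited for vertex in edges}

    def visit(vertex):
        state[vertex] = in_progress
        for successor in edges[vertex]:
            if state[successor] == in_progress:
                return True  # reached a vertex that is still on the current path
            if state[successor] == unvisited and visit(successor):
                return True
        state[vertex] = finished
        return False

    return any(state[vertex] == unvisited and visit(vertex) for vertex in edges)


actions = [
    {"name": "email_campaign_stats", "dependencies": []},
    {"name": "trans_email_overview", "dependencies": ["email_campaign_stats", "user_language"]},
    {"name": "email_overview", "dependencies": ["email_campaign_stats", "user_language"]},
    {"name": "user_language", "dependencies": []},
    {"name": "transactional_lifecycle_email_stats", "dependencies": []},
]
graph = build_graph(actions)
print(has_cycle(graph))
```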

The directed graph that Arbiter builds will be made up of one or more weakly connected components. This is the graph from the example workflow above, which has two such components:
Figure: An example of a graph that is input to Arbiter

Each of these components is processed independently. First, any vertices with no incoming edges are removed from the graph and inserted into a new result graph. If there is more than one vertex removed Arbiter will also insert a fork/join pair to run them in parallel. Having removed those vertices, the original component will now have been split into one or more new weakly connected components. Each of these components is then recursively processed in this same way.

Once every component has been processed, Arbiter then combines these independent components until it has produced a single workflow graph. Since these components were initially not connected, they can be run in parallel. If there is more than one component, Arbiter will insert a fork/join pair. This results in the following graph for the example workflow, showing the ok transitions between nodes:

Figure: An example workflow graph produced by Arbiter

This algorithm biases Arbiter towards satisfying the dependencies between actions over achieving optimal parallelism. In general this algorithm still produces good parallelism, but in certain cases (such as a workflow with one action that depends on every other action), it can degenerate to a fairly linear flow. While it is a conservative choice, this algorithm has still worked out well for most of our workflows and has the advantage of being straightforward to follow in case the generated workflow is incorrect or unusual.
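The procedure described in this section can be sketched in Python as follows. This is a simplified reading of the description, not Arbiter's source: the names components, parallel and plan are hypothetical, a ("fork", [...]) tuple stands for a fork/join pair around its branches, a ("then", [...]) tuple stands for steps run in sequence, and the graph is assumed to have already passed the cycle check.

```python
def components(vertices, deps):
    """Split vertices into weakly connected components (edge direction ignored)."""
    neighbours = {v: set() for v in vertices}
    for v in vertices:
        for d in deps[v]:
            if d in neighbours:  # skip dependencies already removed from the graph
                neighbours[v].add(d)
                neighbours[d].add(v)
    seen, result = set(), []
    for v in sorted(vertices):
        if v in seen:
            continue
        stack, component = [v], set()
        while stack:
            u = stack.pop()
            if u not in component:
                component.add(u)
                stack.extend(neighbours[u] - component)
        seen |= component
        result.append(component)
    return result


def parallel(parts):
    """Wrap several branches in a fork/join pair; a single branch needs none."""
    return parts[0] if len(parts) == 1 else ("fork", parts)


def plan(vertices, deps):
    """Recursively turn a dependency graph into nested fork and sequence steps."""
    branches = []
    for component in components(vertices, deps):
        # Vertices with no incoming edges inside this component run first.
        roots = sorted(v for v in component if not set(deps[v]) & component)
        if not roots:
            raise ValueError("cycle detected")
        steps = [parallel(roots)]
        rest = component - set(roots)
        if rest:
            steps.append(plan(rest, deps))  # the remainder may split further
        branches.append(steps[0] if len(steps) == 1 else ("then", steps))
    return parallel(branches)


deps = {
    "email_campaign_stats": [],
    "trans_email_overview": ["email_campaign_stats", "user_language"],
    "email_overview": ["email_campaign_stats", "user_language"],
    "user_language": [],
    "transactional_lifecycle_email_stats": [],
}
print(plan(set(deps), deps))
```

For the example workflow this yields an outer fork with two branches: a sequence of two inner forks, and the lone transactional_lifecycle_email_stats action. That is the same shape as the fork-2, fork-0 and fork-1 nodes in the XML shown earlier.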

Once this process has finished all the flow control nodes will be present in the workflow graph. Arbiter can then translate this into the XML using the provided configuration files.

Get Arbiter

Arbiter is now available on GitHub! We’ve been using Arbiter internally already and it’s been very useful for us. If you’re using Oozie we hope Arbiter will be similarly useful for you and welcome any feedback or contributions you have!

Category: data, engineering, infrastructure


10 Comments

Interesting stuff.
Can you explain in detail what happens if the ‘user_language’ action fails in the example above? How is the error propagated (it is, right?)? Are the ‘email_overview’ and ‘trans_email_overview’ actions still run?

    In that case we’re limited by what kind of transitions Oozie allows for actions inside a fork/join. You either have to unconditionally transition to the kill node, or transition to the join node. Transitioning to the kill node would stop the workflow if the user_language action fails, but it would also kill any actions running in parallel with it. We generally don’t want that for our workflows, so Arbiter will take the second option of transitioning to the join node. This does mean that the downstream email_overview and trans_email_overview actions will run (likely failing due to their dependency not being satisfied). In cases where this is particularly expensive or problematic, we move that fork/join into a sub-workflow. The parent workflow can then get a single success/failure for the whole sub-workflow and more easily skip downstream actions. It’s also possible to use decision nodes to make more complex decisions based on upstream failures.

      In our current approach we go to the ‘join’-node upon failure. After the join node we have a ‘decision’ node which checks on ‘lastErrorNode’. If that one returns a non-empty string, we go to the kill node.

      We do the same in some cases. We’ve found having the sub-workflows to be helpful in ways other than managing workflow transitions, but using decision nodes in this way is good as well.

hi Andrew,

Would you please let me know how you define prepare with a mkdir path and a delete path in an action?

    That’s not something that’s currently supported. You can open an issue on the GitHub project, and if you’d like to take a look at adding that, we always welcome contributions!

      Would you please explain the “errorHandler” action in detail?

      You can configure the error handler to be any action type. Arbiter will then ensure that it is the final action to run in the workflow regardless of the rest of the workflow structure. We use this to do reporting on the status of the workflow – look for failed actions, generate links to the logs, and send an email to the appropriate group. It can be whatever makes sense for your use of Oozie though. You also don’t have to have one; if it’s omitted the workflow will be generated the same way, just transitioning directly to end/kill nodes as appropriate.

hi Johnson,
Would you please share some insight on how you were able to send email from the Screamapillar jar action? Are you using an SMTP server installed on the client on which Oozie is running?

    We’re not actually using any kind of local SMTP server for sending those emails. We use Gearman fairly heavily at Etsy and we had an existing setup to send emails via Gearman. The Screamapillar action builds the body of the email and kicks off a Gearman job to handle the actual email send.