
Code as Craft


Sahale: Visualizing Cascading Workflows at Etsy


The Problem

If you know anything about Etsy engineering culture, you know we like to measure things. Frequent readers of Code as Craft have become familiar with Etsy mantras like “If it moves, graph it.”

When I arrived at Etsy, our Hadoop infrastructure, like most systems at Etsy, was well instrumented at the operational level via Ganglia and the standard suite of exported Hadoop metrics, but was still quite opaque to our end users, the data analysts.

Our visibility problem stemmed from a tooling decision most Hadoop shops make: abandoning tedious and counterintuitive “raw MapReduce” coding in favor of a modern domain-specific language (DSL) like Pig, Hive, or Scalding.

At Etsy, DSL programming on Hadoop has been a huge win. A DSL enables developers to construct complex Hadoop workflows in a fraction of the time it takes to write and schedule a DAG of individual MapReduce jobs. DSLs abstract away the complexities of MapReduce and allow new developers to ramp up on Hadoop quickly.

Etsy’s DSL of choice, Scalding, is a Scala-based wrapper for the Cascading framework. It features a vibrant community, an expressive syntax, and a growing number of large companies like Twitter scaling it out in production.

However, DSLs on Hadoop share a couple of drawbacks that must be addressed with additional tooling. The one that concerned users at Etsy the most was a lack of runtime visibility into the composition and behavior of their Scalding jobs.

The core of the issue is this: Hadoop only understands submitted work at the granularity of individual MapReduce jobs. The JobTracker (or ResourceManager on Hadoop2) is blissfully unaware of the relationships among tasks executed on the cluster, and its web interface reflects this ignorance:

Figure 1: YARN Resource Manager. It’s 4am – do you know where your workflow is?

A bit dense, isn’t it? This is fine if you are running one raw MapReduce job at a time, but the Cascading framework compiles each workflow down into a set of interdependent MapReduce jobs. These jobs are submitted to the Hadoop cluster asynchronously, in parallel, and ordered only by data dependencies between them.

The result: Etsy users tracking a Cascading workflow via the JobTracker/ResourceManager interface could not easily see how their source code became a Cascading workflow plan, how that plan mapped to the jobs submitted to Hadoop, which parts of the workflow were currently executing, or how to determine what happened when things went badly.

This situation also meant any attempt to educate users about optimizing workflows, best practices, or MapReduce itself was relegated to a whiteboard, Confluence doc, or (worse) an IRC channel, soon to be forgotten.

What Etsy needed was a user-facing tool for tracking cluster activity at the granularity of Cascading workflows, not MapReduce jobs - a tool that would make core concepts tangible and allow for proactive, independent assessment of workflow behavior at runtime.

Etsy’s Data Platform team decided to develop some internal tooling to solve these problems. The guidelines we used were:

  • Developers should be empowered to develop an intuitive understanding of their Cascading jobs through workflow visualizations and charted job metrics.
  • Developers should be able to evaluate for themselves the resource impact their workflows have on the cluster. Metrics presented should be relevant to end users.
  • Developers should be able to easily identify the particular MapReduce job in a Cascading workflow that causes a failure during execution.
  • Developers should be able to quickly locate and access Hadoop job logs that are most relevant to a workflow failure.
  • Developers should be able to visualize both running and completed workflows.
  • Developers should have access to a historical view of completed executions to map changes in job code to runtime behavior over many dev cycles.

Fast forward to now: I am pleased to announce Etsy’s open-source release of Sahale, a tool for visualizing Cascading workflows, to you, the Hadooping public!

Design Overview

Let’s take a look at how it works under the hood. The design we arrived at involves several discrete components which are deployed separately, as illustrated below:

Figure 2: Design overview.

The components of the tool are:

  • Sahale, a Node.js server that collects incoming JSON data dispatched by clients (Cascading jobs) and updates database tables accordingly. The same server also exposes the browser-based user interface for end users.
  • FlowTracker, a small Scala project packaged with Sahale that produces a JAR file you can include in your Cascading jobs to track and report metrics from running jobs.
  • A MySQL database available to the Sahale app.

The workflow is as follows:

  1. A user launches a Cascading workflow, and a FlowTracker is instantiated.
  2. The FlowTracker begins to dispatch periodic job metric updates to Sahale.
  3. Sahale commits incoming metrics to the database tables.
  4. The user points her browser to the Sahale web app.
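
Sahale's actual wire format is internal to the FlowTracker and the Node.js server; as a rough, hypothetical illustration of step 2, a tracker could push a JSON snapshot of flow state over HTTP along the lines of the sketch below. The endpoint path, object name, and payload shape here are assumptions for illustration, not Sahale's real API.

import java.net.{HttpURLConnection, URL}

// Hypothetical reporting call: POST a JSON snapshot of flow state to the
// Sahale server. The route and payload are illustrative only.
object MetricsReportSketch {
  def postUpdate(serverUrl: String, flowId: String, jsonPayload: String): Int = {
    val conn = new URL(s"$serverUrl/flow/update/$flowId")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    try { out.write(jsonPayload.getBytes("UTF-8")) } finally { out.close() }
    val status = conn.getResponseCode  // a real tracker would log or retry on failure
    conn.disconnect()
    status
  }
}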

About FlowTracker

That’s all well and good, but how does the FlowTracker capture workflow metrics at runtime?

First, some background. In the Cascading framework, each Cascade is a collection of one or more Flows. Each Flow is compiled by Cascading into one or more MapReduce jobs, which the user’s client process submits to the Hadoop cluster. With a reference to the running Flow, we can track various metrics about the Hadoop jobs the Cascading workflow submits.

Sahale attaches a FlowTracker instance to each Flow in the Cascade. The FlowTracker can capture its reference to the Flow in a variety of ways depending on your preferred Cascading DSL.

By way of an example, let’s take a look at Sahale’s TrackedJob class. Users of the Scalding DSL need only inherit from TrackedJob to visualize their own Scalding workflows with Sahale. In the TrackedJob class, Scalding’s Job.run method is overridden to capture a reference to the Flow at runtime, and a FlowTracker is launched in a background thread to handle the tracking:


/**
 * Your jobs should inherit from this class to inject job tracking functionality.
 */
class TrackedJob(args: Args) extends com.twitter.scalding.Job(args) {
  @transient private val done = new AtomicBoolean(false)
  override def run(implicit mode: Mode) = {
    mode match {
      // only track Hadoop cluster jobs marked "--track-job"
      case Hdfs(_, _) => if (args.boolean("track-job")) runTrackedJob else super.run
      case _ => super.run
    }
  }
  def runTrackedJob(implicit mode: Mode) = {
    try {
      val flow = buildFlow
      trackThisFlow(flow)
      flow.complete
      flow.getFlowStats.isSuccessful // return Boolean
    } catch {
      case t: Throwable => throw t
    } finally {
      // ensure all threads are cleaned up before we propagate exceptions or complete the run.
      done.set(true)
      Thread.sleep(100)
    }
  }
  private def trackThisFlow(f: Flow[_]): Unit = { (new Thread(new FlowTracker(f, done))).start }
}
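
Opting in from user code is minimal. Here is a hedged sketch of a Scalding job that extends TrackedJob; the pipeline, argument names, and the TrackedJob package import are illustrative, not taken from a real Etsy job:

import com.twitter.scalding._
import com.etsy.sahale.TrackedJob  // package name assumed for illustration

// Illustrative word-count job: extending TrackedJob (instead of Job) is the
// only change needed to report metrics to Sahale when run with --track-job.
class WordCountJob(args: Args) extends TrackedJob(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.toLowerCase.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}

Launched on the cluster with the usual Scalding invocation plus the --track-job flag (for example: hadoop jar your-assembly.jar com.twitter.scalding.Tool WordCountJob --hdfs --track-job --input ... --output ..., where the JAR name and paths are placeholders), the job reports its metrics; without the flag it runs untracked, as the mode match above shows.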

The TrackedJob class is specific to the Scalding DSL, but the pattern is easy to extend. We have used the tool internally to track Cascading jobs generated by several different DSLs.
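
For example, given a plain Cascading Flow built by some other DSL, a hedged sketch of the same pattern might look like the following. The com.etsy.sahale package name is an assumption, and the FlowTracker constructor mirrors the one used in TrackedJob above:

import java.util.concurrent.atomic.AtomicBoolean
import cascading.flow.Flow
import com.etsy.sahale.FlowTracker  // package name assumed for illustration

// Run any Cascading Flow with a FlowTracker reporting in the background.
object TrackedFlowRunner {
  def runTracked(flow: Flow[_]): Boolean = {
    val done = new AtomicBoolean(false)
    new Thread(new FlowTracker(flow, done)).start()  // periodic metric updates
    try {
      flow.complete()                 // block until the Flow finishes
      flow.getFlowStats.isSuccessful
    } finally {
      done.set(true)                  // signal the tracker thread to stop
      Thread.sleep(100)               // give it a moment to send a final update
    }
  }
}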

Sahale selects a mix of aggregated and fine-grained metrics from the Cascading Flow and the underlying Hadoop MapReduce jobs it manages. The FlowTracker also takes advantage of the Cascading client’s caching of recently-polled job data whenever possible.

This approach is simple and has avoided prohibitive data transfer costs or latency, even when many tracked jobs are executing at once. This has proven critical at Etsy, where it is common to run tens or hundreds of jobs simultaneously on the same Hadoop cluster. The data model is also easily extensible if additional metrics are desired. Support for a larger range of default Hadoop counters is forthcoming.

About Sahale

Figure 3: Workflow overview page.

The Sahale web app’s home page consists of two tables. The first enumerates running Cascading workflows. The second displays all recently completed workflows. Each table entry includes a workflow’s name, the submitting user, start time, job duration, the number of MapReduce jobs in the workflow, workflow status, and workflow progress.

The navigation bar at the top of the page provides access to the help page to orient new users, and a search feature to expose the execution history for any matching workflows.

Each named workflow links to a detail page that exposes a richer set of metrics:

Figure 4: Workflow details page, observing a job in progress.

The detail page exposes the workflow structure and job metrics for a single run of a single Cascading workflow. As before, the navigation bar provides links to the help page, back to the overview page, and to a history view for all recent runs of the workflow.

The detail page consists of five panels:

  • A table-row style record exposing summary stats similar to the overview page.
  • A dual series area chart (left-top panel) displaying various per-step Cascading metrics.
  • A stacked bar chart (right-top panel) depicting per-step running times.
  • A graph view of the Cascading workflow’s job plan. Vertices represent individual MapReduce jobs, edges are data dependencies. Job status is exposed via color.
  • A detailed set of job metrics for a single workflow step, including progress indicators, source and sink field schemas, and handy links to Hadoop logs.

Users can leverage these displays to access a lot of actionable information as a workflow progresses, empowering them to quickly answer questions like:

  • Which stages of my workflow are slowest and could benefit from optimization?
  • Which are the most expensive stages of my job in terms of time, cluster resources, volume of data processed, etc.?
  • Which stages of my workflow are benefitting from data locality, and which are not?
  • How much of my workflow can be submitted in parallel, and how much must be serialized due to data dependencies?
  • Could I restructure my source code to help Cascading generate a simpler or more parallelizable workflow plan? Which parts?

In the event of a workflow failure, even novice users can identify the offending job stage or stages as easily as locating the red nodes in the graph view. Selecting failed nodes provides easy access to the relevant Hadoop job logs, and the Sources/Sinks tab makes it easy to map a single failed Hadoop job back to the user’s source code. Before Sahale, this was a frequent pain point for Hadoop users at Etsy.

Figure 5: A failed workflow. The offending MapReduce job is easy to identify and drill down into.

If a user notices a performance regression during iterative workflow development, the Job History link in the navigation bar will expose a comparative view of all recent runs of the same workflow:

Figure 6: History page, including aggregated per-workflow metrics for easy comparisons.

Here, users can view a set of bar graphs comparing various aggregated metrics from historical runs of the same workflow over time. As in the workflow graphs, color is used to indicate the final status of each charted run or the magnitude of the I/O involved with a particular run.

Hovering on any bar in a chart displays a popup with additional information. Clicking a bar takes users to the detail page for that run, where they can drill down into the fine-grained metrics. The historical view makes mapping changes in source code back to changes in workflow performance a cinch.

Sahale at Etsy

After running Sahale at Etsy for a year, we’ve seen some exciting changes in the way our users interact with our big data stack. One of the most gratifying for me is the way new users can ramp up quickly and gain confidence in assessing their own workflows’ performance characteristics and storage impact on the cluster. Here’s one typical example timeline, with workflow and user name redacted to protect the excellent:

Around January 4th, one of our new users got a workflow up and running. By January 16th, this user had simplified the work needed to arrive at the desired result, cutting the size of the workflow nearly in half. By removing an unneeded aggregation step, the user further optimized the workflow down to a tight three stages by February 9th. All of this occurred without extensive code reviews or expert intervention, just some Q&A in our internal IRC channel.

The graph visualizations for this workflow illustrate its evolution across this timeline far better than words can:

Figure 7a: Before (January 4th)
Figure 7b: During (January 16th)
Figure 7c: After (February 9th)

It’s one thing for experienced analysts to recommend that new users try to filter unneeded data as early in the workflow as possible, or to minimize the number of aggregation steps in a workflow. It’s another thing for our users to intuitively reason about these best practices themselves. I believe Sahale has been a great resource for bridging that gap.

Conclusion

Sahale, for me, represents one of many efforts in our company-wide initiative to democratize Etsy’s data. Visualizing workflows at runtime has enabled our developers to iterate faster and with greater confidence in the quality of their results. It has also reduced the time my team spends determining if a workflow is production-ready before deployment. By open sourcing the tool, my hope is that Sahale can offer the same benefits to the rest of the Big Data community.

Future plans include richer workflow visualizations, more charts, additional metrics/Hadoop counter collection on the client side, more advanced mappings from workflow to users’ source code (for users of Cascading 2.6+), and out-of-the-box support for more Cascading DSLs.

Thanks for reading! As a parting gift, here’s a screenshot of one of our largest workflows in progress:

Figure 8: Etsy runs more Hadoop jobs by 7am than most companies do all day.