Sahale: Visualizing Cascading Workflows at Etsy

Posted on February 11, 2015

The Problem

If you know anything about Etsy engineering culture, you know we like to measure things. Frequent readers of Code As Craft have become familiar with Etsy mantras like “If it moves, graph it.”

When I arrived at Etsy, our Hadoop infrastructure, like most systems at Etsy, was well instrumented at the operational level via Ganglia and the standard suite of exported Hadoop metrics, but was still quite opaque to our end users, the data analysts.

Our visibility problem had to do with a tooling decision made by most Hadoop shops: to abandon tedious and counterintuitive “raw MapReduce” coding in favor of a modern Domain Specific Language like Pig, Hive, or Scalding.

At Etsy, DSL programming on Hadoop has been a huge win. A DSL enables developers to construct complex Hadoop workflows in a fraction of the time it takes to write and schedule a DAG of individual MapReduce jobs. DSLs abstract away the complexities of MapReduce and allow new developers to ramp up on Hadoop quickly.

Etsy’s DSL of choice, Scalding, is a Scala-based wrapper for the Cascading framework. It features a vibrant community, an expressive syntax, and a growing number of large companies like Twitter scaling it out in production.

However, DSLs on Hadoop share a couple of drawbacks that must be addressed with additional tooling. The one that concerned users at Etsy the most was a lack of runtime visibility into the composition and behavior of their Scalding jobs.

The core of the issue is this: Hadoop only understands submitted work at the granularity of individual MapReduce jobs. The JobTracker (or ResourceManager on Hadoop2) is blissfully unaware of the relationships among tasks executed on the cluster, and its web interface reflects this ignorance:

Figure 1: YARN Resource Manager. It’s 4am - do you know where your workflow is?

A bit dense, isn’t it? This is fine if you are running one raw MapReduce job at a time, but the Cascading framework compiles each workflow down into a set of interdependent MapReduce jobs. These jobs are submitted to the Hadoop cluster asynchronously, in parallel, and ordered only by data dependencies between them.
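To make the scheduling rule concrete, here is a toy sketch (not Cascading’s actual planner; the `waves` helper and job names are invented for illustration). A job becomes submittable as soon as every job producing its inputs has finished, so independent jobs run concurrently:

```java
import java.util.*;

// Toy sketch: group jobs into "waves" where every job in a wave can be
// submitted to the cluster concurrently, constrained only by data dependencies.
public class DagSchedule {
    static List<List<String>> waves(Map<String, List<String>> deps) {
        List<List<String>> result = new ArrayList<>();
        Set<String> finished = new HashSet<>();
        while (finished.size() < deps.size()) {
            List<String> wave = new ArrayList<>();
            for (String job : deps.keySet())
                // runnable once all of its input-producing jobs are done
                if (!finished.contains(job) && finished.containsAll(deps.get(job)))
                    wave.add(job);
            if (wave.isEmpty()) throw new IllegalStateException("cycle in job DAG");
            Collections.sort(wave);
            finished.addAll(wave);
            result.add(wave);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("load", List.of());
        deps.put("filter", List.of("load"));
        deps.put("join", List.of("load"));
        deps.put("aggregate", List.of("filter", "join"));
        System.out.println(waves(deps)); // → [[load], [filter, join], [aggregate]]
    }
}
```

Note that "filter" and "join" land in the same wave: from the JobTracker’s point of view they are just two unrelated MapReduce jobs, which is exactly the visibility gap described above.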

The result: Etsy users tracking a Cascading workflow via the JobTracker/ResourceManager interface did not have a clear idea how their source code became a Cascading workflow plan, how their Cascading workflows mapped to jobs submitted to Hadoop, which parts of the workflow were currently executing, or how to determine what happened when things went badly.

This situation also meant any attempt to educate users about optimizing workflows, best practices, or MapReduce itself was relegated to a whiteboard, Confluence doc, or (worse) an IRC channel, soon to be forgotten.

What Etsy needed was a user-facing tool for tracking cluster activity at the granularity of Cascading workflows, not MapReduce jobs – a tool that would make core concepts tangible and allow for proactive, independent assessment of workflow behavior at runtime.

Etsy’s Data Platform team decided to develop some internal tooling to solve these problems, guided by a few core design principles.

Fast forward to now: I am pleased to announce Etsy’s open-source release of Sahale, a tool for visualizing Cascading workflows, to you, the Hadooping public!

Design Overview

Let’s take a look at how it works under the hood. The design we arrived at involves several discrete components which are deployed separately, as illustrated below:

Figure 2: Design overview.

The components of the tool are: a FlowTracker that runs alongside each Cascading workflow and collects its metrics, the Sahale web application, and the database tables that store the incoming metrics.

The workflow is as follows:

  1. A User launches a Cascading workflow, and a FlowTracker is instantiated.
  2. The FlowTracker begins to dispatch periodic job metric updates to Sahale.
  3. Sahale commits incoming metrics to the database tables.
  4. The user points her browser to the Sahale web app.

About FlowTracker 

That’s all well and good, but how does the FlowTracker capture workflow metrics at runtime?

First, some background. In the Cascading framework, each Cascade is a collection of one or more Flows. Each Flow is compiled by Cascading into one or more MapReduce jobs, which the user’s client process submits to the Hadoop cluster. With a reference to the running Flow, we can track various metrics about the Hadoop jobs the Cascading workflow submits.

Sahale attaches a FlowTracker instance to each Flow in the Cascade. The FlowTracker can capture its reference to the Flow in a variety of ways depending on your preferred Cascading DSL.

By way of an example, let’s take a look at Sahale’s TrackedJob class. Users of the Scalding DSL need only inherit TrackedJob to visualize their own Scalding workflows with Sahale. In the TrackedJob class, Scalding’s Job.run method is overridden to capture a reference to the Flow at runtime, and a FlowTracker is launched in a background thread to handle the tracking:


/**
 * Your jobs should inherit from this class to inject job tracking functionality.
 */
class TrackedJob(args: Args) extends com.twitter.scalding.Job(args) {
  @transient private val done = new AtomicBoolean(false)

  override def run(implicit mode: Mode) = {
    mode match {
      // only track Hadoop cluster jobs marked "--track-job"
      case Hdfs(_, _) => if (args.boolean("track-job")) runTrackedJob else super.run
      case _ => super.run
    }
  }

  def runTrackedJob(implicit mode: Mode) = {
    try {
      val flow = buildFlow
      trackThisFlow(flow)
      flow.complete
      flow.getFlowStats.isSuccessful // return Boolean
    } catch {
      case t: Throwable => throw t
    } finally {
      // ensure all threads are cleaned up before we propagate exceptions or complete the run.
      done.set(true)
      Thread.sleep(100)
    }
  }

  private def trackThisFlow(f: Flow[_]): Unit = { (new Thread(new FlowTracker(f, done))).start }
}

The TrackedJob class is specific to the Scalding DSL, but the pattern is easy to extend. We have used the tool internally to track Cascading jobs generated by several different DSLs.
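For DSLs other than Scalding, or for raw Cascading jobs written in Java, the wiring is the same in spirit: obtain a reference to the Flow, hand it to a FlowTracker along with a shared completion flag, run the tracker in a background thread, and flip the flag once the Flow completes. Here is a hypothetical sketch of that lifecycle; `StubFlow` and `StubFlowTracker` are simplified stand-ins invented so the example compiles on its own, not the real Cascading and Sahale classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for a Cascading Flow (the real Flow.complete() blocks until done).
class StubFlow {
    private volatile boolean finished = false;
    void complete() { finished = true; }
    boolean isFinished() { return finished; }
}

// Stand-in for Sahale's FlowTracker: polls until signaled, then sends a
// final update so the terminal state is always recorded.
class StubFlowTracker implements Runnable {
    private final StubFlow flow;
    private final AtomicBoolean done;
    private volatile int updatesSent = 0;

    StubFlowTracker(StubFlow flow, AtomicBoolean done) {
        this.flow = flow;
        this.done = done;
    }

    int updatesSent() { return updatesSent; }

    @Override
    public void run() {
        while (!done.get()) {
            updatesSent++; // the real tracker would dispatch metrics here
            try { Thread.sleep(20); } catch (InterruptedException e) { return; }
        }
        updatesSent++; // one last update with the flow's final state
    }
}

public class TrackedJavaJob {
    public static void main(String[] args) throws Exception {
        StubFlow flow = new StubFlow();                            // create a Flow
        AtomicBoolean done = new AtomicBoolean(false);
        StubFlowTracker tracker = new StubFlowTracker(flow, done); // pass Flow + flag
        Thread t = new Thread(tracker);
        t.start();          // start the tracker in a background thread
        flow.complete();    // run the flow to completion
        done.set(true);     // signal the tracker to stop
        t.join();
        System.out.println("updates sent >= 1: " + (tracker.updatesSent() >= 1));
    }
}
```

The shared `AtomicBoolean` plays the same role as the `done` field in the Scalding `TrackedJob` above: it lets the job thread tell the tracker thread to shut down cleanly whether the flow succeeds or fails.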

Sahale selects a mix of aggregated and fine-grained metrics from the Cascading Flow and the underlying Hadoop MapReduce jobs it manages. The FlowTracker also takes advantage of the Cascading client’s caching of recently-polled job data whenever possible.

This approach is simple and has avoided prohibitive data transfer costs or latency, even when many tracked jobs are executing at once. This has proven critical at Etsy, where it is common to run tens or hundreds of jobs simultaneously on the same Hadoop cluster. The data model is also easily extensible if additional metrics are desired. Support for a larger range of default Hadoop counters is forthcoming.
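The caching idea is simple enough to sketch. This hypothetical `PolledDataCache` (an illustration, not Sahale’s or Cascading’s actual class) returns recently fetched job data while it is still fresh, so repeated polls within the freshness window don’t each hit the cluster:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative time-to-live cache for recently-polled job data.
class PolledDataCache<V> {
    private static final class Entry<V> {
        final V value; final long fetchedAtMs;
        Entry(V value, long fetchedAtMs) { this.value = value; this.fetchedAtMs = fetchedAtMs; }
    }
    private final Map<String, Entry<V>> cache = new HashMap<>();
    private final long ttlMs;

    PolledDataCache(long ttlMs) { this.ttlMs = ttlMs; }

    // Return cached data if still fresh; otherwise invoke the (expensive) fetch.
    V get(String jobId, Supplier<V> fetch, long nowMs) {
        Entry<V> e = cache.get(jobId);
        if (e != null && nowMs - e.fetchedAtMs < ttlMs) return e.value;
        V v = fetch.get();
        cache.put(jobId, new Entry<>(v, nowMs));
        return v;
    }
}

public class CacheDemo {
    public static void main(String[] args) {
        PolledDataCache<String> cache = new PolledDataCache<>(1000);
        final int[] fetches = {0};
        Supplier<String> fetch = () -> { fetches[0]++; return "job counters"; };
        cache.get("job_1", fetch, 0L);    // miss: fetch from the cluster
        cache.get("job_1", fetch, 500L);  // fresh: served from cache
        cache.get("job_1", fetch, 2000L); // stale: fetch again
        System.out.println("remote fetches: " + fetches[0]); // → remote fetches: 2
    }
}
```

Even a short freshness window like this keeps per-poll overhead roughly constant no matter how many dashboards are watching the same workflow.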

About Sahale

Figure 3: Workflow overview page.

The Sahale web app’s home page consists of two tables. The first enumerates running Cascading workflows. The second displays all recently completed workflows. Each table entry includes a workflow’s name, the submitting user, start time, job duration, the number of MapReduce jobs in the workflow, workflow status, and workflow progress.

The navigation bar at the top of the page provides access to the help page to orient new users, and a search feature to expose the execution history for any matching workflows.

Each named workflow links to a detail page that exposes a richer set of metrics:

Figure 4: Workflow details page, observing a job in progress.

The detail page exposes the workflow structure and job metrics for a single run of a single Cascading workflow. As before, the navigation bar provides links to the help page, back to the overview page, and to a history view for all recent runs of the workflow.

The detail page is organized into five panels.

Users can leverage these displays to access a wealth of actionable information as a workflow progresses, empowering them to quickly answer their own questions about a workflow’s structure, current progress, and performance.

In the event of a workflow failure, even novice users can identify the offending job stage or stages as easily as locating the red nodes in the graph view. Selecting failed nodes provides easy access to the relevant Hadoop job logs, and the Sources/Sinks tab makes it easy to map a single failed Hadoop job back to the user’s source code. Before Sahale, this was a frequent pain point for Hadoop users at Etsy.

Figure 5: A failed workflow. The offending MapReduce job is easy to identify and drill down into.

If a user notices a performance regression during iterative workflow development, the Job History link in the navigation bar will expose a comparative view of all recent runs of the same workflow:

Figure 6: History page, including aggregated per-workflow metrics for easy comparisons.

Here, users can view a set of bar graphs comparing various aggregated metrics from historical runs of the same workflow over time. As in the workflow graphs, color is used to indicate the final status of each charted run or the magnitude of the I/O involved with a particular run.

 Hovering on any bar in a chart displays a popup with additional information. Clicking a bar takes users to the detail page for that run, where they can drill down into the fine-grained metrics. The historical view makes mapping changes in source code back to changes in workflow performance a cinch.

Sahale at Etsy

After running Sahale at Etsy for a year, we’ve seen some exciting changes in the way our users interact with our Big Data stack. One of the most gratifying for me is the way new users can ramp up quickly and gain confidence self-assessing their workflows’ performance characteristics and storage impact on the cluster. Here’s one typical example timeline, with workflow and user name redacted to protect the excellent:

Around January 4th, one of our new users got a workflow up and running. By January 16th, this user had simplified the work needed to arrive at the desired result, cutting the size of the workflow nearly in half. By removing an unneeded aggregation step, the user further optimized the workflow down to a tight 3 stages by Feb. 9th. All of this occurred without extensive code reviews or expert intervention, just some Q&A in our internal IRC channel.

Viewing the graph visualizations for this workflow illustrates its evolution across the timeline much better:

Figure 7a: Before (Jan 4th)

Figure 7b: During (Jan. 16th)

Figure 7c: After (Feb. 9th)

It’s one thing for experienced analysts to recommend that new users try to filter unneeded data as early in the workflow as possible, or to minimize the number of aggregation steps in a workflow. It’s another thing for our users to intuitively reason about these best practices themselves. I believe Sahale has been a great resource for bridging that gap.

Conclusion

Sahale, for me, represents one of many efforts in our company-wide initiative to democratize Etsy’s data. Visualizing workflows at runtime has enabled our developers to iterate faster and with greater confidence in the quality of their results. It has also reduced the time my team spends determining if a workflow is production-ready before deployment. By open sourcing the tool, my hope is that Sahale can offer the same benefits to the rest of the Big Data community.

Future plans include richer workflow visualizations, more charts, additional metrics/Hadoop counter collection on the client side, more advanced mappings from workflow to users’ source code (for users of Cascading 2.6+), and out-of-the-box support for more Cascading DSLs.

Thanks for reading! As a parting gift, here’s a screenshot of one of our largest workflows in progress:

Figure 8: Etsy runs more Hadoop jobs by 7am than most companies do all day.


Category: data, databases, engineering, infrastructure, monitoring

9 Comments

http://en.wikipedia.org/wiki/The_Kiss_%28Klimt%29

Check that out in comparison to the last “Graph View” image.

Coincidence? (Probably.)

Funny you should mention, there have been a number of workflow graphs that have circulated internally for this reason. Perhaps we’ll feature our “Hindenburg workflow” in a follow-up post, but the Klimt comparison will be well received!

Hello,

This is so great! I will for sure give it a shot to monitor my Scalding jobs.
Thanks for open sourcing it.

Best,
Muha

Would like to know when the next version, with counters included, will be released.

    Thanks for asking, we have plans to publish that update very soon.

Is it currently possible to use Sahale with Java-based Cascading jobs?

    Hi Ken. I haven’t played with that much as we use Scalding for almost everything here, but there’s nothing Scalding-specific outside the TrackedJob and example job that would hamper it. You will probably just need something to instantiate the FlowTracker thread and pick up a Flow reference at job initialization.

    I am working on a few feature upgrade patches now and will take a peek, we should include that code I agree, but if you come up with something solid in the meantime that we can bundle into the repo please send a pull request.

      1. Looks like you rely on the Cascading flow id in the job name, right?

      2. I assume the sequence should be something like:

      a. Create a Flow
      b. Pass it to the FlowTracker constructor, along with an AtomicBoolean(false)
      c. Start the (Runnable) FlowTracker in a thread
      d. Call flow.complete()
      e. Set the AtomicBoolean to true, to get the FlowTracker to stop

Hey Ken, yeah, that’s exactly the recipe!