Analyzing Etsy’s data with Hadoop and Cascading

Posted on February 24, 2010

Browsing Etsy’s marketplace, you can find over five million active item listings posted by 158,000 different sellers. Digging deeper, you may heart an item you really like (we have over 40MM total hearts), or maybe even buy a thing or two (2009 marketplace sales of $180.6MM). On a monthly basis, Etsy generates over 750MM page views, which translates to over 60GB of raw web logs per day. On top of this, we doubled in size last year and are continually growing.

As our community has grown larger and larger, so have our data processing needs. In looking for a long-term solution, we sought something that could scale not only with the magnitude of our site, but also let us ask deep questions about our data. Instead of measuring bottom-line numbers – sales per day, traffic volume per URL, or site-wide conversion rate – we wanted a means to perform much more specific analyses. What is the hearting rate among users who issued two consecutive searches? What fraction of users who visit Etsy through regretsy.com have never visited us before? What fraction are existing members?

Hadoop’s MapReduce implementation is pervasive and seemed like the obvious choice to us. It’s used by Yahoo!, Facebook, and lots of other web shops operating at massive scale. The framework provides many fundamentals required for effective parallel processing: (1) job coordination, (2) redundant data storage & management (via HDFS), and (3) a flexible parallel programming framework to facilitate a wide array of analysis tasks. I won’t discuss Hadoop too much here; there are plenty of great resources on the web (Hadoop’s project page is a great starting point).

MapReduce jobs consist of two operations: a map, and a reduce. However, most of the data analysis tasks that we want to solve here at Etsy are more naturally expressed using operations such as filters for discarding records, joins for merging records, splits for separating records, or grouping for aggregating and counting. The problem is that standard map and reduce operations are at a different level of abstraction from filters, joins, etc. For example, the canonical word count example used in MapReduce tutorials can be viewed in terms of two operations: (1) a split to separate records into individual terms, and (2) an aggregation on terms to count frequencies. Here, the split operation corresponds to a mapping operation, and the aggregation operation is handled by the reducer. Counting words is fairly straightforward, but Hadoop’s abstraction mismatch results in additional complexity.

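To make that mismatch concrete, here is the word count example as a toy script in plain Ruby. This is not Hadoop code, just an illustration of the two operations; in a real job the split would run in mappers and the counting in reducers, spread across many machines.

lines = ['handmade wool scarf', 'wool hat', 'handmade soap']

# "Map" step: split each record into individual terms
terms = lines.map { |line| line.split }.flatten

# "Reduce" step: group identical terms and count their frequencies
counts = Hash.new(0)
terms.each { |term| counts[term] += 1 }

puts counts.inspect  # => {"handmade"=>2, "wool"=>2, "scarf"=>1, "hat"=>1, "soap"=>1}
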
Enter Cascading. Cascading provides a data flow framework that leverages Hadoop’s infrastructure while abstracting standard data processing operations (splits, joins, etc.) away from the underlying mapper and reducer tasks. In terms of our data analysis goals at Etsy, Cascading is a great fit: it combines the scalability of Hadoop with the right level of abstraction for deep data diving. The project is relatively young but is already used in several large-scale settings (check out Rapleaf’s Cascading overview), and is similar to Pig in abstracting away low-level map and reduce operations. Unlike Pig, Cascading can be extended via its API to create a Domain Specific Language (DSL) that streamlines operations for our particular use case and provides more code structure and cleaner syntax (for all you Java haters). Finally, the data warehousing team at Etsy has a contingent of programming language junkies with lots of experience using JRuby, a Java implementation of Ruby.

Welcome cascading.jruby! Rather than cataloging the benefits and idiosyncrasies of cascading.jruby (which we’ve adopted based on Grégoire Marabout’s implementation), we’ll dive right into an example: affiliate sales tracking.

Affiliate programs are standard among many online retailers today. Affiliates sign up for the program and can then set up special links to the retailer’s site with their affiliate code embedded. Affiliates are then paid on a per-sale basis, usually a fixed percentage of the final sale price. Etsy does not currently have an affiliate program, so this is purely a hypothetical example.

In this example we’ll assume that affiliate tracking codes are embedded in the retailer’s URLs via the format affiliate_code=XYZ, and that a user is directed to a sales confirmation page immediately after their sale is completed. We’ll also assume that this confirmation page has the sale id parameter embedded in the URL (sale_confirmation_id=1234). Affiliates are credited only for sales occurring within ninety days of the referral. This example uses cascading.jruby to implement the entire affiliate tracking infrastructure based on existing event logs (i.e. standard Apache logs). No code pushes to production required!

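As an aside, pulling those parameters out of a logged URL is simple string work. Here is a rough Ruby sketch of the kind of extraction a log-parsing helper could perform; the parse_params name and the example URL are hypothetical, not part of cascading.jruby:

require 'uri'
require 'cgi'

# Hypothetical helper: pull selected query parameters out of a logged request URL
def parse_params(url, fields)
  query  = URI.parse(url).query || ''
  params = CGI.parse(query)
  fields.map { |f| params[f].first }  # nil when the parameter is absent
end

url = 'http://www.example.com/checkout/thanks?sale_confirmation_id=1234'
puts parse_params(url, ['affiliate_code', 'sale_confirmation_id']).inspect
# => [nil, "1234"]
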
Cascading jobs consist of three components: a data source, a data sink, and one or more assemblies. Data flows from source to sink, and assemblies are used to modify or branch that flow. In this example, we’ll create two assemblies. The first consists of two branches that filter for (1) URLs containing an affiliate_code parameter, and (2) sales confirmation pages containing a sale_confirmation_id parameter.

source 'raw_log_data'
assembly 'raw_log_data' do
  # Helper method to parse fields from raw web logs: session_id, url parameters, etc.
  parse_log :query_fields => ['affiliate_code', 'sale_confirmation_id']

  branch 'affiliate_events' do
    where 'affiliate_code:string != null'
    rename 'created_at' => 'affiliate_timestamp'
  end

  branch 'sales_events' do
    where 'sale_confirmation_id:string != null'
    rename 'created_at' => 'sale_timestamp'
  end
end

In the next assembly, we’ll join the output of these two branches.

assembly 'affiliate_sales' do
  join 'affiliate_events', 'sales_events', :on => 'session_id'
  # time_diff is the gap between the affiliate click and the sale, converted from milliseconds to days
  insert 'time_diff' => expr('(sale_timestamp:double - affiliate_timestamp:double) / (24.0 * 60.0 * 60.0 * 1000.0)')
  where '0.0 <= time_diff:double && time_diff:double <= 90.0'

  group_by 'session_id' do
    min 'time_diff'
  end

  project 'affiliate_code', 'sale_confirmation_id'
end

sink 'affiliate_sales'

Here we join affiliate-linked inbound events to sales events. Valid affiliate sales credit requires that a user purchased after clicking through an affiliate link (i.e. a positive time_diff) and within the ninety-day window. The final group_by is used to dedup sales that were linked to multiple affiliate events; in this example, we credit the affiliate who most recently linked the user through to the sale. Finally, we emit the affiliate code and sale confirmation id. In a production system, this output could be exported into a relational database to key off the appropriate tables and complete the final sales commission assignment steps (i.e. determine the sale price for each sales event, compute commissions, etc.).

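To see why taking the minimum time_diff credits the most recent affiliate, consider a session that clicked through two different affiliate links before buying. The later click has the smaller gap to the sale, so it wins (plain Ruby, hypothetical numbers):

# time_diff is days between the affiliate click and the sale; the later click has the smaller gap
candidates = [
  { :affiliate_code => 'ABC', :time_diff => 42.0 },  # clicked 42 days before the sale
  { :affiliate_code => 'XYZ', :time_diff => 3.5 },   # clicked 3.5 days before the sale
]

credited = candidates.min_by { |c| c[:time_diff] }
puts credited[:affiliate_code]  # => "XYZ" -- the most recent click wins
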
As you can see from this example, our cascading.jruby framework provides a clean interface for computing complex workflows with minimal coding complexity. Our full stack runs on Amazon’s Elastic MapReduce web service: we store exported web logs on Amazon S3, and EMR takes care of behind-the-scenes cluster management. We’ve had no problem processing a month’s worth of data using 25 machines in a few hours’ time.

We’re hoping to release our cascading.jruby distribution sometime in the near future. In the meantime, you can use our example as pseudo-code for constructing a Cascading job using Cascading’s standard API. Stay tuned!

Category: data, infrastructure

Comments

Awesome. More reason to read my Hadoop book sitting in my room… post-exams.

Have you considered making a page of open-source projects that you work on? re:Facebook?
http://developers.facebook.com/opensource.php

That’s an excellent suggestion. But until we’ve built one, our github profile page will have to do:
http://github.com/etsy

This sounds similar to CEP engines. Have you considered using something like ESPER? It basically allows you to use a SQL-like query syntax to query across temporal (time-based, sliding-window) events.

This looks like a promising lib; I am trying it right now. Can you please describe how to package and configure the job for Elastic MapReduce?

TIA
Andy

@BEEF: we were very conscious of the similarity of our system to CEP, particularly when we added an event model and visit querying API on top. However, there were several key differences between our system and CEP. For example, all our jobs run in batch whereas CEP sets up infinitely running queries that “fire” when complex events are identified.

@ANAHAP: we have some elaborate build scripts, not yet packaged with the c.j gem, that let us run jobs on EMR. One of my goals is to clean these up and contribute them to the gem the next time I get a chance to give it some attention. In the meantime, the scripts are derived from this:
https://github.com/mrwalker/cascading.jruby/blob/master/bin/make_job

Good article. Thank you !
Max

I didn’t know that you could do such things in Ruby O_O
