Analyzing Etsy’s data with Hadoop and Cascading
Browsing Etsy’s marketplace, you can find over five million active item listings posted by 158,000 different sellers. Digging deeper, you may heart an item you really like (we have over 40MM total hearts), or maybe even buy a thing or two (2009 marketplace sales of $180.6MM). On a monthly basis, Etsy generates over 750MM page views – over 60GB of raw web logs per day. On top of this, we doubled in size last year and are continually growing.
As our community has grown, so have our data processing needs. In looking for a long-term solution, we sought something that not only scales with the size of our site, but also lets us ask deep questions about our data. Instead of measuring bottom line numbers – sales per day, traffic volume per url, or site-wide conversion rate – we wanted a means to perform much more specific analyses. What is the hearting rate among users who issued two consecutive searches? What fraction of users who visit Etsy through regretsy.com have never visited us before? What fraction are existing members?
Hadoop’s MapReduce implementation is pervasive and seemed like the obvious choice to us. It’s used by Yahoo!, Facebook, and lots of other web shops operating at massive scale. The framework provides many fundamentals required for effective parallel processing: (1) job coordination, (2) redundant data storage & management (via HDFS), and (3) a flexible parallel programming framework to facilitate a wide array of analysis tasks. I won’t discuss Hadoop too much here — there are plenty of great resources on the web you can check out elsewhere (Hadoop’s project page is a great starting point).
MapReduce jobs consist of two operations: a map, and a reduce. However, most of the data analysis tasks that we want to solve here at Etsy are more naturally expressed using operations such as filters for discarding records, joins for merging records, splits for separating records, or grouping for aggregating and counting. The problem is that standard map and reduce operations are at a different level of abstraction from filters, joins, etc. For example, the canonical word count example used in MapReduce tutorials can be viewed in terms of two operations: (1) a split to separate records into individual terms, and (2) an aggregation on terms to count frequencies. Here, the split operation corresponds to a mapping operation, and the aggregation operation is handled by the reducer. Counting words is fairly straightforward, but Hadoop’s abstraction mismatch results in additional complexity.
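To make the word count mapping concrete, here is a minimal sketch in plain Ruby (no Hadoop involved). The method names map_terms, reduce_counts and word_count are made up for this illustration; the point is only to show which half of the job is the split and which half is the aggregation.

```ruby
# Map step (the "split"): break each record into terms and emit [term, 1] pairs.
def map_terms(lines)
  lines.flat_map { |line| line.downcase.scan(/\w+/).map { |term| [term, 1] } }
end

# Reduce step (the "aggregation"): group the pairs by term and sum the counts.
def reduce_counts(pairs)
  pairs.group_by(&:first).transform_values { |group| group.sum { |_term, n| n } }
end

# The whole job is just the two steps chained together.
def word_count(lines)
  reduce_counts(map_terms(lines))
end

p word_count(["handmade soap", "vintage soap dish"])
```

In a real Hadoop job the grouping between the two steps is done by the framework's shuffle phase rather than by group_by, which is part of what makes the raw API feel lower-level than the problem.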
Enter Cascading. Cascading provides a data flow framework that leverages Hadoop’s infrastructure while abstracting standard data processing operations (splits, joins, etc.) away from the underlying mapper and reducer tasks. In terms of our data analysis goals at Etsy, Cascading is a great fit as it combines the scalability of Hadoop with the right level of abstraction to perform deep data diving. The project is relatively young but is currently used in several large-scale settings (check out Rapleaf’s Cascading overview), and is similar to Pig in abstracting away low-level map and reduce operations. Unlike Pig, Cascading can be extended via its API to create a Domain Specific Language (DSL) that streamlines operations for our particular use case or provides more code structure and cleaner syntax (for all you Java haters). Finally, the data warehousing team at Etsy has a contingent of programming language junkies with lots of experience using JRuby, a Java implementation of Ruby.
Welcome cascading.jruby! Instead of diving into benefits and idiosyncrasies of cascading.jruby (which we’ve adopted based on Grégoire Marabout’s implementation), we’ll dive right into an example: affiliate sales tracking.
Affiliate programs are standard among many online retailers today. Affiliates sign up for the program and can then set up special links to the retailer’s site with their affiliate code embedded. Affiliates are then paid on a per-sale basis, usually a fixed percentage of final sales price. Etsy does not currently have an affiliate program, so this is purely a hypothetical example.
In this example we’ll assume that affiliate tracking codes are embedded in the retailer’s url via the format affiliate_code=XYZ, and that a user is directed to a sales confirmation page immediately after their sale is completed. We’ll also assume that this confirmation page has the sales id parameter embedded in the url (sale_confirmation_id=1234). Affiliates are credited only for sales occurring within ninety days of the referral. This example uses cascading.jruby to implement the entire affiliate tracking infrastructure based on existing event logs (i.e. standard apache logs). No code pushes to production required!
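As a rough illustration of what pulling these two parameters out of a logged url involves, here is a small sketch using Ruby’s standard uri library. The method name extract_tracked_fields and the example.com urls are hypothetical; this is not the log parser we actually use.

```ruby
require 'uri'

# The two query parameters assumed in this example.
TRACKED_FIELDS = %w[affiliate_code sale_confirmation_id].freeze

# Returns a hash holding whichever tracked parameters appear in the url's query string.
def extract_tracked_fields(url)
  query = URI.parse(url).query
  return {} if query.nil?

  params = URI.decode_www_form(query).to_h
  params.select { |key, _value| TRACKED_FIELDS.include?(key) }
end

p extract_tracked_fields('http://www.example.com/listing/42?affiliate_code=XYZ&ref=home')
p extract_tracked_fields('http://www.example.com/confirm?sale_confirmation_id=1234')
```

Parameters other than the two tracked ones are simply dropped, and a url with no query string yields an empty hash.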
Cascading jobs consist of three components: the data source, the data sink, and one or more assemblies. Data flows from source to sink, and assemblies are used to modify the flow or branch it. In this example, we’ll create two data assemblies. The first assembly consists of two branches that filter (1) URLs containing an affiliate_code parameter, and (2) sales confirmation pages with a sale_confirmation_id parameter.
source 'raw_log_data'

assembly 'raw_log_data' do
  # Helper method to parse fields from raw web logs: session_id, url parameters, etc.
  parse_log :query_fields => ['affiliate_code', 'sale_confirmation_id']

  branch 'affiliate_events' do
    where 'affiliate_code:string != null'
    rename 'created_at' => 'affiliate_timestamp'
  end

  branch 'sales_events' do
    where 'sale_confirmation_id:string != null'
    rename 'created_at' => 'sale_timestamp'
  end
end
In the next assembly, we’ll join the output of these two branches.
assembly 'affiliate_sales' do
  join 'affiliate_events', 'sales_events', :on => 'session_id'

  # Time between the affiliate click and the sale, in days (timestamps are in milliseconds).
  insert 'time_diff' => expr('(sale_timestamp:double-affiliate_timestamp:double)/(24.0*60.0*60.0*1000.0)')
  where '0.0 <= time_diff:double && time_diff:double <= 90.0'

  group_by 'session_id' do
    min 'time_diff'
  end

  project 'affiliate_code', 'sale_confirmation_id'
end

sink 'affiliate_sales'
Here we join affiliate-linked inbound events to sales events. Valid affiliate sales credit requires that a user purchased after clicking through an affiliate link (i.e. a non-negative time_diff) and within a ninety-day period. The final group_by is used to dedup sales that were linked to multiple affiliate events. In this example, we credit the affiliate who most recently linked the user through to the sale, which is the event with the smallest time_diff. Finally, we emit the affiliate code and sales confirmation id. In a production system, this output could be exported into a relational database to key off appropriate tables and complete the final sales commission assignment steps (i.e. determine sale price for sales events, compute commissions, etc.).
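For readers who want to trace the logic without a cluster, the same join, filter and dedup steps can be approximated in plain Ruby over in-memory hashes. This is only a sketch under stated assumptions: the sample events, the constants and the credit_affiliates method are invented for illustration, timestamps are taken to be milliseconds, and the sketch keeps the whole winning row per session so the affiliate code can be emitted afterwards.

```ruby
MS_PER_DAY = 24.0 * 60.0 * 60.0 * 1000.0

# Hypothetical sample events; timestamps are milliseconds from an arbitrary origin.
AFFILIATE_EVENTS = [
  { session_id: 's1', affiliate_code: 'ABC', affiliate_timestamp: 0 },
  { session_id: 's1', affiliate_code: 'XYZ', affiliate_timestamp: 10 * MS_PER_DAY },
  { session_id: 's2', affiliate_code: 'ABC', affiliate_timestamp: 0 }
].freeze

SALES_EVENTS = [
  { session_id: 's1', sale_confirmation_id: '1234', sale_timestamp: 20 * MS_PER_DAY },
  { session_id: 's2', sale_confirmation_id: '5678', sale_timestamp: 120 * MS_PER_DAY }
].freeze

def credit_affiliates(affiliate_events, sales_events)
  sales_by_session = sales_events.group_by { |event| event[:session_id] }

  # Inner join on session_id, adding time_diff (in days) to each joined row.
  joined = affiliate_events.flat_map do |aff|
    sales_by_session.fetch(aff[:session_id], []).map do |sale|
      diff = (sale[:sale_timestamp] - aff[:affiliate_timestamp]) / MS_PER_DAY
      aff.merge(sale).merge(time_diff: diff)
    end
  end

  # Keep only sales made after the click and within ninety days of it.
  valid = joined.select { |row| row[:time_diff].between?(0.0, 90.0) }

  # Dedup per session: the smallest time_diff is the most recent affiliate click.
  valid.group_by { |row| row[:session_id] }.map do |_session, rows|
    best = rows.min_by { |row| row[:time_diff] }
    { affiliate_code: best[:affiliate_code], sale_confirmation_id: best[:sale_confirmation_id] }
  end
end

p credit_affiliates(AFFILIATE_EVENTS, SALES_EVENTS)
```

With this sample data, session s1 has two affiliate clicks and the later one (XYZ) gets the credit, while the s2 sale falls outside the ninety-day window and is dropped. Like the assembly above, the sketch groups on session_id, so it credits at most one sale per session.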
As you can see from this example, our cascading.jruby framework provides a clean interface for expressing complex workflows with minimal code. Our full stack runs on Amazon’s Elastic MapReduce web service. We store exported web logs on Amazon S3, and Amazon’s EMR takes care of behind-the-scenes cluster management. We’ve had no problem processing a month’s worth of data using 25 machines in a few hours.
We’re hoping to be able to release our cascading.jruby distribution sometime in the near future. In the meantime, you can use our example as pseudo-code for constructing a Cascading job using its standard API. Stay tuned!