Analytics adventures with Pig and Cassandra

Photo by Jes

After a lot of head-scratching, I ended up choosing Cassandra as the main data store for my latest project. I needed a system that could handle a loading process throwing hundreds of thousands of items a minute at it, denormalized across multiple indexes, whilst simultaneously serving up results to a web application, and so far it has performed magnificently. 

Unfortunately it has ended up being a victim of its own success. Now that we have tens of millions of pieces of user-generated content, we want to ask the data questions. That's not so easy with NoSQL, so here are some notes on the solution I ended up building.


The only way to run code across data held on a large cluster of machines is to execute the processing on a similarly large cluster, preferably on machines that already have local copies of the data. That means using Hadoop, and since I'm using DataStax EC2 machine images for my Cassandra servers anyway, I started by trying to add some Brisk AMIs to my existing cluster. Brisk is a Hadoop distribution, pre-configured to integrate with Cassandra, designed for exactly what I was hoping to do. Unfortunately I struggled to figure out the right startup parameters to get the newly-created machines talking to my existing cluster, despite some excellent help from Joaquin. I took a break from that, and discovered that hand-building the basic Cassandra components I needed wasn't too hard on my existing Hadoop machines, so I continued with more of a home-brew setup.

Pig or Hive?

In order to run Hadoop jobs on Cassandra data, you need a fast way to pull it out from your tables into a supported coding environment. The only two supported languages for this in the current Cassandra releases are Pig and Hive. Pig is a procedural data-transformation language, whereas Hive looks a lot like SQL. I think a lot more procedurally and I needed something that could handle some tough unpacking and formatting tasks, so I went with Pig.


I don't know if my experiences would have been any better with Hive, but I found I was walking a fairly lonely path using Pig in conjunction with Cassandra. Jeremy Hanna was a life-saver, and Brandon Williams put in a lot of hard work to get me up and running, but my initial encounter involved a couple of days of me tearing my hair out. I was trying to make some sense out of the results I was seeing on the latest stable release of Cassandra, but they left me baffled. It turned out that the recent introduction of types into the Cassandra adapter had broken all the existing example code, and left the schema reported for the data very different from the actual structure that was returned. Happily I was able to monkey up a messy patch, which Brandon then fixed properly, but it definitely made me realize how far out on the bleeding edge I was. That's a place I usually try to avoid for mission-critical projects!

The right tool for the job

With that overcome, I was able to move forward, but not as speedily as I had hoped. Pig's great strength is that it's a domain-specific language for data processing. It's a big bag of useful operations, with no particular grand design for the language. It reminds me of PHP or R, and I don't mean those comparisons as an insult; I have a fondness for these sorts of languages. When you're working inside their domain they're extremely productive: you almost never need to install extra dependencies, and everything's at your fingertips. Sadly, I found I was operating a bit outside of the mainstream for Pig.

Pig wrestling

As an example, I store a large array of records per user. In theory, I could store each item as a new row in Cassandra, with a secondary index for the user key, but in practice the performance and storage size overhead of that approach rules it out. A CSV string is a simple but effective way of holding the data, so in a Pig script I need to decode that string back into records. There is a smart CSV loader in the latest release, but it only works when you're reading in files, not on strings you've already loaded. To make the job easier, I reloaded all my data using a format that was going to be a lot easier to parse (stripping out any line terminator or separation characters inside quoted strings, for example) and then set out to do the job using Pig's built-in primitives.
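To make the reformatting step concrete, here is a minimal sketch in plain Java of what that kind of cleanup might look like. The class name, the choice of comma and newline as separators, and the decision to replace troublesome characters with a space are all assumptions for illustration, not the actual format used in the project.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: writes records as a CSV-like string that can later be
// split with nothing smarter than "break on newline, then break on comma".
final class CsvFieldSanitizer {

    // Replace any run of line terminators, commas, or double quotes inside a
    // field with a single space, so the separators never appear in the data.
    static String sanitize(String field) {
        if (field == null) {
            return "";
        }
        return field.replaceAll("[\\r\\n,\"]+", " ");
    }

    // One record becomes one comma-separated line.
    static String encodeRecord(List<String> fields) {
        return fields.stream()
                .map(CsvFieldSanitizer::sanitize)
                .collect(Collectors.joining(","));
    }

    // All of a user's records become one newline-separated string.
    static String encodeRecords(List<List<String>> records) {
        return records.stream()
                .map(CsvFieldSanitizer::encodeRecord)
                .collect(Collectors.joining("\n"));
    }
}
```

The trade-off is that the cleanup is lossy: the original punctuation inside a field cannot be recovered, which is only acceptable when the downstream analysis does not depend on it.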

I thought I could just use STRSPLIT to break up my strings into individual rows, but it turns out that it only returns a tuple. This is bad because turning a tuple into separate records for further processing is pretty involved at best. What you really need is what Pig calls a bag, an unordered set of records that can be easily turned into a proper stream of records. TOKENIZE is almost identical to STRSPLIT and returns a bag, so I thought I was in luck. Unfortunately it doesn't take a parameter allowing you to specify what characters to split the string on. Undaunted, I thought I should contribute something back to the code base and create a patch. I dusted off my Java neurons, and created the changes I needed. That let me finally parse the CSV files I was dealing with, but I was stuck as I tried to figure out how to clean up the code to offer it as a patch.
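The core of the change is small. The sketch below shows the idea in plain Java, without the Pig classes, so a `List<String>` stands in for the bag a real UDF would return. The class and method names are hypothetical, and the default separator set is an assumption about what the stock TOKENIZE splits on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical stand-in for a TOKENIZE that accepts custom separators.
final class DelimitedTokenizer {

    // Assumed to match the built-in defaults: space, double quote, comma,
    // parentheses, and star.
    static final String DEFAULT_DELIMITERS = " \",()*";

    // Split the input on any character in `delimiters` and return the pieces
    // as a list, playing the role of a Pig bag with one item per token.
    static List<String> tokenize(String input, String delimiters) {
        List<String> bag = new ArrayList<>();
        if (input == null) {
            return bag;
        }
        StringTokenizer tokens = new StringTokenizer(input, delimiters, false);
        while (tokens.hasMoreTokens()) {
            bag.add(tokens.nextToken());
        }
        return bag;
    }
}
```

Splitting a per-user string on `"\n"` would then yield one item per record, with each record still holding its comma-separated fields. Note that `StringTokenizer` silently skips empty tokens, so consecutive separators do not produce empty records.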

The problem is that my new TOKENIZE takes an optional second parameter to specify the custom characters to split on. I needed to keep the existing behavior for a single parameter call to avoid breaking old scripts. That's easy enough to handle in the execution code, I just check the number of arguments passed in, but there's also a function that exposes the signature of the function. That doesn't support variable numbers of arguments, as a known limitation. The suggested workaround is to remove the signature definition entirely, but since that's presumably there for a reason, it didn't seem a sensible approach. In the end I was stymied, but since I could move forward with a slightly custom branch of Pig, I reluctantly abandoned the patch.
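The argument-count check in the execution code can be sketched as follows. This is plain Java rather than a real UDF: a `List<?>` stands in for the input tuple Pig would pass, the names are hypothetical, and the default separator set is an assumption about the stock TOKENIZE behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical sketch of a TOKENIZE whose second argument is optional.
final class OptionalArgTokenize {

    // Assumed defaults, used when only the string argument is supplied.
    static final String DEFAULT_DELIMITERS = " \",()*";

    // args holds either (text) or (text, delimiters).
    static List<String> exec(List<?> args) {
        if (args == null || args.isEmpty() || args.size() > 2) {
            throw new IllegalArgumentException("expected 1 or 2 arguments");
        }
        List<String> bag = new ArrayList<>();
        Object text = args.get(0);
        if (text == null) {
            return bag;
        }
        // Keep the old behavior for single-argument calls.
        String delimiters = DEFAULT_DELIMITERS;
        if (args.size() == 2) {
            Object custom = args.get(1);
            if (!(custom instanceof String)) {
                throw new IllegalArgumentException("delimiters must be a string");
            }
            delimiters = (String) custom;
        }
        StringTokenizer tokens = new StringTokenizer(text.toString(), delimiters, false);
        while (tokens.hasMoreTokens()) {
            bag.add(tokens.nextToken());
        }
        return bag;
    }
}
```

The runtime side is the easy part, as the sketch suggests. The sticking point is declaring the signature, which has no equivalent in this simplified version.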

Happily ever after

I don't want to sound too negative about my experiences. Now that I've got the basics set up, I'm able to write scripts very quickly and answer all sorts of questions. It's also amazing to think about the power all this free software unleashes, and the generosity of the community who helped me out as a newbie. If you're considering a similar project though, I would either budget for more research time than you might expect, or track down a native guide, somebody who has already done something similar in a production environment.

[Update: Jon Coveney added a great comment explaining more about how to solve the TOKENIZE issue I hit, so I'm including it below, and I'll do another post when I give it a try.]

Hey Pete, thanks for writing about your experience. It's a goal of mine to find a project to use Cassandra for, and I'm sure I'll be walking through many similar problems. 

Just wanted to note something about the TOKENIZE piece. Pig can be a bit weird about variable arguments, but in this case, it shouldn't be too bad. You have two options. 

1 is to have an optional constructor which takes one parameter. You could then do 
DEFINE mytokenize TOKENIZE('*'); 

now you could just use mytokenize normally. Implementing this is as difficult as implementing the constructor. 

Another option is that you can have pig take 1 or 2 arguments. The limitation you pointed to doesn't actually apply here…that limitation is that if you have a function that takes a variable number of arguments _that also takes arguments of different types_ (this part is key) THEN you can't do it. [1] 

In your case, tokenize always takes a string, and an optional string delimiter. I personally would go the constructor route, but either is fine 🙂 I usually write UDF's with an initialize method instead of doing it all constructors anyway, so you would only have to check the number of arguments once. 

Happy hacking 

[1] To explain where this applies, think of SQL's coalesce function. This is a function that can take both functions of different types, and varying numbers of them. So you could do coalesce(1,2,3,4,5) or coalesce('hey','you','get','out') or whatever. Pig does not allow you to do this. With the getArgToFuncMapping, you can map to a function that takes a fixed number of arguments. Or you can have a varying number of arguments, but it can't be sensitive to the type of the input…while you can still then implement something like coalesce, it's going to be slow. 
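For reference, the constructor route from the comment might look roughly like the sketch below. It is plain Java with hypothetical names, not a real Pig UDF: the one-argument constructor plays the role of the string passed in `DEFINE mytokenize TOKENIZE('*');`, and the default separator set is an assumption about the stock TOKENIZE behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical sketch: separators are fixed once at construction time, so the
// per-row call always takes exactly one argument.
final class ConfigurableTokenizer {

    // Assumed defaults for the no-argument form.
    static final String DEFAULT_DELIMITERS = " \",()*";

    private final String delimiters;

    // Used when the function is called without a DEFINE.
    ConfigurableTokenizer() {
        this(DEFAULT_DELIMITERS);
    }

    // Used when a DEFINE supplies custom separators.
    ConfigurableTokenizer(String delimiters) {
        this.delimiters = delimiters;
    }

    // The per-row call: one string in, a list of tokens out.
    List<String> exec(String input) {
        List<String> bag = new ArrayList<>();
        if (input == null) {
            return bag;
        }
        StringTokenizer tokens = new StringTokenizer(input, delimiters, false);
        while (tokens.hasMoreTokens()) {
            bag.add(tokens.nextToken());
        }
        return bag;
    }
}
```

Because the per-row call keeps a single fixed argument, this approach sidesteps the variable-argument question entirely.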
