Tuesday, February 10, 2015

WSDM 2015

I recently got back from WSDM which ended up being a very interesting conference with some great talks.


I enjoyed all three keynotes. Mike Franklin's talk focused on AMPLab's research progress and the steady stream of artifacts they've assembled into the Berkeley Data Analytics Stack (BDAS -- pronounced "bad-ass" :)). In addition to Spark, the stack now includes SparkSQL, GraphX, MLLib, Spark Streaming, and Velox. All of these projects pose interesting systems questions and have made a lot of progress on the kinds of tooling that will come in handy for large scale data analysis beyond SQL queries.

Lada Adamic's talk was a fun visual tour through lots of interesting analyses of Facebook data that her team has been running. In particular, she's been using the data to understand what makes a particular piece of content rapidly gain a lot of popularity (or go "viral"). "Sharing" behavior is unique to social networks -- traditional web properties have spent lots of energy understanding what makes people click. The "share" action is a completely different beast to study. Lada reported that they have had some success predicting if a piece of content, once it reaches K users, if it will get to 2K. The prediction accuracy improves as we get to larger K. Unfortunately, she reported that the features that predict virality mostly have to do with the speed at which it is spreading, so we don't yet have a handle on how best to craft viral content

Thorsten Joachims had a technical keynote that followed the arc of his previous work on learning to rank in the context of search. He talked about careful design of interventions in interactive processes to elicit information that can be suitably leveraged by a machine learning algorithm to improve ranking. Lots of examples with evidence from experiments he ran with the arxiv search prototype that have since been reproduced by Yahoo, Baidu, etc.

Research Talks

Here's a subset of research talks that I thought were fun and interesting:

Thursday, January 22, 2015

The Tail at Scale

Another blog post recommending a paper to read ....

One of the challenges of building large systems on shared infrastructure like AWS and the Google Cloud is that you may have to think harder about dealing with variations in response time than you might if you were designing for dedicated hardware (like a traditional data warehouse).  This is obviously critical when you're building latency-critical services that power interactive experiences (websites, mobile apps, video/audio streaming services, etc.). Depending on the operating point, you're likely to see this effect even with batch processing systems like Map-Reduce and large scale machine learning systems.

A CACM article from Jeff Dean and Luiz Andre Barosso -- The Tail at Scale contains many valuable lessons on how to engineer systems to be robust to tail latencies. Some of the interesting ideas from the paper are to use:

  1. Hedged requests: The client sends a request to one replica, and after a short delay, sends a second request to another replica. As soon as one response is received, the other request is cancelled.
  2. Tied requests: The client sends a request to two replicas (also with a short delay) making sure the requests have metadata so that they know about each other. As soon as one replica starts processing a request, it sends a cancellation message to the other replica (this keeps the client out of the loop for the cancel-logic). Proper implementations of hedged requests and tied requests can significantly improve the 99th percentile latency with a negligible (2% - 3%) increase in the number of requests sent.
  3. Oversharding/Micro-Partitions: The portion of data to be served is divided into many more shards than there are machines to serve them. Each server is assigned several shards to manage. This way, load balancing can be managed in a fairly fine-grained manner by moving a small number of shards at a time from one server to another. This allows us better control in managing the variability in load across machines. Systems like Bigtable (and open-source implementations like HBase), for instance, have servers that each manage dozens of shards.
There are lots of other ideas and interesting discussions in the paper. I strongly recommend it!

Tuesday, December 30, 2014

Machine Learning in Practice

A bunch of experienced Google engineers recently published a paper at a NIPS workshop titled "Machine Learning: The High Interest Credit Card of Technical Debt". This is a great paper that distills several dozen years worth of experience in building and maintaining a large-scale, complex, industrial-strength machine-learning system. If you haven't seen this paper I highly recommend it to get a sense for the patterns and anti-patterns they've observed along with supporting anecdotes.

The perspective is very different from a typical  ML paper -- you're not going to see a new model or a clever optimization strategy. The focus is mostly on the places where you have to go beyond the generally understood good software engineering practices to make the job of operating a large-scale ML system manageable. It's a short and fun read. Even if you've run an industrial ML system for a few years, you'll likely learn a trick or two from this paper.

Thursday, July 31, 2014

Data management and Machine Learning -- Picking off Feature Engineering

Over the last few years, there have been many interesting efforts from the data management research community to support various kinds of machine learning workloads on large data sets. Projects like MLBase, MADLib, SystemML, ScalOps on Hyracks, as well as systems like Spark, Mahout, have surfaced many interesting ideas.

I think a recent line of work in the database community that focuses on the "feature engineering" phase of a machine learning project is particularly promising. Brainwash, outlined at CIDR in 2013 and Columbus, described at SIGMOD this year argue that the "feature engineering" pipeline that needs to be built to feed any particular learning algorithm tends to take up the majority effort in any machine learning project. In fact, I've heard many practitioners estimate the feature engineering and "pipeline work" to consume about 90% of the resources (time, people) in a machine learning project. This includes getting all the features together in one place, gathering statistics about the features, possibly normalizing them, and making sure any transformations applied correctly before handing it over to the learning algorithm.

Columbus  provides a library of functions in R to build up a feature selection pipeline. In return for expressing the feature selection workflow using the interface provided by this library, Columbus promises several optimizations by reasoning about 1) user-specified error tolerance, 2) sophistication of learning task, and 3) amount of reuse. In experiments, the naive execution strategy was more than an order of magnitude slower than the plan Columbus picked for the program. The optimizations exploit well-known opportunities -- view materialization from databases, sophisticated sampling of input data (constrained by target accuracy) to reduce overall work, and using QR factorization to solve repeated least-squares problems efficiently.

There are some (non-technical) flaws in the paper -- while some enterprise feature-engineering workloads may look like what they outline, things look very different in internet companies that repeatedly solve learning problems on large datasets. (Besides, R is often not the tool of choice for large learning problems, so this is a bit of a random nit.)  For ad-hoc machine learning tasks in enterprise settings, I can see this being a reasonable fit. Also with Columbus, are you still writing your feature-engineering code in R if you're using the library functions provided? You have to pass in selection predicated in strings parsed/interpreted by Columbus -- you're now limited not by what you can do in R, but what support is built into Columbus for feature engineering. Is this expressive enough to be broadly useful without having to hack up the R parser? (On a side-note, a DSL friendly language like Scala might offer some nice advantages here, but then you're no longer in an R console.)

I do think the choice of focusing on the feature-engineering workflow is brilliant, and perhaps the place where the data management community can add the most value. Broader "declarative framework for machine learning" approaches are riskier (even when they make the sensible choice of doing a domain specific language in a DSL-friendly host-language) and may take much longer to have an impact. I'm excited to see where this line of research leads.

I'd recommend reading the Columbus paper. It won SIGMOD's best paper, and is a fun read. There are several ideas here that might be broadly useful to the more ambitious systems that are trying to support full fledged machine learning workflows.

Wednesday, March 26, 2014

Recommendations: Why RMSE can be misleading

I've recently spent some time working on machine learning and recommendation engines. If you're building a recommendation engine, you're typically trying to optimize for some metric of "goodness" for the user. In a Netflix-like setting, it could be how much time does a user spend watching the content you recommended? Picking good offline metrics (without actually watching how the user is responding) can be really tricky. The RMSE (Root Mean Square Error), the staple of many research papers, can be particularly misleading in many cases.

Assume, we have 100 items that a user rates between 1 to 5 stars (much like in the Netflix problem). For simplicity, assume that the first three items have 5-star ratings, and the rest have a 1-star rating.

Product True Rating Algo. A Predictions Algo. B Predictions
P001 5 2 1
P002 5 2 1
P003 5 2 1
P004 1 2 1
... ...2 1
P100 1 2 1

Consider Algorithm A that predicts that all the ratings will be 2. The RMSE for this dataset = sqrt((97 + 27)/100) = 1.11. Now consider Algorithm B that predicts all ratings to be 1. The RMSE for this dataset is sqrt(48/100) = 0.693. Algorithm B produced a huge improvement in RMSE over algorithm A, but is it really any better at differentiating between the items that the user liked vs. ones that she didn't? If you are going to use the recommendations to solve a ranking problem, RMSE is a pretty useless measure in this context. A better metric would capture the fact that you're trying to use the recommendations to display the best items to the users, hoping that the user clicks/buys/watches/engages with/likes what you recommended. Being accurate on items way beyond the top few that the user is likely to engage with is not very useful at all.

Now on the other hand, if the "rating" we have is binary -- say someone "likes" the movie or not -- say 1 or 0. (In reality there's a third state, where someone watches a movie, and then doesn't rate it. You could can map this state to a 1 or 0 with a few application-specific assumptions). With a binary rating, the RMSE simply counts how many predictions you got right. Because what we really have here is a classification problem, and not a ranking problem, RMSE ends up being more reasonable.

There are several papers that talk about vastly superior metrics for ranking (that actually work in practice!) that I'll try and describe them in future posts.

Wednesday, March 5, 2014

Secondary Indexing in HBase

One of the problems I explored during the last few months at Almaden was secondary indexing in a scale-out NoSQL datastore. A solution I designed and implemented in collaboration with folks at IBM Watson has made it into the IBM BigInsights product and into EDBT 2014. Here's the full paper.

Secondary Indexes: Local and Global
There are two ways to think about secondary indexes in a distributed system -- "local indexes" and "global indexes". A local index simply indexes the data within each node. If you want to find all the rows which have attribute color set to 'red', each node in the system would be able to find the matching rows using a secondary index on the color column instead of doing a full scan. On the other hand, if you had a global secondary index on color, probing this would return all the keys which have color set to 'red'. This information is likely on just one node -- you don't have to query all the nodes in the system to locate the relevant keys.

Cassandra has has local secondary indexes for a while. HBase has had prototypes for both local and global indexes. See here for some discussion on possible implementations. Local indexes tend to be a little easier to design and implement since you don't have to deal with any notion of a distributed transaction. With a global index, you have to deal with the possibility that there might be a failure between updating the node with the base data and the node with the index data. Also, update latency for an indexed table gets much worse -- you need to update the base table, delete the old index entry, and update the index with the new entry. This translates to two updates and a read. For stores based on Bigtable-like tablets and a log-structured merge tree design, the read latency might even be worse than the write latency. Our paper describes a solution around both these problems for HBase : 1) guaranteeing atomicity in the face of failures, and 2) minimizing the impact to latency.

Asynchronous Indexing
The core novel idea in the first approach, which we call async-simple is to implement indexing as an asynchronous operation. Update the base table right away, and respond, but then guarantee that within a "short while", the indexes will be consistent. That is, you have read-your-writes consistency for base data, but a more relaxed form of consistency for the index data. For many applications this is a reasonable trade-off that gets you very good latency guarantees. For the applications that need better consistency guarantees, we proposed async-session, which provides session consistency for the index data. That is, you'll be able to see the effects of your own updates within a session. We show that both of these techniques can be implemented while guaranteeing atomic index updates with nearly no impact on update latency. You don't need distributed transactions to get this right. Here's how we solved this -- we added two new components to the RegionServer an Asynchronous Update Queue (AUQ) and an Asynchronous Processing Service (APS). The AUQ is an in-memory data structure that temporarily stores all base updates that require index processing. These live in the same RegionServer as the one responsible for the base row. The APS is a background service that de-queues from the AUQ, inserts new values into the index and deletes the corresponding old values if present.

Recovery is guaranteed by the fact that 1) the WAL entry for the base update has enough information to re-construct the index updates and re-populate the AUQ during recovery, 2) we use the same timestamp for the index updates as for the base data, and 3) we ensure that the AUQ can always be reconstructed using the base WAL entries without resorting to a separate log for the AUQ. Property 3) requires careful coordination between the AUQ and rolling forward the WAL to ensure that it is not garbage collected (when memtables get flushed to disk) until the APS has drained all the corresponding entries from the AUQ. Note that during recovery, we may redo some of the index updates that were done prior to failure. However, this is okay because re-applying an update with an identical timestamp is an idempotent operation. (See Pat Helland's paper here for a nice discussion on how to use idempotent messaging to avoid distributed transactions.)

Relaxed Consistency
This means there will be moments in time when the index is not consistent with the base data, and that may be visible to clients. This is unavoidable without an expensive distributed locking or a Generalized Snapshot Isolation-style solution. A small amount of client logic is usually enough to deal with this inconsistency. For applications that do not want to deal with this, we implemented session-consistency where you get to see all the updates made within your session, but this may not hold across sessions. We were able to do this with minor changes to the HBase driver -- you essentially track the necessary state in the client driver. This works fine for small sessions (few KB of in-session updates), which is likely to be the case for most applications that run on HBase style datastores.

The implementation relies mostly on using CoProcessors. In fact, async-simple required nearly no changes to the core, while async-session required changes to the client library. I think a similar implementation can be done in Cassandra as well. The experiments in the paper are pretty basic and straightforward. The most interesting result is that the latency of updates with async-simple and async-session is nearly the same as an update (Put operation) on an un-indexed table.
The main concern is that the throughput caps off lower than the un-indexed updates, but that's understandable that the indexed-update is 2 Puts + a read while the base update is simply 1 update. With some straighforward tuning of the sizes of the AUQ and memtable flush frequency, and assuming relatively cheap reads, we should be able to get indexed updates with the same latency as base updates maxing out at roughly a third the throughput of base updates.

Tuesday, June 25, 2013

Big Fast Data Blog from Wisconsin

My advisor Jignesh Patel has a new blog on data management. This is going to be a great place to get the latest on data management from Wisconsin. His first post is on BitWeaving -- a SIGMOD 2013 paper that takes the column store idea to its logical extreme. Check it out here.