There are many ways to solve this problem, but the technique that has been enjoying substantial success is matrix factorization. In particular, for large datasets, the matrix factorization problem is solved using a technique called Stochastic Gradient Descent (SGD). I've written about this before -- in particular, I've talked about some of the cool research on a distributed variant of SGD, called DSGD, that was invented at IBM Almaden by Rainer Gemulla, Peter Haas, and Yannis Sismanis.
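To make the technique concrete, here is a minimal single-machine sketch of matrix factorization via SGD (illustrative only -- this is the textbook serial algorithm, not the distributed DSGD variant discussed below). It factors a set of observed ratings R ≈ W·Hᵀ by applying a gradient step per observed (user, item, rating) triple:

```python
import numpy as np

def sgd_factorize(ratings, n_users, n_items, rank=2,
                  lr=0.02, reg=0.02, epochs=1000, seed=0):
    """Plain serial SGD matrix factorization (illustrative sketch)."""
    rng = np.random.default_rng(seed)
    W = rng.normal(scale=0.1, size=(n_users, rank))   # user factors
    H = rng.normal(scale=0.1, size=(n_items, rank))   # item factors
    for _ in range(epochs):
        for u, i, r in ratings:
            err = r - W[u] @ H[i]                     # prediction error
            wu = W[u].copy()                          # snapshot before update
            W[u] += lr * (err * H[i] - reg * wu)      # gradient step, user factor
            H[i] += lr * (err * wu - reg * H[i])      # gradient step, item factor
    return W, H

# Toy usage: three users, three items, five observed ratings.
ratings = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (2, 2, 1.0), (1, 2, 2.0)]
W, H = sgd_factorize(ratings, n_users=3, n_items=3)
```

After training, `W[u] @ H[i]` approximates the observed rating for user `u` and item `i`; the unobserved cells of `W @ H.T` are the recommendations.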

Solving the matrix factorization problem on very large matrices (with millions of users and potentially millions of items) is a hard problem that has many data management aspects to it. If the items you are trying to recommend are URLs on the web, you may be stuck with tens or hundreds of millions of items. Of course, since there is a large data management aspect to the problem, the natural question is: "Can you use Hadoop to solve this problem?" Well, implementing SGD-like algorithms on Hadoop poses a major challenge. SGD is an iterative algorithm that makes many passes over the input data, and Hadoop is well known to be inefficient for iterative workloads. In fact, many research papers have been written pointing this out, and systems like HaLoop have been proposed to address these shortcomings.

One of my favorite platforms for iterative workloads is the Spark project out of Berkeley's AMP lab. Spark is a parallel programming framework in Scala that supports efficient iterative algorithms on datasets stored in the aggregate memory of a cluster. This is a great fit for SGD-style algorithms. With help from a summer intern, we prototyped a few DSGD algorithms on Spark on a large dataset. Much to our surprise, we found that Spark didn't perform nearly as well as we expected it to. In the figure below, the line titled "Broadcast" (using Spark with broadcast variables to hold the factors) gets progressively slower as the rank of the factor matrices is increased from 25 up to 400.

Spark's programming model (mutable accumulators and broadcast variables, immutable RDDs) requires the programmer to assume that the factor matrices will fit in the memory of a single node. For large datasets, this is not always practical: as the input matrix gets larger, so does the size of the factors. For example, for 100 million customers, computing a factorization of rank 200 requires storing 200 x 100 million = 20 billion floating point numbers for the factor corresponding to customers -- at 4 bytes per single-precision float, that amounts to 80GB of data. Such a large data structure cannot be easily accommodated in the main memory of a commodity node today. This is especially true in the cloud, where it is substantially easier to get a cluster of virtual machines whose aggregate memory far exceeds 80GB than to get a small number of virtual machines, each with 80GB of memory. Even if this data structure is suitably partitioned, in DSGD the cost of moving different partitions of the factors to the appropriate nodes using Spark's standard abstractions starts to dominate the overall time taken to factorize the matrix.
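The back-of-envelope arithmetic is easy to check, assuming 4-byte single-precision floats:

```python
# Size of the customer factor matrix: rank x number of customers,
# assuming single-precision (4-byte) floats.
n_users = 100_000_000
rank = 200
bytes_per_float = 4

n_floats = n_users * rank                            # 20 billion entries
gigabytes = n_floats * bytes_per_float / 1e9
print(f"{n_floats:,} floats -> {gigabytes:.0f} GB")  # 20,000,000,000 floats -> 80 GB
```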

In a paper that was just accepted at EDBT 2013, we describe a solution to this problem. We built Sparkler, an extension to the Spark platform that makes it easier to solve large-scale recommendation problems using DSGD. The main idea is the introduction of a simple distributed memory abstraction called a *Carousel Map* (CM) that the programmer uses to hold the factor matrices during DSGD. CMs complement Spark's built-in abstractions, like broadcast variables and accumulators, which are designed for small mutable models. CMs provide a map API for handling large factors in the aggregate memory of the cluster -- with CMs, we no longer require that the factor matrices fit in the main memory of a single node. CMs are carefully designed to exploit the access patterns for the factors in a DSGD algorithm, so that most of the time the lookups and gradient updates go to local memory. When a remote data item is requested, the data is arranged so that the cells likely to be accessed in the near future are bulk-transferred to the local node. The details are in the paper, which I'll link to as soon as the camera-ready version is out. In an experimental comparison on various factorization tasks, Sparkler with CMs provided a **4x to 21x improvement in performance over plain Spark**.
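To convey the flavor of the idea, here is a toy, single-process illustration of block-granularity bulk transfer -- the class and method names are hypothetical and are not Sparkler's actual API. A miss on the locally held block fetches the entire block containing the key, so subsequent lookups to nearby keys are served from local memory:

```python
class ToyCarouselMap:
    """Toy model of a Carousel-Map-style partitioned map (hypothetical API)."""

    def __init__(self, data, block_size):
        self.block_size = block_size
        # Pretend each block lives on a different node of the cluster.
        self.remote_blocks = {}
        for k, v in data.items():
            self.remote_blocks.setdefault(k // block_size, {})[k] = v
        self.local = {}        # the block currently held in local memory
        self.transfers = 0     # count of simulated bulk transfers

    def get(self, key):
        if key not in self.local:
            # Remote access: bulk-transfer the whole block containing `key`,
            # on the bet that neighboring keys will be accessed next.
            self.local = dict(self.remote_blocks[key // self.block_size])
            self.transfers += 1
        return self.local[key]

cm = ToyCarouselMap({i: i * i for i in range(100)}, block_size=10)
vals = [cm.get(i) for i in range(20)]   # a sequential, DSGD-like access pattern
# 20 lookups, but only 2 bulk transfers (one each for blocks 0 and 1).
```

The point of the sketch is the ratio: with an access pattern that sweeps through contiguous factor rows, the number of network round-trips scales with the number of blocks touched, not the number of lookups.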

We also added a few other goodies to the Sparkler platform, such as automatically picking a good layout for the data (including stratification, a key step in parallelizing DSGD). This helps minimize data movement during factorization. We also automatically pick an appropriate number of partitions to trade off partitioning overhead against the benefits of parallelization, and we laid the groundwork for automating fault-tolerance decisions for CMs.