Wednesday, December 26, 2012

Sparkler: Large Scale Matrix Factorization Using SGD

Personalized recommendation is now a critical component for many content-based web services such as Netflix, Youtube, Pandora, and the various AppStores. The techniques developed in these contexts are now being adapted for use in less obvious content recommendation settings such as your social newsfeed (Facebook, Twitter), the updates from your professional network (LinkedIn), or even in figuring out what advertisements to show you (Google, Facebook, Yahoo, MSN).

There are many ways to solve this problem, but the technique that has been enjoying substantial success is matrix factorization. In particular, for large datasets, the matrix factorization problem is solved using a technique called Stochastic Gradient Descent (SGD). I've written about this before -- in particular, I've talked about some of the cool research on a distributed variant of SGD that was invented at IBM Almaden by Rainer Gemulla, Peter Haas, and Yannis Sismanis.

Solving the matrix factorization problem on very large matrices (with millions of users and potentially milions of items) is a hard problem that has many data management aspects to it. If the items you are trying to recommend are URLs on the web, you may be stuck with tens or hundreds of millions of items. Of course, since there is a large data management aspect to the problem, you ask "Can you use Hadoop to solve this problem?". Well, implementing SGD-like algorithms on Hadoop poses a major challenge. SGD is an iterative algorithm, and makes many passes over the input data. Hadoop is well known to be inefficient for iterative workloads. In fact, many research papers have been written pointing this out, and systems like HaLoop have been proposed to address these shortcomings.

One of my favorite platforms for iterative workloads is the Spark project out of Berkeley's AMP lab. Spark is a parallel programming framework in Scala that supports efficient iterative algorithms on datasets stored in the aggregate memory of a cluster. This is a great fit for SGD-style algorithms. With help from a summer intern, we tried prototyping a few DSGD algorithms on Spark on a large dataset. Much to our surprise, we found that Spark didn't perform nearly as well as we expected it to. In the figure below, the line titled "Broadcast" (using Spark with broadcast variables to hold the factors) performs slower and slower as the rank of the factor matrices is increased from 25 up to 400.

Spark's programming model (mutable accumulators and broadcast variables, immutable RDDs) requires the programmer to assume that the factor matrices will fit in the memory of a single node. For large datasets, this is not always practical: as the input matrix gets larger, so does the size of the factors. For example, for 100 million customers, to compute factorization of rank 200, one needs to store 200 x 100 million = 20 billion floating point numbers for the factor corresponding to customers -- that amounts to 80GB of data.  Such a large data structure cannot be easily accommodated in the main memory of a commodity node today. This is especially true in the cloud, where it is substantially easier to get a cluster of virtual machines with aggregate memory that far exceeds 80GB rather than a small number of virtual machines, each with 80GB of memory. Even if this data structure is suitably partitioned, in DSGD, the cost of moving different partitions of the factors to the appropriate nodes using Spark's standard abstractions starts to dominate the overall time taken to factorize the matrix.

In a paper that just got accepted at EDBT 2013, we describe a solution to this problem. We built Sparkler, an extension to the Spark platform to make it easier to to solve large scale recommendation problems using DSGD. The main idea is the introduction of a simple distributed memory abstraction called a Carousel Map (CM) that a programmer is expected to use to hold the factor matrices during DSGD. CMs complement Spark's built-in abstractions like broadcast variables and accumulators that are designed for small mutable models. CMs provide a map API for handling large factors in the aggregate memory of the cluster -- with CMs, we no longer require that factor matrices fit in the main memory of a single node. CMs are carefully designed to exploit the access patterns for the factors in a DSGD algorithm so that most of the the time the lookups and gradient updates are to local memory. When a remote data item is requested, the data is arranged so that the likely cells to be accessed in the near future are bulk-transferred to the local node. The details are in the paper, which I'll link to as soon as the camera ready version is out. In an experimental comparison on various factorization tasks, Sparkler with CMs provided a 4x to 21x improvement in performance over plain Spark.

We also added a few other goodies in the Sparkler platform -- automatically picking a good layout for the data (including doing stratification, a key step to parallelize DSGD). This helps minimize data movement during factorization. We also automatically pick an appropriate number of partitions to trade-off partitioning overhead with the benefits of parallelization, and even laid the groundwork for automating fault-tolerance decisions for CMs.

Friday, November 16, 2012

Cloudera's Impala

Now that the dust has settled on Cloudera's announcement of Impala, here are my notes. For some quick background, Monash had some early observations here and here. Marcel and Justin have an excellent blog post here.
  • Impala is a parallel query engine for data in HDFS or HBase (no support for inserts/updates/deletes, at least as of now)
  • Queries in Impala are in HiveSQL so you might be able to take your Hive workload and move parts of it to Impala
  • Impala does not use MapReduce
  • Impala does not have UDFs or UDAs, and doesn't support all the OLAP functions and complex nested subqueries
  • Impala is expected to be between 3x and 50x faster than Hive
  • Impala does support joins (unlike Dremel), and will soon have columnar storage through Trevni, but has a row-oriented runtime.
  • Impala has ODBC connectors to Microstrategy and Tableau
That said, here are some nice things about Impala that, IMHO, Monash missed:
  • Impala has a distinct advantage over something like Hadapt. The storage and replication is managed by HDFS instead of a proprietary subsystem that replicates the data that the Postgres nodes need to have available. One fewer thing to manage and administer.
  • Impala has a distinct advantage in terms of the cost of storage (for archival querying) over MPP vendors. HDFS is *way* cheaper than a bunch of SANs. I don't yet know how well the replication story in Aster, ParAccel, Vertica will hold up against HDFS for cost, reliability, and flexibility.
  • If you use Impala for your query processing, you can continue to use Hadoop for scale-out ELT -- on the same cluster. This might be much harder with other MPP databases: that'll probably require two separate clusters administered independently and connected with a "fat pipe" to ETL data from from the Hadoop cluster to the MPP cluster.
  • Impala costs $0 to try, you only pay for support/management tools :-)
Now what are the potential gotchas with Impala?
  • Since you have to have Impala daemons running on the cluster, where do you put them? Can you also run MapReduce on the nodes that run impalad?
  • Assuming you are running a large cluster with Impala jobs and MapReduce jobs, does the scheduler know how to assign resources across the two kinds of processes on the nodes?
  • If only certain nodes in the cluster are dedicated to be Impala nodes you may not be able to guarantee that all the HDFS data is locally available to some Impala worker. 
I'm sure all these problems can be solved (and are probably already being tackled). It will be interesting to see what sorts of workloads Impala serves well, and how well it plays with the rest of the workloads on a Hadoop cluster. I'm excited that Impala ups the game for structured data processing on Hadoop -- an area that I think still has a long way to go.

Monday, October 8, 2012

MongoDB: Why Database folks have it wrong.

It is easy for relational database folks to dismiss MongoDB as  "silly". Research veterans have poked fun at it, there is a great video on youtube ("MongoDB is Web Scale") that cracked me up with its criticism of Mongo fan-boys, a recent VLDB paper from Wisconsin compares MongoDB and Sharded SQLServer on the YCSB benchmark showing that Mongo's performance falls way behind.

I think all of this misses the point. IMHO, MongoDB is not trying to be a more scalable, higher performance database for application developers who couldn't get this from sharded MySQL or any of the other commercial offerings. To understand why Mongo is so interesting, you have to look at the entire application programming stack, and how it is different from the previous generation. The standard 3-tier architecture of database, app/web-servers, browsers is getting rebuilt using very different technologies driven by the enormous amounts of effort pouring into building applications/sites for mobile devices:

A few years ago, building a Web 2.0 application would require: 1) a team that knew Javascript/AJAX stuff really well, 2) a few guys that knew how to write server-side scripting logic in PHP, or Ruby, or Python, 3) basic MySQL skills because you could use the ORM in the app language (Rails, Django etc.) to do most of the talking to the DB. As more and more applications are getting built by highly skilled web-developers, smaller shops may find that being able to build a larger part of their application in a single language, (say Javascript!) might help them get the job done with fewer people as opposed to this traditional choice.

Given the fact that Javascript is a less than ideal language for developing complex applications, there are many pieces of technologies enabling this push. Technologies like node.js provide server-side scripting in Javascript. Given how much effort has gone into engineering V8, it is no surprise that some people such as the mobile app folks at LinkedIn are beginning to notice that node.js may actually provide faster throughput than RubyOnRails. Now, the same team of web developers building browser code can also build a big chunk of the application logic in Javascript. Technologies like backbone.js provide models with bindings to key-value stores and rich collection APIs to query/process them. This is making it easier to build complex applications in Javascript.

Now, can we provide an persistence layer here that is extremely agile, flexible, and one that can be accessed from code in a Javscript VM? I think MongoDB is trying to be that persistence layer. Relational databases still offer too much friction for flex-schema applications written in Javascript. A database that does a really good job of storing JSON is likely to be more successful. There are certainly many ways to talk to a relational database from a Javascript VM: node.js has ORMs that work with several databases as described here. For high-performance web-sites built using Javascript for both client-side and server-side scripting, talking to relational DBs through ORMs in JS is probably the stable answer for now. But as MongoDB or a similar JSON-store matures, the friction that ORM+RDBM involves may start to outweigh the performance benefits of RDBMSes.

If all three tiers of the stack can now be programmed in Javascript, this opens up very interesting new questions: with the right compiler technologies, we can fluidly determine the boundary between client-side logic and server-side logic as the application and the client hardware (phones, tablets) evolve! Deploying the app on a CPU-poor device? Push more of the processing logic to the server side. Is the app being used on a desktop? Then push more of the processing to the client side. Having everything in Javascript is going to make it much easier to build applications like this!

Tuesday, September 25, 2012

Google Spanner

Google has a paper describing Spanner, the successor to BigTable, that will be presented at OSDI in October this year. Spanner is a globally distributed database and the paper is full of very interesting insights into the problems some of Google's best systems engineers have been tackling over the last five years. Here are my takeaways so far:

1. Spanner was designed to tackle some of the shortcomings of BigTable: a) cross-datacenter replication with the right consistency options, and b) support for application dealing with complex evolving schemas and full-transactions.

2. A critical design difference from BigTable is that Spanner does not use GFS, the distributed filesystem, for replication. Instead, replication is handled by sharding the key space over Paxos cohorts. This is very similar to the replication design used in Spinnaker (I presented this at VLDB 2011). I have described the advantages of this approach over using a distributed filesystem in a previous post. This clearly allows the engineers of Spanner much more control over performance, consistency, and availability trade-offs. Spanner goes well beyond changing just the replication architecture ...

3. Transactions: Spanner supports ACID transactions. Every replica that is a leader in the Paxos cohort also keeps a lock table to implement concurrency control. There's a transaction manager at every spanserver that can use this lock table. Details of the concurrency control algorithms are in Section 4 in the paper. Since each transaction manager's state is made highly available with Paxos, distributed transactions can use 2-Phase commit without having to worry about the lost leader problem. Gray and Lamport described some ideas on combining Paxos with 2-Phase commit for highly available distributed transactions in 2004.

4. TrueTime API: The paper talks about a novel approach for coordinating clocks across the planet to enable features like: a) externally consistent transactions, b) lock-free read-only transactions, and c) non-blocking reads of the past. The fascinating part here is that to get good bounds on clock uncertainty and guarantee high availability, they engineered the TrueTime algorithms around GPS clocks and atomic clocks hooked up to time-servers in each datacenter!

While Spanner does a lot more than the BigTable/GFS combination, the best thing about those systems was the simplicity -- Google engineers cleverly choose  not to build certain features so they could scale and at the same time support a large class of applications. With Spanner, they can probably support most database applications, but the complexity of the system is substantially greater. One would have to understand Chubby, GFS, BigTable, *and* database recovery and concurrency control algorithms to appreciate all the complexity that goes into building a system like Spanner.

Wednesday, August 22, 2012

Amazon Glacier

Amazon recently introduced Glacier, a cloud data archival service. Based on James Hamilton's post, is presumably built on a large, shared, tape infrastructure. With the addition of this service, Amazon now provides a very compelling set of storage services to suit a whole variety of application needs.

Service Purpose Pricing (As of August 2012)
ElastiCache Memcached service 9 cents/hour for 1.7GB node ( = 3800 cents/GB-month, approx.)
DynamoDB Low latency, guaranteed throughput, indexed, SSD-based NoSQL store 100 cents/GB-month
SimpleDBOlder, more complex NoSQL store25 cents/GB-month
EC2 Local Storage Instance-local storage Approx 36 cents/GB-month
EBS Reliable block store 10 cents/GB-month
S3 Cross-datacenter durable storage 12.5 cents/GB-month or, for reduced redundancy, 9.3 cents/GB-month
Glacier Archival storage 1 cent/GB-month

Each service has a different pricing structure based on the I/O requests you subject it to, and different guarantees about how long the requests might take. But if you look at it purely from a storage cost standpoint, you have a clear idea of how much it costs a developer to keep a piece of data in memory, on SSDs, on local on spinning disks, in-datacenter-redundant disks, cross-datacenter-redundant disks, or on tape!

Wednesday, August 8, 2012

Scala, DSLs, and Big Data

I've been playing with Scala for a bunch of different projects the last several months. One of the most interesting things Scala offers for large-scale data management is the possibility of easy-to-build data processing DSLs (Domain Specific Languages).

Why do we suddenly need DSLs? SQL has served us well for the last several decades, and perhaps will continue to do so for most conventional business query processing needs. However, several new classes of data processing problems seem to require programmatic data access, not just query processing. SQL is a great language when your data processing application looks like it needs to perform joins, selections, projections, and aggregation. A vast majority of data processing falls under this category. When the task at hand is substantially different, such as assembling and analyzing DNA sequences, understanding sentiment about various topics from social media feeds, recommending content based on click streams and ratings, or building up ETL workflows (a task that SQL systems have traditionally been bad at), or statistical/predictive analysis - SQL is neither convenient nor a particularly effective interface for expressing the logic involved. Sure, you can solve the problem using SQL + UDFs  + scripting. But there are more convenient ways to do it. Besides, SQL optimizers work hard to figure out a good join order and join plan, de-correlate nested subqueries when required, push down predicates. If you application doesn't really need these sorts of optimization, the inconvenience of expressing your logic in SQL doesn't buy you anything!

The promise of embedded DSLs in Scala is that you can devise a sub-language that is convenient for solving problems in the domain of interest. At the same time, you might also be able to specify certain domain specific optimizations that might help execute the DSL program more efficiently. However, being an embedded DSL means that snippets of code in this DSL are also valid Scala code. So you still get the benefits of the Scala compiler and the optimizations that the JIT compiler can provide (Scala runs on the JVM).  On the easy-to-build front, this also means you don't have to build separate tooling (editors, compilers, debuggers) for the DSL -- you can simply continue to use the tooling around Scala. DSLs provide an interesting new tradeoff between building a library in a host language vs. building an entirely new language.

People have done lots of interesting things in this space:
  • Scalding (A Scala DSL built on Cascading, popular at Twitter)
  • ScalOps (From Tyson Condie and others, formerly at Yahoo Research)
  • Spark (from the AMP Lab at Berkeley)
  • Delite and OptiML (from Stanford)

The SIGMOD demo I did this year for Clydesdale used a simple SQL-like DSL to express the star schema benchmark. I did this mostly to convince myself (and others) that it was simpler/easier to use this DSL mechanism than build a SQL parser and compiler from scratch. It certainly was. Also it let us use java libraries in our data processing without the inconveniences that UDFs (user defined functions) cause in many databases.

Another project where I found this very useful was as a DSL for analyzing DNA sequences and looking for statistically significant SNPs in large data sets. I haven't yet gotten around to writing that work up in detail, but it was a fun project that showed how a DSL that compiles down to the Spark runtime can handily beat an approach like combining Hadoop and R (such as the one in RHIPE or Revolution R's R/Hadoop) and having the developer write code that spans the two systems in awkward ways.

I expect we'll see more interesting DSLs that compile down to runtimes on Hadoop, MPI, Spark, Pregel/Giraffe and other frameworks in the next few years.

Thursday, June 28, 2012

BigInsights at Vestas

Here's a video describing how IBM BigInsights (the product that is the home for CIF) helps Vestas in analyzing weather data to understand wind energy. A great high-level description of the project is hereCIF, in some small way, is helping the world move towards renewable energy. Pretty neat!

Wednesday, June 27, 2012

CIF: New Features in our Hadoop Column Store

I've been spending some time adding several features to the CIF column store to support requests that BigInsights customers have been making (more on this soon). Here are some of the more interesting ones:

Incremental Load Utilities
One of the first requirements was to be able to support efficient incremental loading of data. If you recall how CIF works, we have a directory per partition, and a file per column under each directory. With incremental load -- there are two options: 1) create a new sub-directory in each partition and add the column files in there or 2) append to the existing files in the partition. Append support in newer versions of HDFS makes option 2) possible. While option 1) is easy, it does lead to the problem of creating many small files over several batches of loading -- this can lead to many performance problems down the road. Option 2) does not create new files, but has its own problems: if a load  utility, running as a MapReduce job fails, this may leave the column store in an invalid state with partially appended data in several files in different partitions. Solving this ended up being a surprisingly fun problem -- I'll describe this in detail in another post.

This is a relatively simple feature as far as the storage layer is concerned. Hive also supports this to a certain extent. The idea is to be able to partition the data on some key -- such as time, geography, etc -- and store the records corresponding to each partition separately. The partitions don't necessarily have to be semantic, you could partition by the hash of some key column if all you cared about was load balancing. Semantic partitioning allows us to eliminate some partitions from being scanned based on query predicates. The Jaql-CIF integration layer takes predicates from Jaql queries and exploits those to eliminate partitions from the column store that are guaranteed not to contain any relevant data for the query. The same thing can be done if you're using CIF with Hive or Pig or Cascading or any other layer that produces MapReduce jobs.
Of course, maintaining the partitions over a long time frame as new data gets loaded can get tricky -- so we needed to build utilities to split and merge partitions.

Concurrent Readers and Writers
The mechanisms that we introduced to make incremental load safe included a simple, lightweight form of versioning. As a result, we were able to support concurrent readers and writers (load jobs) on the same dataset with a basic form of isolation. Definitely useful in a busy multi-user setting.

Fine-grained Splits
The initial version of the column store only allowed for an entire partition to be turned into a split. We added an additional index structure that tracks the offests in each column file for the start of every k-th row in the partition. Managing this adds a little bit of overhead, but allows us to produce more splits (to utilize more map slots) if necessary. This turns out to be extremely useful for tasks that are CPU intensive. If it so happens that the data in the partition is sorted by some key, then this index structure can be used as a true coarse-grained index on this key and we can use this to push down split elimination to a lower level than just partition-sized splits. 

Wednesday, June 20, 2012

Academic Research and Real-World Impact

Joey Hellerstein (one of my favorite researchers in CS) responds to Matt Welsh's criticism of the academic research model.

Tuesday, June 5, 2012

Clydesdale Performance Graph

Clydesdale: SIGMOD Demo

An interactive chart comparing Clydesdale and Hive on the Star-Schema Benchmark that I built for a SIGMOD demo. Scale Factor = 1000, Cluster = 9 worker nodes + 1 master, 8 local disks per node

Tuesday, May 29, 2012

Networking Keynote

Here's Hamilton's summary of Urs Holzle's keynote at the 2012 Open Networking Summit that  seems to have a lot in common with Amin Vahdat's keynote at SIGMOD 2012. The message again seems to be that OpenFlow and SDN are ready for real-world use and already in use as Google. Central traffic engineering has many advantages. Fabric-centric protocols and fabric-centric network management will is critical to managing modern networking infrastructure.

Friday, May 25, 2012


SIGMOD 2012 just wrapped up, and it was an excellent year! Since SIGMOD accepted fewer than 50 papers this year, the program was much easier to navigate, and I could make it to most of the talks that I thought were interesting. Here are some highlights and (mostly systems-focused) papers that I think are promising and should be fun reads:

Research Papers
  • "Calvin: Fast Distributed Transactions for Partitioned Database Systems" from Yale: Calvin builds on previous work from Yale on exploiting determinism to get high performance in scale-out transaction processing. Interesting and controversial ideas.
  • "bLSM: A General Purpose Log Structured Merge Tree" from Yahoo: A careful implementation of LSM trees that do a much better job of managing memory and avoiding spikey latencies for reads and writes by carefully scheduling merges.
  •  "Towards a Unified Architecture for in-RDBMS Analytics", from Wisconsin: Brings together a bunch of techniques from convex programming, incremental gradient descent, machine learning, and database systems to support a large class of machine learning algorithms efficiently in the RDBMS. Great combination of a bunch of well-known powerful ideas.
Industrial Systems:
  • "F1-The Fault-Tolerant Distributed RDBMS Supporting Google's Ad Business", from Google: Describes the transactional layer built on top of Spanner to support ACID transactions on a globally scalable datastore.
  • "TAO: How Facebook Serves the Social Graph", from Facebook: Large scale data store from Facebook to serve all those petabytes of content.

Pat Hanrahan talked about the importance of serving the "Data Enthusiast" -- which I interpreted as making database technology much easier to consume, as well as adding new technologies that enabled non-programmers and non-DBAs to access data. He focused on Tableau-style visual exploration and querying.

Amin Vahdat's keynote on networking was one of the most interesting keynotes I've been to in a while. He talked about the current revolution in networking with Software Defined Networking (SDN) rapidly becoming a reality at Google. He also described (perhaps for the first time in public) some of the details of the networking infrastructure in Google both inside the datacenter and across datacenters. His argument was the networking needs data management in the SDN era to do well, and large-scale data management needs "scale-out" networking to really grow to datacenter scales. He pitched it as a perfect opportunity for collaboration across the data management and networking communities!

Monday, May 21, 2012

ZLIB Compression Problem

I recently ran into a weird bug with the FIXED compression strategy in the Zlib library that ships with Hadoop 20.2. Here's a program that simply reads a file, compresses it, and then decompresses it back.

public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("Usage: CompressionVerifier <filename>");
        byte[] onebyte = new byte[1];
        String filename = args[0]; 
        FileInputStream fis = new FileInputStream(new File(filename));
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        while ( != -1)
        byte[] data = baos.toByteArray();
        System.out.println("Compressing file: " + filename);
        //Now compress data.
        JobConf conf = new JobConf();
        DefaultCodec cc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
        Compressor zcom;
        zcom = new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION, 
                ZlibCompressor.CompressionStrategy.FIXED, // Causes error
                //ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY, //Works fine
                ZlibCompressor.CompressionHeader.DEFAULT_HEADER, 64 * 1024);
        CompressionOutputStream uncompressedByteStream = cc.createOutputStream(baos,zcom);
        byte[] compressedData = baos.toByteArray();
        System.out.println("Finished compressing");
        DefaultCodec c2 = ReflectionUtils.newInstance(DefaultCodec.class, conf);
        CompressionInputStream inpStr = c2.createInputStream(new ByteArrayInputStream(compressedData));
        System.out.println("Starting decompression");
        while (inpStr.available() > 0)
        System.out.println("Verified File!");

On most inputs, it works fine, and the execution is uneventful. On certain inputs, you get an IOException for an "invalid distance code" --

Exception in thread "main" invalid distance code
at Method)

Has anyone else run into this? Using CompressionStrategy.DEFAULT_STRATEGY fixes the problem, so i assume it is specific to the Z_FIXED strategy. If you know the zlib codebase, and care to help verify/fix the problem, let me know, and I can send you the input file that caused the problem.

Friday, April 20, 2012

Chain Replication and Consensus

A few weeks ago, I had a chance to chat with one of the grad students who worked on the FAWN project at CMU. FAWN is a pretty cool systems project and their original paper is on my NoSQL reading list. FAWN uses chain-replication to provide high-availability. I wanted to understand the gap between chain-replication and consensus-based approach like in Spinnaker. The group also has a project called Ouroboros that extends chain-replication with non-blocking node addition and removal protocols, but the properties I discuss here apply to basic chain replication as well as Ouroboros.
Now, as described in the paper, chain replication needs f + 1 nodes to tolerate f failures. Paxos needs 2f + 1 nodes to tolerate f failures. This is a nice advantage for chain replication, but what is it really giving up? Turns out that the answer is that this technique leaves it vulnerable to certain failure sequences.

If we have f + 1 failures at once -- such as the whole cluster losing power because of a data center outage -- we might end up in an inconsistent state with the FAWN strategy.  Here's an example: Once power is restored, say one of the nodes in a group of three, say A comes up. The other two (B and C) have trouble, and have not yet come up. The node A is designated the head of the chain (by an external master), and starts accepting updates. This is okay since chain replication in FAWN allows you to accept writes with a single node (this is not described in the paper, but in a personal communication). Now A fails. One of the other nodes (say B) now comes up, and gets designated the master B. Recall that B only had a transient failure, and has most of the state except the recent writes that were accepted by A. However, if B serves reads, it will serve stale data and the client may notice that the system has lost some of its updates. If B now accepts updates, things could get very difficult to resolve once A comes back up.

This is rather obvious once you think about it:  you can operate in the "read quorum + write quorum < replica group size" region if you are willing to assume that certain failure sequences will not happen. When such a failure sequence happens, you lose consistency! If we're willing to risk the fact that we will never have a cluster-wide outage, FAWN-style chain replication can provide great performance and non-blocking reconfiguration for up to f failures in a replica group with f+1 members. Apart from being more available for reads and writes in the presence of failures, chain replication is a easier to implement than a a consensus protocol. Are there other relaxations from consensus that would be useful if we knew that certain kinds of failure sequences would be exceedingly rare? What if we could assume that two nodes in a replica group would not fail within k minutes of each other?

On a related note, there's a PODC paper from MSR in 2009 that talks about how you can view chain replication as a special case of Vertical Paxos. But in this paper, the write quorum here has to be the entire replica group -- that would mean the system would block for writes if any node failed, but could serve reads as long as at least one node was up. With this more conservative protocol, of course, chain replication would tolerate the same failure sequences that regular Paxos would.

Monday, March 12, 2012

Clydesdale Updates

The Hadoop Summit is coming up in June in San Jose. Every year, this event gets bigger and more interesting.
I've submitted an abstract to talk about Clydesdale and structured data processing on Hadoop in general. You can vote for it here:

In other news, our demonstration proposal for Clydesdale got accepted at SIGMOD. If you're going to be at the conference in Arizona in May this year, drop by for a chat and for the latest updates on what we have cooking in this space.

Tuesday, March 6, 2012

Re-Sequencing in Bioinformatics

Genome sequencing is still one of the major problems in bio-informatics where one has to deal with large amounts of data. As advances in technology make it progressively cheaper to sequence an entire genome, scientists are coming up with many interesting questions they can tackle with sequencing.

I had assumed that the computational aspects of sequencing (or more specifically, re-sequencing) was a mostly solved problem that didn't really need much attention from computer scientists anymore. (I haven't spent much time or energy in this space since my grad school days.) Turns out there have been some very interesting developments in this space in the last three years.

Here's the basic problem definition: you get a tissue sample from someone and stick it in a machine. The machine spits out many "reads" that are short strings over the alphabet {A,C,G,T} -- each of these reads is the sequence for some portion of the DNA in the sample. These are noisy measurements, so there may be occasional errors. Over a few hours, the machine spits out many millions of reads. Now, the computational task is to build up the best guess for the whole sequence from which these reads arose given a reference sequence that is "pretty close".

The general idea that many algorithms follow is to align these reads to the reference sequence while tolerating a few mismatches. Then, for each position in the reference sequence, looking up the consensus of all the aligned reads to figure out the base (A, C, G, T) at that position. There are many variants to this approach, some use indexes on the reference sequence, some use indexes on the reads, some use heuristic based approximate alignments, some use a relatively expensive Smith-Waterman style alignments, sometimes there are multiple candidate reference sequences, and the reads themselves could come with some additional constraints (paired-end reads).

Here's a list of recent papers/projects dealing with the basic version of this problem:

SNAP is the newest one out of Berkeley, MSR, and UCSF and promises to be more than 10x faster than the competition. Apart from the ones listed above, there are many proprietary algorithms that come with the sequencing machine you buy. And of course, there's a host of older algorithms that the papers above built on.

There are some interesting data management aspects to this problem beyond just sequencing: different people care to different degrees about the actual alignment method used and how much data is retained. Often, the interesting analyses come after the alignment and SNP-identification, may only have to go back as far as the aligned sequence.  Some scientists may want to go back all the way to the reads once they have completed some downstream analysis and want to verify the significance of their findings. If you're a big hospital, and are sequencing thousands of patients -- managing the reads, the aligned sequences, and the results of downstream experiments could easily be a multi-petabyte problem.

Thursday, March 1, 2012

Research Jobs at Almaden

Our group at IBM Almaden is looking to hire short term post-docs (for 2 year stints) as well as permanent research staff. If you're a graduating PhD student who has worked on some aspect of large scale data management research, and are excited about the sorts of problems that I describe on this blog, send me an email!

Thursday, February 16, 2012

A NoSQL Reading List

I've been putting together a list of papers for interns, post-docs, and other new folks at Almaden who are getting interested in the NoSQL/NewSQL space. Here's what I have so far. Any suggestions/additions to the list would be welcome -- leave them around as a comment.

Basic System Design
BigTable,  Dynamo, and PNUTS are the three big systems running real applications and should be required reading. FAWN is an excellent read. The HStore paper talks about traditional OLTP, but is definitely closely related. Hyder is also in the traditional OLTP space and is probably less relevant to the NoSQL space than HStore, but this is a very cool system and explores a major departure from traditional OLTP system design. The RamCloud effort from Stanford has made some interesting choices that allow very good availability numbers. Finally, I'll include a shameless plug for my own paper, Spinnaker, which I think makes some interesting points about this space.

Consistency, Indexing, Transactions

Other Industrial/Open-Source Systems and Articles
  • Curt Monash's valiant struggle to distinguish between traditional OLTP and "NoSQL" apps: HVSP 
  • Industrial MySQL Scaleout approaches: Clustrix, Schooner , dbShards
  • Oracle NoSQL Whitepaper 
  • HBase, Cassandra, MongoDB -- codebases/architecture

Tuesday, February 14, 2012

DynamoDB = Cassandra-as-a-Service ?

Amazon recently introduced a new database service: DynamoDB to add to its existing set of database services: SimpleDB and RDS (the hosted MySQL service). DynamoDB seems to be a higher-scale, lower function version of SimpleDB: it does away with the limitation that data be partitioned into 10GB sized SimpleDB domains. It also provides much more predictable performance and you can provision for your expected read and write rates. In exchange, you give up the automatic indexing ability that SimpleDB had. The querying ability goes away too -- you fall back to get/put requests addressed by primary keys. The conditional put/delete calls are still there as a form of simple concurrency control.

DynamoDB comes pretty close to being a pay-as-you-go version of Cassandra. Not surprising, since at least one of the original developers of Cassandra (Avinash Lakshman) actually worked on the Dynamo project at Amazon before joining Facebook.

The major Cassandra advantage seems to be secondary indexes. I'm not sure how important local secondary indexes are for short-request apps, but it could be useful to do predicate scans when you hook up MapReduce to the data in Cassandra. The major feature that DynamoDB has that Cassandra is missing is conditional put/delete calls. This is a rather tricky feature -- Jonathan Ellis (Datastax), Jun Rao, and I tried to add this in the early days of Cassandra ( and quickly realized it was a good bit harder to provide clean semantics for this call on an eventually consistent datastore.

Jonathan Ellis at DataStax posted a feature-comparison of Cassandra and DynamoDB at The short summary is: if you are running things on the cloud, then DynamoDB is indeed a compelling choice.  If you are a very large operation, and want a multi-datacenter deployment, expect to need more functionality than DynamoDB (snapshots, integrated caching), and have an ops team that can manage it, then Cassandra is probably a better bet.

Wednesday, February 8, 2012

Clydesdale Overview

Here's a quick preview of the architecture of Clydesdale and details on its performance. I'll link to the camera-ready version of the EDBT paper soon. The big-picture idea in Clydesdale is the same as Hive: a SQL-like interface for processing structured data on Hadoop. The target space where this is likely to be most useful is large investigative marts. (See my older post and this post from Monash for backgroud.)

The clients submit a SQL query to Clydesdale, the compiler turns it into one or more MapReduce jobs. (There are ways to access Clydesdale using more programmatic APIs, but that's for another post :) )The fact table is on HDFS, stored using CIF. A master copy of the dimension tables is available in HDFS. Dimension tables are also cached on the local storage of each node. The picture below shows the general architecture -- Clydesdale layers neatly on top of unmodified Hadoop.
Clydesdale is focused on processing queries over star schema data sets. It uses several techniques like columnar storage, block iteration, multi-core aware operators, and a multi-way map-side join plan. This combination of techniques adds up to a massive advantage over Hive, especially for the star schema benchmark. The figure below shows the execution time for each of the queries in the benchmark using 1) Clydesdale, 2) the repartition join strategy in Hive, and 3) the map-join strategy in Hive. Depending on the query, Clydesdale is 5x to 83x faster than the best Hive plan, averaging about 38x across the benchmark. These are numbers on a 9-node cluster running the star-schema benchmark at scale factor 1000. The general trend holds for larger clusters and larger datasets. 
To be fair, Hive is general purpose and not really focused on doing well on star-schema workloads. The comparison is not intended as a criticism of Hive, but as evidence for how much performance potential is available on the Hadoop platform for structured data processing! While Hadoop was originally designed for large parallel programming tasks that were more compute-intensive (eg. analysing web pages and building search indexes), the platform is not so bad when it comes to structured data processing. The ideas in Clydesdale can be used to tremendously improve the performance of Hive and similar systems.

What does this mean for the larger Hadoop ecosystem? This is very good news for BI (Business Intelligence) vendors on Hadoop like Pentaho, Datameer, Platfora, and others. If you rely on SQL-like processing to deliver business insights, and the data lies in Hadoop, we can now do a whole lot better than today's Hive performance!

Tuesday, January 10, 2012

Clydesdale: SQL on Hadoop

Clydesdale is an experimental system we built at Almaden for processing structured data on Hadoop. It is obvious that structured data processing is becoming an increasingly important workload on Hadoop clusters. While Hive is great at letting people use SQL to process large amounts of structured data on Hadoop, it is surprisingly inefficient. Researchers have recently argued that there is more than an order of magnitude inefficiency for relational-like workloads on Hive (see this paper). Clydesdale is a new way of tackling this problem. In a paper that was recently accepted to EDBT 2012, we showed that Clydesdale is 38x faster than Hive on the popular Star-Schema Benchmark.

Clydesdale is aimed at workloads where the data fits a star schema. A substantial fraction of structured data repositories follow either a star schema or a snowflake schema. Clydesdale is specialized to perform well for these, but it can work on any arbitrary schema. We draw on many well known techniques from the parallel DBMS world like column oriented storage, tailored join-plans, and multi-core execution strategies to achieve dramatic performance improvements over existing approaches. The best part is that Clydesdale works on a stock Hadoop installation -- no modifications are needed to the underlying implementation! Using the star schema benchmark, we show that Clydesdale is 5x to 89x faster for different queries, averaging a 38x advantage. I'll post a link to the paper with all the details as soon as I have the camera-ready version finalized for EDBT. 

Here's a high-level overview of the architecture and the query processing strategy: The fact table is stored on HDFS using CIF, a columnar storage approach described in our earlier VLDB paper. (I've also previously discussed CIF here and here.) A master copy of the dimension tables is also stored in HDFS, additionally, they are also replicated to the local storage on each node in the cluster. SQL queries submitted to Clydesdale turn into MapReduce jobs. Join processing is done in the map phase: mappers apply predicates to the dimension tables and build hash tables in the setup phase, then, they join the rows of the fact table by probing the hash tables and emitting those that qualify the join. The reduce phase is responsible for grouping and aggregation. Clydesdale draws on several strategies for performance improvement: careful use of multi-core parallelism, employing a tailored star-join plan instead of joining tables two-by-two, columnar storage and -- to a limited extent -- columnar execution. None of these techniques is singularly responsible for the overall improvement -- each of them contribute to the 38x overall speedup we measured over Hive.

Clydesdale's central contribution is a demonstration that a simple architecture that leverages existing techniques from parallel DBMSs can provide substantial benefits for structured data processing on MapReduce. The experimental results demonstrate that for certain structured data processing workloads, it is not necessary to resort to more radical approaches like discarding HDFS and embedding a relational database (such as in HadoopDB); or requiring unusual storage organization and join processing techniques (such as in Llama). Clydesdale layers on an unmodified Hadoop installation and provides major performance improvements for structured data processing.