Thursday, June 28, 2012

BigInsights at Vestas

Here's a video describing how IBM BigInsights (the product that is the home for CIF) helps Vestas analyze weather data to understand wind energy. A great high-level description of the project is here. CIF, in some small way, is helping the world move towards renewable energy. Pretty neat!

Wednesday, June 27, 2012

CIF: New Features in our Hadoop Column Store

I've been spending some time adding several features to the CIF column store to support requests that BigInsights customers have been making (more on this soon). Here are some of the more interesting ones:

Incremental Load Utilities
One of the first requirements was to support efficient incremental loading of data. If you recall how CIF works, we have a directory per partition, and a file per column under each directory. With incremental load, there are two options: 1) create a new sub-directory in each partition and add the column files in there, or 2) append to the existing files in the partition. Append support in newer versions of HDFS makes option 2) possible. While option 1) is easy, it creates many small files over several batches of loading, which can lead to many performance problems down the road. Option 2) does not create new files, but has its own problems: if a load utility running as a MapReduce job fails, it may leave the column store in an invalid state, with partially appended data in several files in different partitions. Solving this ended up being a surprisingly fun problem -- I'll describe this in detail in another post.


Partitioning
This is a relatively simple feature as far as the storage layer is concerned. Hive also supports this to a certain extent. The idea is to be able to partition the data on some key -- such as time, geography, etc. -- and store the records corresponding to each partition separately. The partitions don't necessarily have to be semantic; you could partition by the hash of some key column if all you cared about was load balancing. Semantic partitioning allows us to eliminate some partitions from being scanned based on query predicates. The Jaql-CIF integration layer takes predicates from Jaql queries and exploits them to eliminate partitions from the column store that are guaranteed not to contain any relevant data for the query. The same thing can be done if you're using CIF with Hive or Pig or Cascading or any other layer that produces MapReduce jobs.
Of course, maintaining the partitions over a long time frame as new data gets loaded can get tricky -- so we needed to build utilities to split and merge partitions.
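
To make the partition elimination idea concrete, here is a minimal sketch. It assumes each partition records the inclusive range of its partitioning key and that the query predicate has already been reduced to a key range. The class and method names (PartitionPruner, Partition, prune) and the date encoding are hypothetical; this is not the Jaql-CIF integration code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of partition elimination; not the CIF or Jaql API. */
class PartitionPruner {

    /** A partition directory plus the inclusive key range of the records stored in it. */
    public static final class Partition {
        public final String directory;
        public final long minKey;
        public final long maxKey;

        public Partition(String directory, long minKey, long maxKey) {
            this.directory = directory;
            this.minKey = minKey;
            this.maxKey = maxKey;
        }
    }

    /**
     * Returns the directories of partitions whose key range overlaps the
     * query range [lo, hi]. Every other partition is guaranteed not to
     * contain matching records, so it never needs to be scanned.
     */
    public static List<String> prune(List<Partition> partitions, long lo, long hi) {
        List<String> toScan = new ArrayList<String>();
        for (Partition p : partitions) {
            if (p.maxKey >= lo && p.minKey <= hi) {
                toScan.add(p.directory);
            }
        }
        return toScan;
    }

    public static void main(String[] args) {
        // Assumption: dates are encoded as yyyymmdd numbers, one partition per month.
        List<Partition> partitions = Arrays.asList(
                new Partition("sales/2012-04", 20120401L, 20120430L),
                new Partition("sales/2012-05", 20120501L, 20120531L),
                new Partition("sales/2012-06", 20120601L, 20120630L));
        // Predicate: date BETWEEN 2012-05-15 AND 2012-06-10
        System.out.println(prune(partitions, 20120515L, 20120610L));
        // prints [sales/2012-05, sales/2012-06]
    }
}
```

Hash partitioning would not benefit from a range check like this, which is why only semantic partitions help with elimination.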


Concurrent Readers and Writers
The mechanisms that we introduced to make incremental load safe included a simple, lightweight form of versioning. As a result, we were able to support concurrent readers and writers (load jobs) on the same dataset with a basic form of isolation. Definitely useful in a busy multi-user setting.
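
One way such lightweight versioning could work is sketched below. To be clear, this is my own assumption for illustration and not necessarily how CIF does it: each version is an immutable snapshot of the committed length of every column file, readers never read past the lengths in the snapshot they started with, and a load job publishes a new snapshot only after all of its appends have succeeded. All names (VersionedColumnStore, Snapshot, beginRead, commit) are hypothetical, and the sketch keeps the snapshot in memory rather than in HDFS.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of length-based versioning; an assumption, not CIF's actual mechanism. */
class VersionedColumnStore {

    /** Immutable snapshot: a version number plus the committed byte length of each column file. */
    public static final class Snapshot {
        public final long version;
        public final Map<String, Long> committedLengths;

        Snapshot(long version, Map<String, Long> lengths) {
            this.version = version;
            this.committedLengths =
                    Collections.unmodifiableMap(new HashMap<String, Long>(lengths));
        }
    }

    private final AtomicReference<Snapshot> current = new AtomicReference<Snapshot>(
            new Snapshot(0L, new HashMap<String, Long>()));

    /** Readers pin a snapshot and never read past its committed lengths. */
    public Snapshot beginRead() {
        return current.get();
    }

    /**
     * Called by a load job only after all of its appends have succeeded.
     * newLengths maps each appended column file to its new total length.
     * Returns false if another load committed on top of base first.
     */
    public boolean commit(Snapshot base, Map<String, Long> newLengths) {
        Map<String, Long> merged = new HashMap<String, Long>(base.committedLengths);
        merged.putAll(newLengths);
        return current.compareAndSet(base, new Snapshot(base.version + 1, merged));
    }
}
```

In this scheme a failed load job simply never calls commit, so whatever bytes it appended stay beyond the committed lengths and are invisible to readers. A reader that called beginRead before a commit keeps seeing its old snapshot, which gives the basic isolation described above.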

Fine-grained Splits
The initial version of the column store only allowed for an entire partition to be turned into a split. We added an additional index structure that tracks the offsets in each column file for the start of every k-th row in the partition. Managing this adds a little bit of overhead, but allows us to produce more splits (to utilize more map slots) if necessary. This turns out to be extremely useful for tasks that are CPU intensive. If it so happens that the data in the partition is sorted by some key, then this index structure can be used as a true coarse-grained index on this key, and we can use it to push split elimination down to a lower level than just partition-sized splits.
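
Here is a rough sketch of what such an index might look like for a single column file. It assumes the index is built from the encoded length of each row, and that splits are cut on index-entry boundaries. The names (SparseRowIndex, Split, splits) are hypothetical and the real CIF on-disk format is not shown here; a full implementation would keep one such offset array per column file in the partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a sparse row-offset index for one column file; not the CIF format. */
class SparseRowIndex {
    private final int k;          // granularity: one index entry every k rows
    private final long[] offsets; // offsets[i] = byte offset where row i*k starts
    private final long rowCount;
    private final long fileLength;

    /** A split: rows [startRow, endRow) stored in bytes [startOffset, endOffset). */
    public static final class Split {
        public final long startRow, endRow, startOffset, endOffset;

        Split(long startRow, long endRow, long startOffset, long endOffset) {
            this.startRow = startRow;
            this.endRow = endRow;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    /** Builds the index from the encoded length of each row; k must be positive. */
    public SparseRowIndex(int k, int[] rowLengths) {
        this.k = k;
        this.rowCount = rowLengths.length;
        this.offsets = new long[(rowLengths.length + k - 1) / k];
        long pos = 0;
        for (int row = 0; row < rowLengths.length; row++) {
            if (row % k == 0) {
                offsets[row / k] = pos; // remember where every k-th row starts
            }
            pos += rowLengths[row];
        }
        this.fileLength = pos;
    }

    /** Cuts the partition into splits of at most entriesPerSplit * k rows each. */
    public List<Split> splits(int entriesPerSplit) {
        List<Split> result = new ArrayList<Split>();
        for (int i = 0; i < offsets.length; i += entriesPerSplit) {
            int next = i + entriesPerSplit;
            long startRow = (long) i * k;
            long endRow = Math.min((long) next * k, rowCount);
            long endOffset = next < offsets.length ? offsets[next] : fileLength;
            result.add(new Split(startRow, endRow, offsets[i], endOffset));
        }
        return result;
    }
}
```

A smaller k allows finer splits at the cost of a larger index. If the partition were sorted, storing the first key alongside each offset would let the same structure skip splits whose key range cannot match a predicate.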

Wednesday, June 20, 2012

Academic Research and Real-World Impact

Joey Hellerstein (one of my favorite researchers in CS) responds to Matt Welsh's criticism of the academic research model.

Tuesday, June 5, 2012

Clydesdale Performance Graph

Clydesdale: SIGMOD Demo



An interactive chart comparing Clydesdale and Hive on the Star-Schema Benchmark that I built for a SIGMOD demo. Scale Factor = 1000, Cluster = 9 worker nodes + 1 master, 8 local disks per node

Tuesday, May 29, 2012

Networking Keynote

Here's Hamilton's summary of Urs Holzle's keynote at the 2012 Open Networking Summit, which seems to have a lot in common with Amin Vahdat's keynote at SIGMOD 2012. The message again seems to be that OpenFlow and SDN are ready for real-world use and already in use at Google. Central traffic engineering has many advantages. Fabric-centric protocols and fabric-centric network management are critical to managing modern networking infrastructure.

Friday, May 25, 2012

SIGMOD 2012

SIGMOD 2012 just wrapped up, and it was an excellent year! Since SIGMOD accepted fewer than 50 papers this year, the program was much easier to navigate, and I could make it to most of the talks that I thought were interesting. Here are some highlights and (mostly systems-focused) papers that I think are promising and should be fun reads:

Research Papers:
  • "Calvin: Fast Distributed Transactions for Partitioned Database Systems" from Yale: Calvin builds on previous work from Yale on exploiting determinism to get high performance in scale-out transaction processing. Interesting and controversial ideas.
  • "bLSM: A General Purpose Log Structured Merge Tree" from Yahoo: A careful implementation of LSM trees that does a much better job of managing memory and avoiding spiky latencies for reads and writes by carefully scheduling merges.
  • "Towards a Unified Architecture for in-RDBMS Analytics", from Wisconsin: Brings together a bunch of techniques from convex programming, incremental gradient descent, machine learning, and database systems to support a large class of machine learning algorithms efficiently in the RDBMS. Great combination of a bunch of well-known powerful ideas.
Industrial Systems:
  • "F1-The Fault-Tolerant Distributed RDBMS Supporting Google's Ad Business", from Google: Describes the transactional layer built on top of Spanner to support ACID transactions on a globally scalable datastore.
  • "TAO: How Facebook Serves the Social Graph", from Facebook: Large scale data store from Facebook to serve all those petabytes of content.

Keynotes:
Pat Hanrahan talked about the importance of serving the "Data Enthusiast" -- which I interpreted as making database technology much easier to consume, as well as adding new technologies that enabled non-programmers and non-DBAs to access data. He focused on Tableau-style visual exploration and querying.

Amin Vahdat's keynote on networking was one of the most interesting keynotes I've been to in a while. He talked about the current revolution in networking with Software Defined Networking (SDN) rapidly becoming a reality at Google. He also described (perhaps for the first time in public) some of the details of the networking infrastructure at Google both inside the datacenter and across datacenters. His argument was that networking needs data management to do well in the SDN era, and large-scale data management needs "scale-out" networking to really grow to datacenter scales. He pitched it as a perfect opportunity for collaboration across the data management and networking communities!

Monday, May 21, 2012

ZLIB Compression Problem

I recently ran into a weird bug with the FIXED compression strategy in the Zlib library that ships with Hadoop 0.20.2. Here's a program that simply reads a file, compresses it, and then decompresses it back.

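package com.ibm.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressionVerifier {

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("Usage: CompressionVerifier <filename>");
            return; // nothing to do without an input file
        }
        String filename = args[0];

        // Read the whole input file into memory.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FileInputStream fis = new FileInputStream(new File(filename));
        try {
            byte[] buffer = new byte[64 * 1024];
            int n;
            while ((n = fis.read(buffer)) != -1) {
                baos.write(buffer, 0, n);
            }
        } finally {
            fis.close();
        }
        byte[] data = baos.toByteArray();

        System.out.println("Compressing file: " + filename);
        // Now compress data with an explicitly configured zlib compressor.
        JobConf conf = new JobConf();
        DefaultCodec cc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
        cc.setConf(conf);
        Compressor zcom = new ZlibCompressor(
                ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
                ZlibCompressor.CompressionStrategy.FIXED, // Causes error
                //ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY, // Works fine
                ZlibCompressor.CompressionHeader.DEFAULT_HEADER, 64 * 1024);

        // Reuse the buffer to hold the compressed bytes.
        baos.reset();
        CompressionOutputStream compressedStream = cc.createOutputStream(baos, zcom);
        compressedStream.write(data);
        compressedStream.close();
        byte[] compressedData = baos.toByteArray();
        System.out.println("Finished compressing");

        // Decompress with a fresh codec instance; on bad inputs the exception is thrown here.
        DefaultCodec c2 = ReflectionUtils.newInstance(DefaultCodec.class, conf);
        c2.setConf(conf);
        CompressionInputStream inpStr =
                c2.createInputStream(new ByteArrayInputStream(compressedData));
        System.out.println("Starting decompression");
        while (inpStr.available() > 0) {
            inpStr.read();
        }
        inpStr.close();
        System.out.println("Verified File!");
    }
}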

On most inputs, it works fine, and the execution is uneventful. On certain inputs, you get an IOException for an "invalid distance code" --

Exception in thread "main" java.io.IOException: invalid distance code
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:80)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:74)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:62)
at com.ibm.utils.CompressionVerifier.main(CompressionVerifier.java:65)


Has anyone else run into this? Using CompressionStrategy.DEFAULT_STRATEGY fixes the problem, so I assume it is specific to the Z_FIXED strategy. If you know the zlib codebase, and care to help verify/fix the problem, let me know, and I can send you the input file that caused the problem.