I've been spending some time adding several features to the CIF column store to support requests that BigInsights customers have been making (more on this soon). Here are some of the more interesting ones:
Incremental Load Utilities
One of the first requirements was to support efficient incremental loading of data. If you recall how CIF works, we have a directory per partition and a file per column under each directory. With incremental load, there are two options: 1) create a new sub-directory in each partition and add the column files there, or 2) append to the existing files in the partition. Append support in newer versions of HDFS makes option 2) possible. While option 1) is easy, it creates many small files over several batches of loading, which can cause performance problems down the road. Option 2) does not create new files, but has its own problems: if a load utility running as a MapReduce job fails, it may leave the column store in an invalid state, with partially appended data in several files in different partitions. Solving this ended up being a surprisingly fun problem -- I'll describe this in detail in another post.
Partitioning
This is a relatively simple feature as far as the storage layer is concerned. Hive also supports this to a certain extent. The idea is to partition the data on some key -- such as time, geography, etc. -- and store the records corresponding to each partition separately. The partitions don't necessarily have to be semantic: you could partition by the hash of some key column if all you cared about was load balancing. Semantic partitioning allows us to skip scanning some partitions based on query predicates. The Jaql-CIF integration layer takes predicates from Jaql queries and uses them to eliminate partitions that are guaranteed not to contain any data relevant to the query. The same thing can be done if you're using CIF with Hive or Pig or Cascading or any other layer that produces MapReduce jobs.
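To make partition elimination concrete, here is a minimal sketch in Python. It is not the Jaql-CIF code: the `Partition` class, the `prune` function, and the `sales/...` paths are all made up for illustration, and it assumes each partition records the minimum and maximum value of a date key.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Partition:
    path: str       # hypothetical directory holding one file per column
    min_day: date   # smallest partition-key value stored in this partition
    max_day: date   # largest partition-key value stored in this partition


def prune(partitions, lo, hi):
    """Keep only partitions whose key range overlaps the query range [lo, hi]."""
    return [p for p in partitions if p.max_day >= lo and p.min_day <= hi]


# Three monthly partitions (made-up data).
partitions = [
    Partition("sales/2011-01", date(2011, 1, 1), date(2011, 1, 31)),
    Partition("sales/2011-02", date(2011, 2, 1), date(2011, 2, 28)),
    Partition("sales/2011-03", date(2011, 3, 1), date(2011, 3, 31)),
]

# A predicate such as "day between 2011-02-10 and 2011-03-05" only needs
# the February and March partitions; January is never scanned.
selected = prune(partitions, date(2011, 2, 10), date(2011, 3, 5))
```

With hash partitioning there is no such range to test against, which is why only semantic partitions benefit from this kind of pruning on range predicates.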
Of course, maintaining the partitions over a long time frame as new data gets loaded can get tricky -- so we needed to build utilities to split and merge partitions.
Concurrent Readers and Writers
The mechanisms that we introduced to make incremental load safe included a simple, lightweight form of versioning. As a result, we were able to support concurrent readers and writers (load jobs) on the same dataset with a basic form of isolation. Definitely useful in a busy multi-user setting.
Fine-grained Splits
The initial version of the column store only allowed an entire partition to be turned into a split. We added an index structure that tracks the offsets in each column file for the start of every k-th row in the partition. Managing this adds a little bit of overhead, but allows us to produce more splits (to utilize more map slots) if necessary. This turns out to be extremely useful for tasks that are CPU intensive. If the data in the partition happens to be sorted by some key, then this index structure can be used as a true coarse-grained index on that key, and we can use it to push split elimination down to a lower level than just partition-sized splits.
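Here is a rough Python sketch of the every-k-th-row index idea. It is not the actual CIF on-disk format: `IndexEntry`, `build_index`, `make_splits`, and the toy value sizes are all assumptions made for illustration. Each index entry records, for one row that is a multiple of k, the byte offset of that row in every column file; splits are then cut at index entries.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IndexEntry:
    row: int       # row number where this entry starts (a multiple of k)
    offsets: dict  # column name -> byte offset of that row in the column file


def build_index(column_value_sizes, k):
    """Build one entry per k rows.

    column_value_sizes maps a column name to the encoded size in bytes of
    each row's value (a stand-in for actually writing the column files).
    """
    num_rows = len(next(iter(column_value_sizes.values())))
    position = {col: 0 for col in column_value_sizes}
    entries = []
    for row in range(num_rows):
        if row % k == 0:
            entries.append(IndexEntry(row, dict(position)))
        for col, sizes in column_value_sizes.items():
            position[col] += sizes[row]
    return entries


def make_splits(entries, num_rows, entries_per_split):
    """Return (start_row, end_row_exclusive, start_offsets) for each split."""
    splits = []
    for i in range(0, len(entries), entries_per_split):
        nxt = i + entries_per_split
        end_row = entries[nxt].row if nxt < len(entries) else num_rows
        splits.append((entries[i].row, end_row, entries[i].offsets))
    return splits


# Toy partition: 10 rows, a fixed-width column and a variable-width column.
sizes = {"id": [4] * 10, "name": [3, 5, 2, 7, 4, 6, 1, 8, 3, 5]}
index = build_index(sizes, k=4)
splits = make_splits(index, num_rows=10, entries_per_split=1)
```

Each split carries the starting offset for every column, so a map task can seek directly into each column file instead of reading from the start of the partition. Raising `entries_per_split` trades parallelism for fewer, larger splits without rebuilding the index.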