Posted on 2nd July 2010 by Arseny Kaplun

Recently, we faced the problem of categorizing shopping items: applying a complex set of regular expressions to each product description, figuring out the product's category and extracting category-dependent fields.
Since the original data is the result of web crawling, the majority of records do not change between crawls, while about 1% of records do actually change. This led to the idea of incremental processing: first select the altered records, then process only those.
The original algorithm was to run a chain of map-reduce jobs:
- “Diff”. Compare the old data with the new data to find updated records. Produce a ‘diff’ that contains the input for updated and new records plus the list of deleted keys.
- “Extract”. Convert the diff to Hadoop MapFile format, extracting fields (the desired processing) for updated and new records. Deleted records are mapped to a deletion marker.
- “New”. Split new records out to a separate output, which together with the output of the “Merge” step forms the final result.
- “Merge”. Go through the previous processing results, using the diff as a dictionary, replacing updated records and removing deleted ones.
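As a rough illustration only (not the actual jobs), such a chain can be driven with the old mapred API roughly as follows; the paths are made up and identity mapper/reducer classes stand in for the real Diff logic:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class IncrementalPipeline {
  public static void main(String[] args) throws Exception {
    // "Diff": compare the previous crawl with the new one and emit changed records.
    JobConf diff = new JobConf(IncrementalPipeline.class);
    diff.setJobName("diff");
    diff.setOutputKeyClass(LongWritable.class);
    diff.setOutputValueClass(Text.class);
    diff.setMapperClass(IdentityMapper.class);    // placeholder for the real Diff mapper
    diff.setReducerClass(IdentityReducer.class);  // placeholder for the real Diff reducer
    FileInputFormat.setInputPaths(diff, new Path("/crawl/old"), new Path("/crawl/new")); // hypothetical paths
    FileOutputFormat.setOutputPath(diff, new Path("/work/diff"));
    JobClient.runJob(diff);

    // "Extract", "New" and "Merge" are submitted the same way, each consuming
    // the output path of the previous job.
  }
}
```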
The initial implementation performed really badly; it took even longer than simply processing all the records on a single machine. So we started an optimization effort that included the following steps:
1. Use a custom WritableComparable.
Initially, we used a CSV text file as the intermediate format. We decided to implement a custom WritableComparable class (fig. 1) for the keys and a custom Writable class (fig. 2) for the values of the intermediate data.
figure 1: RecordKeyWritable.java
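In outline (a minimal sketch rather than the original file), the key class looks roughly like this, assuming the key is simply the product identifier:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Key: product identifier, compared lexicographically.
public class RecordKeyWritable implements WritableComparable<RecordKeyWritable> {
  private String productId = "";

  public void set(String productId) { this.productId = productId; }
  public String get() { return productId; }

  public void write(DataOutput out) throws IOException {
    out.writeUTF(productId);
  }

  public void readFields(DataInput in) throws IOException {
    productId = in.readUTF();
  }

  public int compareTo(RecordKeyWritable other) {
    return productId.compareTo(other.productId);
  }

  @Override
  public int hashCode() { return productId.hashCode(); }

  @Override
  public boolean equals(Object o) {
    return o instanceof RecordKeyWritable
        && productId.equals(((RecordKeyWritable) o).productId);
  }
}
```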
figure 2: RecordWritable.java
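And the value class, again as a sketch, assuming it carries the raw product description plus a deletion marker so that deleted records can be represented in the diff:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Value: product description plus a deletion marker.
public class RecordWritable implements Writable {
  private boolean deleted;
  private String description = "";

  public void set(boolean deleted, String description) {
    this.deleted = deleted;
    this.description = description;
  }

  public boolean isDeleted() { return deleted; }
  public String getDescription() { return description; }

  public void write(DataOutput out) throws IOException {
    out.writeBoolean(deleted);
    out.writeUTF(description);
  }

  public void readFields(DataInput in) throws IOException {
    deleted = in.readBoolean();
    description = in.readUTF();
  }
}
```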
Surprisingly, this optimization had little impact, about 1% of total runtime, mostly from eliminating the time spent splitting strings.
2. Add a combiner
Next, we implemented an efficient combiner that combines all records of the same product, for both the Diff and Merge stages. See figure 3 for an example combiner for the Diff stage.
figure 3: RecordCombiner.java
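A minimal sketch of a Diff-stage combiner of this kind, built on the key/value classes above; it assumes the mapper emits the full record from each crawl, so that when the old and new versions of a product meet in the same map task and are identical, the pair can be dropped before the shuffle:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class RecordCombiner extends MapReduceBase
    implements Reducer<RecordKeyWritable, RecordWritable, RecordKeyWritable, RecordWritable> {

  public void reduce(RecordKeyWritable key, Iterator<RecordWritable> values,
                     OutputCollector<RecordKeyWritable, RecordWritable> output,
                     Reporter reporter) throws IOException {
    // Copy the local values for this key; Hadoop reuses the value object it hands us.
    List<RecordWritable> local = new ArrayList<RecordWritable>();
    while (values.hasNext()) {
      RecordWritable v = values.next();
      RecordWritable copy = new RecordWritable();
      copy.set(v.isDeleted(), v.getDescription());
      local.add(copy);
    }
    // Identical old/new pair: the product did not change, emit nothing for this key.
    if (local.size() == 2
        && local.get(0).isDeleted() == local.get(1).isDeleted()
        && local.get(0).getDescription().equals(local.get(1).getDescription())) {
      return;
    }
    for (RecordWritable v : local) {
      output.collect(key, v);
    }
  }
}
```

The combiner is enabled from the job driver with conf.setCombinerClass(RecordCombiner.class).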
Enabling the combiner cut Diff time by 15%, and Merge time decreased by 20%. Data transfer volume decreased significantly (up to 2x).
3. Upgrade Hadoop 0.18.3 to 0.20.2.
We did not expect this, but the upgrade alone cut total processing time by 30%, mainly because shuffle performance increased dramatically. At the time of our tests, Hadoop 0.20 could not be deployed on Amazon EC2 very easily, so we made a couple of fixes in the deployment scripts. This should be fixed in the Hadoop repository by now.
4. Enable GZIP compression for input, output and intermediate data.
We tried compressing the input data as well as enabling output compression and intermediate (sequence file) compression. However, we could not notice any impact from these optimizations.
figure 4: Snippets enabling GZIP compression for M/R output
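A sketch of the kind of snippet involved, using the old mapred API (the helper class name is illustrative):

```java
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class GzipOutputConfig {
  // Called from the job driver before submitting the job.
  public static void enableGzipOutput(JobConf conf) {
    // Compress the final job output with gzip.
    FileOutputFormat.setCompressOutput(conf, true);
    FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    // For SequenceFile output, block compression usually gives a better
    // ratio than per-record compression.
    SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);
  }
}
```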
Gzipped input is handled transparently by FileInputFormat and SequenceFileInputFormat.
5. Enable LZO compression for intermediate data.
LZO compression achieves a medium compression ratio at a pretty good compression speed and an outstanding decompression speed. This makes the LZO codec ideal for compressing intermediate data (the mappers' output): it greatly reduces traffic between mappers and reducers at a low computational price. Additionally, it supports independent block decompression. We tried the LZO codec from the hadoop-0.18 distribution and it saved us 5% of total processing time. Unfortunately, LZO support was removed from hadoop-0.20 due to license restrictions. There are several alternative projects in progress, but at the time of our tests there was no out-of-the-box solution.
figure 5: snippets enabling LZO compression for traffic between mappers & reducers
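A sketch of the two lines involved, again with the old mapred API; the import assumes the LzoCodec that was bundled with Hadoop 0.18:

```java
import org.apache.hadoop.io.compress.LzoCodec; // bundled with Hadoop 0.18, removed in 0.20
import org.apache.hadoop.mapred.JobConf;

public class LzoMapOutputConfig {
  // Called from the job driver: compress intermediate (mapper output) data with LZO,
  // which reduces map-to-reduce traffic at a low CPU cost.
  public static void enableLzoMapOutput(JobConf conf) {
    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(LzoCodec.class);
  }
}
```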
6. Tune number of mappers and reducers
After a set of experiments we found that manually setting the number of mappers and reducers can speed up M/R substantially. First, check that map tasks are small enough that all nodes are equally loaded, but not so small that they suffer from initialization overhead (a map task execution time below 20 seconds is known to be bad).
Further optimization comes from tuning the ratio of mappers to reducers. Just by setting appropriate values via conf.setNumMapTasks() and conf.setNumReduceTasks() we were able to cut total processing time by 30%! Unfortunately, the optimal values depend strongly on data size and distribution. Taking 3 * input_size / block_size mappers and 1 reducer per slave node is a good starting point.
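A small sketch of this starting point (the helper class and its parameters are illustrative; the final values were found experimentally):

```java
import org.apache.hadoop.mapred.JobConf;

public class TaskCountTuning {
  // Apply the rule of thumb above; revisit when data size or distribution changes.
  public static void applyStartingPoint(JobConf conf, long inputSizeBytes,
                                        long blockSizeBytes, int slaveNodes) {
    int maps = (int) (3 * inputSizeBytes / blockSizeBytes);
    conf.setNumMapTasks(Math.max(maps, 1));          // a hint; the framework may adjust it
    conf.setNumReduceTasks(Math.max(slaveNodes, 1)); // one reducer per slave node to start
  }
}
```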
7. Map-side join
We also tried a map-side join for Merge. For this to work, the data for the “Merge” step has to be sorted and partitioned the same way on both sides. The mapper can then perform a merge join, reading the two inputs simultaneously and matching records with the same key. This required an extra map-reduce job to merge the old results (since we did not previously merge the results of update & delete with the new ones). We also could not avoid sort & shuffle, since we needed an IdentityReducer to keep the processing results in a single file. In the end we could not achieve a significant performance increase (only 5% of Merge time saved), at the price of much higher complexity.
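For reference, a minimal sketch of how a map-side merge join can be configured with the old mapred API's CompositeInputFormat; the join type ("outer") and the input paths are assumptions, not necessarily what we used:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

public class MergeJoinConfig {
  // Both inputs must be sorted by key and partitioned identically.
  public static void configure(JobConf conf, Path previousResults, Path diff) {
    conf.setInputFormat(CompositeInputFormat.class);
    conf.set("mapred.join.expr",
        CompositeInputFormat.compose("outer", SequenceFileInputFormat.class,
            previousResults, diff));
    // The mapper then receives a TupleWritable holding, for each key,
    // the matching records from the two inputs.
  }
}
```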
8. Put the diff map on local disk instead of HDFS
The next step was to put the diff file on each merging node's local file system, instead of sharing it via HDFS. This saved another 10% of Merge time. Still, the map-file lookup remained too slow, taking 95% of merging time, so we looked into keeping the diff as an in-memory distributed hash table (DHT).
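We did this with a couple of lines in our launch scripts; a Hadoop-native alternative (not what we used, and sketched here with a hypothetical diff path) is the DistributedCache, which localizes the MapFile's "data" and "index" files on every task node:

```java
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class LocalDiffSetup {
  // Driver side: have Hadoop copy the diff MapFile's files to every node's local disk.
  public static void shipDiff(JobConf conf, String diffDirOnHdfs) throws Exception {
    DistributedCache.addCacheFile(new URI(diffDirOnHdfs + "/data"), conf);
    DistributedCache.addCacheFile(new URI(diffDirOnHdfs + "/index"), conf);
    // Tasks locate the local copies via DistributedCache.getLocalCacheFiles(conf)
    // in their configure() method.
  }
}
```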
9. Replace MapFile with in-memory cache
Instead of a Hadoop MapFile (which is designed to keep only the index in memory) we switched to an in-memory hash map that keeps both index and values in memory. The result was outstanding: the Merge step became 70% faster and the New step 60% faster. Since our diff was small enough to fit in the memory of a slave node, we used our own simple hash-table service:
- it runs as an HTTP service
- on start it reads the diff file and populates the hash table with it
- the service is terminated when the Merge join is over
If the diff grows too big to fit in the memory of a single slave node, we will have to look for a more sophisticated solution. Probably one of the available in-memory hash table implementations would fit; this needs further investigation.
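A minimal sketch of such a lookup service, using the JDK's built-in HttpServer and assuming plain string keys and values (the class name and the /lookup endpoint are illustrative, not our actual code):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

public class DiffLookupService {
  private final Map<String, String> table = new HashMap<String, String>();

  // Populated on start from the diff file (reading code omitted).
  public void put(String key, String value) { table.put(key, value); }

  public void start(int port) throws IOException {
    HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
    server.createContext("/lookup", new HttpHandler() {
      public void handle(HttpExchange exchange) throws IOException {
        // Expects requests like GET /lookup?key=12345
        String query = exchange.getRequestURI().getQuery();
        String key = (query != null && query.startsWith("key=")) ? query.substring(4) : "";
        String value = table.get(key);
        byte[] body = (value == null ? "" : value).getBytes("UTF-8");
        exchange.sendResponseHeaders(value == null ? 404 : 200, body.length);
        OutputStream out = exchange.getResponseBody();
        out.write(body);
        out.close();
      }
    });
    server.start(); // stopped externally once the Merge join is over
  }
}
```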
If you face the problem of optimizing a map-reduce task, here are our suggestions:
- Start by tuning the number of map and reduce tasks.
- Try the latest Hadoop version.
- Avoid a map-file as dictionary data: try to put it in memory, reconsider the algorithm, or look for a third-party solution.
- Add combiners and switch to a custom record format.
- Try replacing a reduce-side join with a map-side one.
The table below gives some details of the optimizations we tried and the level of effort each one took:
| Optimization | Effect | Effort |
|---|---|---|
| Custom record format | Total time reduced by 1% | 2 hours of coding. |
| Adding combiners | This, along with the custom record format, reduced total time by 17%. Diff time reduced by 15% and Merge time by 20%. | 2 hours of coding. |
| Update Hadoop | Total time reduced by 30%, with a similar speed-up of all steps | Had to fix deployment scripts for EC2; this should already be fixed in the codebase. |
| GZip compression of input/output | No visible effect | Two lines in JobDriver. |
| LZO compression of intermediate data | Total processing time reduced by 5% | Two lines of code in JobDriver in Hadoop 0.18. Significant effort (2-3 days) to add LZO-like compression to Hadoop 0.20 (until the compression is incorporated into the Hadoop distribution). |
| Map-reduce task tuning | Total processing time reduced by 30% | A week of experiments. Note that this should be revisited when data size or distribution changes (and we know it will). |
| Map-side join | Negative: total processing time increased. | A week of coding & experiments. |
| Put dictionary data in local file system | 10% saved on Merge, total time reduced by 7% | A couple of lines in the launch scripts. |
| Put dictionary data in memory | 70% Merge speed-up, 60% New speed-up, total time reduced by 50%. | 2 days of coding for a simple in-memory hash table. If the dictionary data does not fit in memory, reconsideration is required: a custom in-memory DHT may take a while to build, commercial ones promise to be pricey, and open solutions should be investigated. |