HBase 入門 (2)

HadoopMapReduce で HBase を使ってみましょう。

1. セットアップ

$HADOOP_HOME/conf/hadoop-env.sh の HADOOP_CLASSPATH に HBase のパスを加えます。

export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.2.1.jar:$HBASE_HOME/conf

前回のプログラムで動作確認。

debian:~/tmp/hbase% jar cf hbase-basic.jar HBaseBasic.class
debian:~/tmp/hbase% $HADOOP_HOME/bin/hadoop jar hbase-basic.jar HBaseBasic

前回通りに動けばOKです。

2. MapReduce を理解する

http://hadoop.apache.org/core/docs/r0.17.2/mapred_tutorial.html を読むのがよいと思います。また検索すれば多くの情報がみつかります。Example: WordCount v1.0 が理解できればまあいいでしょう。

3. 簡単な例

前回同様 http://www.nabble.com/Re%3A-Map-Reduce-over-HBase---sample-code-p18253120.html を参考にします。大体同じようなコードですが、バージョンに合わせて修正したりしています。

scores テーブル

grade: course:math course:art
Dan 1 87 97
Dana 2 100 80

この成績表から、各科目の平均点を計算します。

courses テーブル

stats:average
math 93.5
art 88.5
hbase(main):001:0> create 'scores', 'grade', 'course'
0 row(s) in 6.3180 seconds
hbase(main):002:0> create 'courses', 'stats'
0 row(s) in 7.8030 seconds
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.Map;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Writables;

public class StatsScore  extends Configured implements Tool {

    // Sara {math:62, art:45} -> {math, 62}, {art, 45},
    public static class StatsMap extends TableMap<Text, IntWritable> {
        public void map(ImmutableBytesWritable key, RowResult value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            for (Map.Entry<byte[], Cell> entry : value.entrySet()) {
                Text course = new Text(new String(entry.getKey()).split(":", 2)[1]);
                IntWritable score = new IntWritable();
                Writables.copyWritable(entry.getValue().getValue(), score);
                output.collect(course, score);
            }
        }
    }


    // {math, {62, 45, 87}} -> {math, 65.6}
    public static class StatsReduce extends TableReduce<Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
                           Reporter reporter) throws IOException {
            int size = 0;
            int sum = 0;
            while (values.hasNext()) {
                size++;
                sum += values.next().get();
            }
            float average = (float)sum / (float)size;

            BatchUpdate bu = new BatchUpdate(key.getBytes());
            bu.put("stats:average", Writables.getBytes(new FloatWritable(average)));
            output.collect(new ImmutableBytesWritable(key.getBytes()), bu);
        }
    }


    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), this.getClass());
        conf.setJobName("compute average scores");
        TableMap.initJob("scores", "course:", StatsMap.class, Text.class, IntWritable.class, conf);
        TableReduce.initJob("courses", StatsReduce.class, conf);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String [] args) throws Exception {
        ToolRunner.run(new Configuration(), new StatsScore(), args);
    }
}

実行。

debian:~/tmp/hbase% javac -d classes StatsScore.java
debian:~/tmp/hbase% jar cf stats-score.jar classes
debian:~/tmp/hbase% $HADOOP_HOME/bin/hadoop jar stats-score.jar StatsScore

結果はてきとうに確認してください。

データの処理の流れは次のようになっています。

scores テーブル

↓

Dan { course:math => 87, course:art => 97 }

↓ map()

(math, 87), (art, 97)

↓

(math, [87, 100])

↓ reduce()

(math, 93.5)

↓

couses テーブル

重要なのは map 関数と reduce 関数です。それぞれ TableMap, TableReduce のサブクラスで定義されています。
TableMap を使うと、テーブル内の全ての row を走査して map に渡すことができます。HTable から map するときは大抵使うことになるでしょう。動作の設定は TableMap.initJob で行います。第2引数では取得したいカラム名をスペース区切りの文字列で渡していて、この場合は scores テーブルの course: カラムファミリーを含む RowResult が map に渡されます。
map の値を reduce で集計、という部分は普通の MapReduce と変わりません。
TableReduce は、output に BatchUpdate を渡すと、TableReduce.initJob で指定したテーブルにコミットしてくれるようになります。テーブルのインスタンスを管理しなくていい、程度のありがたみです。
両 initJob で各種設定を済ませてくれるので、conf.setInputFormat() などを手動で設定する必要はありません。


次回はもうすこし規模の大きい実践的なプログラムをみて、理解を深めます。