HBase 入門 (2)
Hadoop の MapReduce で HBase を使ってみましょう。
1. セットアップ
$HADOOP_HOME/conf/hadoop-env.sh の HADOOP_CLASSPATH に HBase のパスを加えます。
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.2.1.jar:$HBASE_HOME/conf
前回のプログラムで動作確認。
debian:~/tmp/hbase% jar cf hbase-basic.jar HBaseBasic.class debian:~/tmp/hbase% $HADOOP_HOME/bin/hadoop jar hbase-basic.jar HBaseBasic
前回通りに動けばOKです。
2. MapReduce を理解する
http://hadoop.apache.org/core/docs/r0.17.2/mapred_tutorial.html を読むのがよいと思います。また検索すれば多くの情報がみつかります。Example: WordCount v1.0 が理解できればまあいいでしょう。
3. 簡単な例
前回同様 http://www.nabble.com/Re%3A-Map-Reduce-over-HBase---sample-code-p18253120.html を参考にします。大体同じようなコードですが、バージョンに合わせて修正したりしています。
scores テーブル
grade: | course:math | course:art | |
---|---|---|---|
Dan | 1 | 87 | 97 |
Dana | 2 | 100 | 80 |
この成績表から、各科目の平均点を計算します。
courses テーブル
stats:average | |
---|---|
math | 93.5 |
art | 88.5 |
hbase(main):001:0> create 'scores', 'grade', 'course' 0 row(s) in 6.3180 seconds hbase(main):002:0> create 'courses', 'stats' 0 row(s) in 7.8030 seconds
import java.io.IOException; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.util.Map; import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableMap; import org.apache.hadoop.hbase.mapred.TableReduce; import org.apache.hadoop.hbase.util.Writables; public class StatsScore extends Configured implements Tool { // Sara {math:62, art:45} -> {math, 62}, {art, 45}, public static class StatsMap extends TableMap<Text, IntWritable> { public void map(ImmutableBytesWritable key, RowResult value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { for (Map.Entry<byte[], Cell> entry : value.entrySet()) { Text course = new Text(new String(entry.getKey()).split(":", 2)[1]); IntWritable score = new IntWritable(); Writables.copyWritable(entry.getValue().getValue(), score); output.collect(course, score); } } } // {math, {62, 45, 87}} -> {math, 65.6} public static class StatsReduce extends TableReduce<Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter) throws IOException { int size = 0; int sum = 0; while (values.hasNext()) { size++; sum += values.next().get(); } float average = (float)sum / (float)size; BatchUpdate bu = new BatchUpdate(key.getBytes()); bu.put("stats:average", Writables.getBytes(new FloatWritable(average))); output.collect(new ImmutableBytesWritable(key.getBytes()), bu); } } public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), this.getClass()); conf.setJobName("compute average scores"); TableMap.initJob("scores", "course:", StatsMap.class, Text.class, IntWritable.class, conf); TableReduce.initJob("courses", StatsReduce.class, conf); JobClient.runJob(conf); return 0; } public static void main(String [] args) throws Exception { ToolRunner.run(new Configuration(), new StatsScore(), args); } }
実行。
debian:~/tmp/hbase% javac -d classes StatsScore.java debian:~/tmp/hbase% jar cf stats-score.jar classes debian:~/tmp/hbase% $HADOOP_HOME/bin/hadoop jar stats-score.jar StatsScore
結果はてきとうに確認してください。
データの処理の流れは次のようになっています。
scores テーブル ↓ Dan { course:math => 87, course:art => 97 } ↓ map() (math, 87), (art, 97) ↓ (math, [87, 100]) ↓ reduce() (math, 93.5) ↓ couses テーブル
重要なのは map 関数と reduce 関数です。それぞれ TableMap, TableReduce のサブクラスで定義されています。
TableMap を使うと、テーブル内の全ての row を走査して map に渡すことができます。HTable から map するときは大抵使うことになるでしょう。動作の設定は TableMap.initJob で行います。第2引数では取得したいカラム名をスペース区切りの文字列で渡していて、この場合は scores テーブルの course: カラムファミリーを含む RowResult が map に渡されます。
map の値を reduce で集計、という部分は普通の MapReduce と変わりません。
TableReduce は、output に BatchUpdate を渡すと、TableReduce.initJob で指定したテーブルにコミットしてくれるようになります。テーブルのインスタンスを管理しなくていい、程度のありがたみです。
両 initJob で各種設定を済ませてくれるので、conf.setInputFormat() などを手動で設定する必要はありません。
次回はもうすこし規模の大きい実践的なプログラムをみて、理解を深めます。