package net.kzk9;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCount
{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Output objects are reused across calls to avoid per-record allocation
    // (standard Hadoop mapper idiom).
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    /**
     * Emits a (token, 1) pair for every whitespace-separated token in the
     * input line.
     *
     * @param key     byte offset of the line in the input split (unused)
     * @param value   the text of one input line
     * @param context sink for the emitted (word, 1) pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output object (standard Hadoop reducer idiom to limit allocation).
    // Renamed from "value": the original for-each loop variable was also named
    // "value", shadowing this field inside the loop — legal but error-prone.
    private final IntWritable result = new IntWritable(0);

    /**
     * Sums all partial counts for one word and emits (word, total).
     *
     * @param key     the word
     * @param values  the partial counts produced by the mappers/combiner
     * @param context sink for the single (word, sum) output pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
/**
 * Job driver: wires the Map/Reduce classes together and submits the job.
 * Usage: WordCount &lt;input path&gt; &lt;output path&gt;
 *
 * @param args generic Hadoop options followed by the input and output paths
 */
public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    // Strip generic Hadoop options (-D, -fs, -jt, ...) so that only the
    // job-specific arguments (input path, output path) remain.
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    args = parser.getRemainingArgs();
    // Fail fast with a usage message instead of an
    // ArrayIndexOutOfBoundsException when paths are missing.
    if (args.length != 2) {
        System.err.println("Usage: WordCount <input path> <output path>");
        System.exit(2);
    }
    Job job = new Job(conf, "wordcount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(1);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    boolean success = job.waitForCompletion(true);
    System.out.println(success);
    // Propagate the job status to the shell (0 = success, 1 = failure) so
    // that scripts invoking this driver can detect failed jobs.
    System.exit(success ? 0 : 1);
}
}
# Compile WordCount.java against the Hadoop jars and package it into
# wordcount.jar. Fixes the original script, which jammed `javac` and `jar`
# (and `export` and `for`) onto single lines with no command separator —
# the extra tokens would have been passed as arguments to the first command.
export HADOOP_HOME=/usr/lib/hadoop
export DIR=wordcount_classes

# Build the compile classpath: the core jar plus every jar in $HADOOP_HOME/lib.
CLASSPATH=$HADOOP_HOME/hadoop-core.jar
for f in $HADOOP_HOME/lib/*.jar; do
    CLASSPATH=${CLASSPATH}:$f
done

rm -fR $DIR
mkdir $DIR
javac -classpath $CLASSPATH -d $DIR WordCount.java
jar -cvf wordcount.jar -C $DIR .
rm -fR $DIR
- 実装概要
- Mapper
- org.apache.hadoop.mapreduce.Mapper クラスを継承
- 入力Key-Value、出力Key-Value の4つをそれぞれ指定
- map() 関数を実装
- context.write() は何度でも呼び出して良い
public static class Map extends Mapper<KeyIn, ValueIn, KeyOut, ValueOut> {
@Override
protected void map(KeyIn key, ValueIn value, Context context) throws IOException, InterruptedException
{
context.write(new KeyOut(...), new ValueOut(...)); }
}
-
- Reducer
- org.apache.hadoop.mapreduce.Reducer クラスを継承
- 入力Key-Value、出力Key-Value の4つをそれぞれ指定
- reduce() 関数を実装
- context.write() は何度でも呼び出して良い
public static class Reduce extends Reducer<KeyIn, ValueIn, KeyOut, ValueOut> {
@Override
protected void reduce(KeyIn key, Iterable<ValueIn> values, Context context) throws IOException, InterruptedException
{
context.write(new KeyOut(...), new ValueOut(...)); }
}
package net.kzk9;
import net.kzk9.WordCount;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver; import junit.framework.TestCase;
import org.junit.Before;
import org.junit.Test;
public class WordCountTest extends TestCase
{
    private WordCount.Map mapper;
    // Parameterized to the mapper's <KEYIN, VALUEIN, KEYOUT, VALUEOUT>;
    // the original used the raw MapDriver type, losing compile-time checks
    // on withInput/withOutput argument types.
    private MapDriver<LongWritable, Text, Text, IntWritable> driver;

    /** Creates a fresh mapper and driver before each test. */
    @Before
    public void setUp()
    {
        mapper = new WordCount.Map();
        driver = new MapDriver<LongWritable, Text, Text, IntWritable>(mapper);
    }

    /** The mapper must emit one (token, 1) pair per word, in input order. */
    @Test
    public void testWordCountMapper() {
        driver.withInput(new LongWritable(0), new Text("this is a pen"))
              .withOutput(new Text("this"), new IntWritable(1))
              .withOutput(new Text("is"), new IntWritable(1))
              .withOutput(new Text("a"), new IntWritable(1))
              .withOutput(new Text("pen"), new IntWritable(1))
              .runTest();
    }
}