A code example
WordCount
This simple Java class launches a Hadoop MapReduce job that counts the occurrences of each word in a set of files passed as parameters.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: for each input line, emits a (word, 1) pair per token
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as combiner): sums the counts emitted for each word
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // Driver: configures the job, sets the input/output paths and waits for completion
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
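Once the class is compiled and packaged into a jar, the job is submitted with the hadoop command, passing the input and output directories as arguments. The jar name (wc.jar) and the HDFS paths below are placeholders to adapt to your environment:

  hadoop jar wc.jar WordCount /user/hadoop/wordcount/input /user/hadoop/wordcount/output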
Note: variable types
In a Java Hadoop job, the Text type is used instead of String, and IntWritable instead of int (or Integer). These types implement interfaces such as Comparable, Writable and WritableComparable.
They also let Hadoop serialize and deserialize these objects much faster and with less overhead than standard Java serialization: they are types optimized for Hadoop.
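To make the difference with the standard Java types concrete, here is a minimal standalone sketch (the class name WritableDemo is hypothetical and not part of the job above) showing how Text and IntWritable are created, mutated and compared:

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;

  public class WritableDemo {
      public static void main(String[] args) {
          // Text wraps a UTF-8 encoded string; it is mutable, so the same
          // instance can be reused for many records (as in the mapper above)
          Text word = new Text();
          word.set("hadoop");

          // IntWritable wraps a primitive int
          IntWritable count = new IntWritable(1);
          count.set(count.get() + 1); // count now holds 2

          // Both types implement WritableComparable, which is what lets Hadoop
          // sort and group keys during the shuffle; compareTo returns 0 for equal keys
          System.out.println(word + " -> " + count.get());
          System.out.println(new Text("hadoop").compareTo(word));
      }
  }

Reusing mutable Writable instances, as the mapper and reducer above do with word and result, avoids allocating a new object for every record processed.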