Java MapReduce
明白MapReduce 程序的工作原理之后,下一步便是通过代码来实现它。我们需要三样东西:一个map 函数、一个reduce 函数和一些用来运行作业的代码。map函数由Mapper 接口实现来表示,后者声明了一个map()方法。例2-3 显示了我们的map函数实现。
例2-3. 查找最高气温的Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
该Mapper接口是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。这里我们使用LongWritable类型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java 中的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了OutputCollector实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写 (因为我们把年份当作键),将气温值封装在IntWritable 类型中。
我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
reduce函数通过Reducer进行类似的定义,如例2-4 所示。
例2-4. 查找最高气温的Reducer
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer {
public void reduce(Text key, Iterator
OutputCollector output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce 函数的输入类型必须与map 函数的输出类型相匹配:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与已看到的最高气温获得的。
第三部分代码负责运行MapReduce 作业(请参见例2-5)。
例2-5. 该应用程序在气象数据集中找出最高气温
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature ");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在Hadoop 集群上运行这个作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定JAR 文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat 类中的静态函数 setOutputPath()来指定输出路径。该函数指定了reduce 函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop 会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外地覆盖将是非常恼人的)。
接着,通过setMapperClass()和setReducerClass()指定map和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则通过setMapOutputKeyClass()和setMapOutputValueClass()函数来设置。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
在设置定义map 和reduce 函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
运行测试
写好MapReduce 作业后,通常会拿一个小型的数据集进行测试以排除代码相关问题。首先,以独立(本机)模式安装Hadoop,详细说明请参见附录A。 在这种模式下,Hadoop在本地文件系统上运行作业运行程序。让我们用前面讨论过的5行采样数据为例子来测试MapReduce作业(考虑到篇幅,这里对输出稍有修改):
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output
09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=Job Tracker, sessionId= 09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process:1
09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process:1
09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1
09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720
09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680
09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output
09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
file:/Users/tom/workspace/htdg/input/n cdc/sample.txt:0+529
09/04/07 12:34:36 INFO mapred.TaskRunner:Task'attempt_local_0001_m_000000_0' done.
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments 09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output
09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce
09/04/07 12:34:36 INFO mapred.TaskRunner:Task'attempt_local_0001_r_000000_0' done.
09/04/07 12:34:36 INFO mapred.JobClient: map 100% reduce 100%
09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001
09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13
09/04/07 12:34:36 INFO mapred.JobClient: FileSystemCounters
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_READ=27571
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=53907
09/04/07 12:34:36 INFO mapred.JobClient: Map-Reduce Framework
09/04/07 12:34:36 INFO mapred.JobClient Reduce input groups=2
09/04/07 12:34:36 INFO mapred.JobClient: Combine output records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map input records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce shuffle bytes=0
09/04/07 12:34:36 INFO mapred.JobClient: Reduce output records=2
09/04/07 12:34:36 INFO mapred.JobClient: Spilled Records=10
09/04/07 12:34:36 INFO mapred.JobClient: Map output bytes=45
09/04/07 12:34:36 INFO mapred.JobClient: Map input bytes=529
09/04/07 12:34:36 INFO mapred.JobClient: Combine input records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map output records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input records=5
如果调用hadoop 命令的第一个参数是类名,则Hadoop将启动一个JVM来运行这个类。使用hadoop命令运行作业比直接使用Java命令运行更方便,因为前者将Hadoop库文件(及其依赖关系)路径加入到类路径参数中,同时也能获得Hadoop的配置文件。我们需要定义一个 HADOOP_CLASSPATH 环境变量用于添加应用程序类的路径,然后由Hadoop 脚本来执行相关操作。
以本地(独立)模式运行时,本书中所有程序均假设按照这种方式来设置HADOOP_CLASSPATH。命令的运行需要在示例代码所在的文件夹下进行。
运行作业所得到的输出提供了一些有用的信息。无法找到作业JAR 文件的警告信息是意料之中的,因为我们没有使用JAR文件在本地模式下运行。在集群上运行时,将不会看到这个警告。例如,我们可以看到,这个作业有指定的标识,即job_local_0001,并且执行了一个map 任务和一个reduce 任务(使用attempt_local_0001_m_000000_0和 attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID 是非常有用的。
输出的最后一部分,以Counters为标题,显示在Hadoop 上运行的每个作业的一些统计信息。这些信息对检查这些大量的数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知:5 个map 输入产生了5 个map 的输出,然后5 个reduce 输入产生2个reduce 输出。
输出数据写入output目录,其中每个reducer都有一个输出文件。我们的例子中包含一个 reducer,所以我们只能找到一个文件,名为part-00000:
% cat output/part-00000
1949 111
1950 22
这个结果和我们之前手动寻找的结果一样。我们把这个结果解释为1949年的最高气温记录为11.1℃,而1950 年为2.2℃。