【图书连载】MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。本章中,我们将看到用Java、Ruby、Python 和C++语言编写的同一个程序。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的运营商。MapReduce的优势在于处理大规模数据集,所以这里先来看一个数据集。
一个气象数据集
在我们的例子里,要编写一个挖掘气象数据的程序。分布在全球各地的很多气象传感器每隔一小时收集气象数据,进而获取了大量的日志数据。由于这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用MapReduce来处理。
数据的格式
我们将使用国家气候数据中心(National Climatic Data Center,简称NCDC,网址为http://www.ncdc.noaa.gov/)提供的数据。这些数据按行并以ASCII编码存储,其中每一行是一条记录。该存储格式能够支持众多气象要素,其中许多要素可以有选择性地列入收集范围或其数据所需的存储长度是可变的。为了简单起见,我们重点讨论一些基本要素(如气温等),这些要素始终都有且长度固定。
例2-1显示了一行采样数据,其中重要字段已突出显示。该行数据已被分成很多行以突出显示每个字段,在实际文件中,这些字段被整合成一行且没有任何分隔符。
例2-1. 国家气候数据中心数据记录的格式
0057 332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4 +51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000) F
M-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9 -0128 # air temperature (degrees Celsius x 10)
1 # quality code -0139
# dew point temperature (degrees Celsius x 10)
1 # quality code 10268
# atmospheric pressure (hectopascals x 10)
1 # quality code
数据文件按照日期和气象站进行组织。从1901 年到2001 年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件。例如,1999年对应文件夹下面包含如下记录:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
因为有成千上万个气象台,所以整个数据集由大量的小容量文件组成。通常情况下,处理少量的大型文件显得更容易且有效,因此,这些数据需要经过预处理,将每年的数据文件拼接成一个独立文件。具体做法请参见附录C。
使用Unix工具进行数据分析
该数据集中每年全球气温的最高记录是多少?我们先不使用Hadoop来回答这一问题,因为只有提供性能基准和结果检查工具,才能和Hadoop进行有效对比。
传统处理按行存储数据的工具是awk。例2-2是一个用于计算每年最高气温的程序脚本。
例2-2. 该程序从NCDC气象记录中找出每年最高气温
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk'{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if ( temp!=9999 && q ~ /[01459]/ && temp > max) max = temp}
END { print max }'
done
该脚本循环遍历按年压缩的数据文件,首先显示年份,然后使用awk脚本处理每个文件。awk 脚本从数据中提取两个字段:气温和质量代码。气温值通过加上一个0 转换为整数。接着测试气温值是否有效(用值9999 替代NCDC 数据集中缺少的记录),通过质量代码检测读取的数值是否可疑或错误。如果数据读取正确,那么该值将与目前读取到的最大气温值进行比较,如果该值比原先的最大值大,就替换目前的最大值。处理完文件中所有的行后,再执行END块中的代码并打印出最大气温值。
下面是某次运行结果的起始部分:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
由于源文件中的气温值被放大了10倍,所以1901年的最高气温是 31.7°C (20世纪初记录的气温数据比较少,所以该结果是可能的)。使用亚马逊的EC2 High-CPU Extra Large Instance运行该程序,查找一个世纪以来气象数据中的最大气温值需要42分钟。
为了加快处理,我们需要并行运行部分程序。从理论上讲,这很简单:我们可以通过使用计算机上所有可用的硬件线程来处理,其中每个线程处理不同年份的数据。但是,其中依旧存在一些问题。
首先,将任务划分成大小相同的作业块通常并不容易或明显。在我们的例子中,不同年份数据文件的大小差异很大,因此部分线程会比其他线程更早运行结束。即使让它们继续下一步的工作,整个运行时间依旧由处理最长文件所需的时间决定。另一种更好的方法是将输入数据分成固定大小的块,然后把每块分配到各个进程,这样一来,即使有些进程能处理更多数据,我们也可以为它们分配更多的数据。
其次,将独立进程运行的结果合并后,可能还需要进一步的处理。在我们的例子中,每年的结果独立于其他年份,并可能将所有结果拼接起来,然后按年份进行排序。如果使用固定大小块的方法,则需要特定的方法来合并结果。在这个例子中,某年的数据通常被分割成几个块,每个块进行独立处理。我们将最终获得每个数据块中的最高气温,所以最后一步是寻找这些分块数据中的最大值作为该年的最高气温,其他年份的数据均需如此处理。
最后,我们依旧受限于一台计算机的处理能力。如果手上拥有的所有处理器都用上,至少也需要20分钟,结果也就只能这样了。我们不能使它更快。另外,某些数据集的增长会超出一台计算机的处理能力。当我们开始使用多台计算机时,整个大环境中的其他因素将对其产生影响,其中最主要的是协调性和可靠性两大因素。哪个进程负责运行整个作业?我们如何处理失败的进程?
因此,尽管可以实现并行处理,但实际上非常复杂。使用Hadoop之类的框架来实现并行数据处理将很有帮助。
使用Hadoop分析数据
为了充分发挥Hadoop 提供的并行处理优势,我们需要将查询表示成MapReduce 作业。经过一些本地的小规模测试,我们将能够在集群设备上运行Hadoop。
map阶段和reduce阶段
MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型。程序员还需具体定义两个函数:map函数和reduce 函数。
map阶段的输入是原始NCDC数据。我们选择文本格式作为输入格式,以便将数据集的每一行作为一个文本值进行输入。键为该行起始位置相对于文件起始位置的偏移量,但我们不需要这个信息,故将其忽略。
我们的map函数很简单。由于我们只对年份和气温这两个属性感兴趣,所以只需要取出这两个属性数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reduce函数能在该准备数据上继续处理:即找出每年的最高气温。map函数还是一个比较适合去除已损记录的地方:此处,我们将筛选掉缺失的、可疑的或错误的气温数据。
为了全面了解map 的工作方式,我们思考以下几行作为输入数据的示例数据 (考虑到页面篇幅,去除了一些未使用的列,并用省略号表示):
0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以键/值对的方式来表示map函数:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
键(key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
map函数的输出经由MapReduce框架处理后,最后被发送到reduce函数。这一处理过程中需要根据键对键/值对进行排序和分组。因此,我们的示例中,reduce函数会看到如下输入:
(1949, [111, 78])
(1950, [0, 22, −11])
每一年份后紧跟着一系列气温数据。所有reduce函数现在需要做的是遍历整个列表并从中找出最大的读数:
(1949, 111)
(1950, 22)
这是最终输出结果:每一年的全球最高气温记录。
整个数据流如图2-1所示。在图的底部是Unix的流水线(pipeline,也称管道或管线)命令,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时再次涉及。
▲
Java MapReduce
明白MapReduce 程序的工作原理之后,下一步便是通过代码来实现它。我们需要三样东西:一个map 函数、一个reduce 函数和一些用来运行作业的代码。map函数由Mapper 接口实现来表示,后者声明了一个map()方法。例2-3 显示了我们的map函数实现。
例2-3. 查找最高气温的Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
该Mapper接口是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。这里我们使用LongWritable类型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java 中的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了OutputCollector实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写 (因为我们把年份当作键),将气温值封装在IntWritable 类型中。
我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
reduce函数通过Reducer进行类似的定义,如例2-4 所示。
例2-4. 查找最高气温的Reducer
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer {
public void reduce(Text key, Iterator
OutputCollector output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce 函数的输入类型必须与map 函数的输出类型相匹配:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与已看到的最高气温获得的。
第三部分代码负责运行MapReduce 作业(请参见例2-5)。
例2-5. 该应用程序在气象数据集中找出最高气温
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature ");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在Hadoop 集群上运行这个作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定JAR 文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat 类中的静态函数 setOutputPath()来指定输出路径。该函数指定了reduce 函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop 会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外地覆盖将是非常恼人的)。
接着,通过setMapperClass()和setReducerClass()指定map和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则通过setMapOutputKeyClass()和setMapOutputValueClass()函数来设置。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
在设置定义map 和reduce 函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
运行测试
写好MapReduce 作业后,通常会拿一个小型的数据集进行测试以排除代码相关问题。首先,以独立(本机)模式安装Hadoop,详细说明请参见附录A。 在这种模式下,Hadoop在本地文件系统上运行作业运行程序。让我们用前面讨论过的5行采样数据为例子来测试MapReduce作业(考虑到篇幅,这里对输出稍有修改):
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output
09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=Job Tracker, sessionId= 09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process:1
09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process:1
09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1
09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720
09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680
09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output
09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
file:/Users/tom/workspace/htdg/input/n cdc/sample.txt:0+529
09/04/07 12:34:36 INFO mapred.TaskRunner:Task'attempt_local_0001_m_000000_0' done.
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments 09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output
09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce
09/04/07 12:34:36 INFO mapred.TaskRunner:Task'attempt_local_0001_r_000000_0' done.
09/04/07 12:34:36 INFO mapred.JobClient: map 100% reduce 100%
09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001
09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13
09/04/07 12:34:36 INFO mapred.JobClient: FileSystemCounters
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_READ=27571
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=53907
09/04/07 12:34:36 INFO mapred.JobClient: Map-Reduce Framework
09/04/07 12:34:36 INFO mapred.JobClient Reduce input groups=2
09/04/07 12:34:36 INFO mapred.JobClient: Combine output records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map input records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce shuffle bytes=0
09/04/07 12:34:36 INFO mapred.JobClient: Reduce output records=2
09/04/07 12:34:36 INFO mapred.JobClient: Spilled Records=10
09/04/07 12:34:36 INFO mapred.JobClient: Map output bytes=45
09/04/07 12:34:36 INFO mapred.JobClient: Map input bytes=529
09/04/07 12:34:36 INFO mapred.JobClient: Combine input records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map output records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input records=5
如果调用hadoop 命令的第一个参数是类名,则Hadoop将启动一个JVM来运行这个类。使用hadoop命令运行作业比直接使用Java命令运行更方便,因为前者将Hadoop库文件(及其依赖关系)路径加入到类路径参数中,同时也能获得Hadoop的配置文件。我们需要定义一个 HADOOP_CLASSPATH 环境变量用于添加应用程序类的路径,然后由Hadoop 脚本来执行相关操作。
以本地(独立)模式运行时,本书中所有程序均假设按照这种方式来设置HADOOP_CLASSPATH。命令的运行需要在示例代码所在的文件夹下进行。
运行作业所得到的输出提供了一些有用的信息。无法找到作业JAR 文件的警告信息是意料之中的,因为我们没有使用JAR文件在本地模式下运行。在集群上运行时,将不会看到这个警告。例如,我们可以看到,这个作业有指定的标识,即job_local_0001,并且执行了一个map 任务和一个reduce 任务(使用attempt_local_0001_m_000000_0和 attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID 是非常有用的。
输出的最后一部分,以Counters为标题,显示在Hadoop 上运行的每个作业的一些统计信息。这些信息对检查这些大量的数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知:5 个map 输入产生了5 个map 的输出,然后5 个reduce 输入产生2个reduce 输出。
输出数据写入output目录,其中每个reducer都有一个输出文件。我们的例子中包含一个 reducer,所以我们只能找到一个文件,名为part-00000:
% cat output/part-00000
1949 111
1950 22
这个结果和我们之前手动寻找的结果一样。我们把这个结果解释为1949年的最高气温记录为11.1℃,而1950 年为2.2℃。
新增的Java MapReduce API
Hadoop的版本0.20.0包含有一个新的 Java MapReduce API,有时也称为“上下文对象”(context object),旨在使API在今后更容易扩展。新的API 在类型上不兼容先前的API,所以,需要重写以前的应用程序才能使新的API发挥作用。[1]
新增的API 和旧的API 之间,有下面几个明显的区别。
新的API 倾向于使用虚类,而不是接口,因为这更容易扩展。例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现)。在新的API 中, mapper 和reducer现在都是虚类。
新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依旧放在org.apache.hadoop.mapred中。
新的API充分使用上下文对象,使用户代码能与MapReduce系统通信。例如,MapContext 基本具备了JobConf、OutputCollector和Reporter的功能。
新的API 同时支持“推”(push)和“拉”(pull)式的迭代。这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API 也允许把记录从map()方法中拉出。对reducer来说是一样的。“拉”式处理数据的好处是可以实现数据的批量处理,而非逐条记录地处理。
新增的API实现了配置的统一。旧API 通过一个特殊的JobConf 对象配置作业,该对象是Hadoop配置对象的一个扩展 (用于配置守护进程,详情请参见第130页的“API配置”小节)。在新的API 中,我们丢弃这种区分,所有作业的配置均通过Configuration 来完成。
新API中作业控制由Job类实现,而非JobClient类,新API中删除了JobClient类。
输出文件的命名方式稍有不同。map的输出文件名为part-m-nnnnn,而reduce的输出为part-r-nnnnn(其中nnnnn表示分块序号,为整数,且从0开始算)。
例2-6 显示了使用新API 重写的MaxTemperature应用。不同之处已加粗显示。
将旧API写的Mapper和Reducer类转换为新API时,记住将map()和reduce()的签名转换为新形式。如果只是将类的继承修改为对新的Mapper和Reducer类的继承,编译的时候也不会报错或显示警告信息,因为新的Mapper和Reducer类同样也提供了等价的map()和reduce()函数。但是,自己写的mapper或reducer代码是不会被调用的,这会导致难以诊断的错误。
例2-6. 用新上下文对象MapReduce API重写的MaxTemperature应用
public class NewMaxTemperature {
static class NewMaxTemperatureMapper
extends Mapper {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
static class NewMaxTemperatureReducer
extends Reducer {
public void reduce(Text key, Iterable
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: NewMaxTemperature ");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[1] 在本书写作期间,0.20发布包中新增的API还不完整(或稳定)。为此,本书仍然使用旧的API。不过,本书所有范例将用新增的API重写(针对0.21.0和更新版本),可从本书网站下载。
横向扩展
前面介绍了MapReduce针对少量输入数据是如何工作的,现在我们开始鸟瞰整个系统以及有大量输入数据时数据是如何处理的。为了简单起见,到目前为止,我们的例子都只是用了本地文件系统中的文件。然而,为了实现横向扩展(scaling out),我们需要把数据存储在分布式文件系统中,一般为HDFS (详见第3章),由此允许Hadoop将MapReduce 计算移到存储有部分数据的各台机器上。下面我们看看具体过程。
数据流
首先定义一些术语。MapReduce作业(job) 是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务 (task)来执行,其中包括两类任务:map任务和reduce任务。
有两类节点控制着作业执行过程:一个jobtracker及一系列tasktracker。jobtracker通过调度tasktracker上运行的任务,来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告发送给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称分片。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map 函数从而处理分片中的每条记录。
拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,处理失败的作业或其他同时运行的作业也能够实现负载平衡,并且如果分片被切分得更细,负载平衡的质量会更好。
另一方面,如果分片切分得太小,那么管理分片的总时间和构建map 任务的总时间将决定着作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是64 MB,不过可以针对集群调整这个默认值,在新建所有文件或新建每个文件时具体指定即可。
Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得非常好的性能。这就是所谓的数据本地化优化(data locality optimization)。现在我们应该清楚为什么非常好的分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,这种方法显然效率更低。
map任务将其输出写入本地硬盘,而非HDFS。这是为什么?因为map的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果可以被删除。因此,如果把它存储在HDFS中并实现备份,难免有些小题大做。如果该节点上运行的map任务在将map 中间结果传送给reduce 任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。
reduce任务并不具备数据本地化的优势——单个reduce任务的输入通常来自于所有mapper的输出。在本例中,我们仅有一个reduce 任务,其输入是所有map任务的输出。因此,排过序的map输出需通过网络传输发送到运行reduce 任务的节点。数据在reduce端合并,然后由用户定义的reduce 函数处理。reduce的输出通常存储在HDFS中以实现可靠存储。如第3章所述,对于每个reduce 输出的HDFS块,第一个复本存储在本地节点上,其他复本存储在其他机架节点中。因此,reduce的输出写入HDFS确实需要占用网络带宽,但这与正常的HDFS流水线写入的消耗一样。
一个reduce任务的完整数据流如图2-2所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示节点之间的数据传输。
▲图2-2. 一个reduce任务的MapReduce数据流
reduce任务的数量并非由输入数据的大小决定,而是特别指定的。第191页的“默认的MapReduce作业”小节将介绍如何为指定的作业选择reduce任务的数量。
如有多个reduce任务,则每个map任务都会对其输出进行分区(partition),即为每个reduce任务建一个分区。每个分区有许多键(及其对应值),但每个键对应的键/值对记录都在同一分区中。分区由用户定义的分区函数控制,但通常用默认的分区器(partitioner,文中有时也称“分区函数”)通过哈希函数来分区,这种方法很高效。
一般情况下,多个reduce任务的数据流如图2-3所示。该图清楚地表明了为什么map任务和reduce任务之间的数据流称为shuffle(混洗),因为每个reduce 任务的输入都来自许多map任务。混洗一般比此图所示的更复杂,并且调整混洗参数对作业总执行时间会有非常大的影响,详情参见第177页的“混洗和排序”小节。
▲图2-3. 多个reduce任务的数据流
最后,也有可能没有任何reduce任务。当数据处理可以完全并行时,即无需混洗,可能会出现无reduce任务的情况(示例参见第211页的“NLineInputFormat”小节)。在这种情况下,唯一的非本地节点数据传输是map任务将结果写入HDFS(参见图2-4)。
combiner
集群上的可用带宽限制了MapReduce作业的数量,因此最重要的一点是尽量避免map任务和reduce任务之间的数据传输。Hadoop允许用户针对map任务的输出指定一个合并函数(文中有时也称作combiner,就像mapper和reducer一样——译者注)——合并函数的输出作为reduce函数的输入。由于合并函数是一个优化方案,所以Hadoop无法确定针对map任务输出中任一条记录需要调用多少次合并函数(如果需要)。换言之,不管调用合并函数多少次,0次、1次或多次,reducer的输出结果都应一致。
▲图2-4. 无reduce任务的MapReduce数据流
合并函数的规则限定了可以使用的函数类型。这里最好通过一个例子来说明。依旧假设以前计算最高气温的例子,1950年的读数由两个map任务处理(因为它们在不同的分片中)。假设第一个map 的输出如下:
(1950, 0)
(1950, 20)
(1950, 10)
第二个map的输出如下:
(1950, 25)
(1950, 15)
reduce函数被调用时,输入如下:
(1950, [0, 20, 10, 25, 15])
因为25为该列数据中最大的,所以其输出如下:
(1950, 25)
我们可以像使用reduce函数那样,使用合并函数找出每个map任务输出结果中的最高气温。如此一来,reduce函数调用时将被传入以下数据:
(1950, [20, 25])
reduce输出的结果和以前一样。更简单地说,我们可以通过下面的表达式来说明气温数值上的函数调用:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
并非所有函数都具有该属性。[1]例如,如果我们计算平均气温,便不能用平均数作为combiner,因为
mean(0, 20, 10, 25, 15) = 14
而combiner不能取代reduce函数:
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
为什么呢?我们仍然需要reduce函数来处理不同map输出中具有相同键的记录。但它能有效减少map和reduce之间的数据传输量,在MapReduce作业中使用combiner,是需要慎重考虑的。
指定一个合并函数
让我们回到Java MapReduce 程序,合并函数是通过reducer接口来定义的,并且该例中,它的实现与MaxTemperatureReducer中的reduce函数相同。唯一需要做的修改是在JobConf中设置combiner类(见例2-7)。
例2-7. 使用合并函数快速找出最高气温
public class MaxTemperatureWithCombiner {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCombiner " +
"");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatureWithCombiner.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
[1] 在Gray等人1995年发表的论文“Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals”中,具有该属性的函数被称为是“分布式的”。
运行分布式的MapReduce作业
无需修改,便可以在一个完整的数据集上直接运行这个程序。这是MapReduce的优势之一:它可以根据数据量的大小和硬件规模进行扩展。这里有一个运行结果:在一个10节点EC2集群运行High-CPU Extra Large lnstance,程序执行时间只有6分钟。[1]
我们将在第5章分析在集群上运行程序的机制。
Hadoop的Streaming
Hadoop提供了MapReduce的API,并允许你使用非Java的其他语言来写自己的map和reduce函数。Hadoop的Streaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写MapReduce程序。
Streaming天生适合用于文本处理(尽管到0.21.0版本时,它也可以处理二进制流),在文本模式下使用时,它有一个数据的行视图。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,它以这样的形式写到标准输出。reduce 函数的输入格式相同——通过制表符来分隔的键/值对——并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。
下面使用Streaming来重写按年份查找最高气温的MapReduce程序。
Ruby版本
例2-8显示了用Ruby编写的map函数。
例2-8. 用Ruby编写查找最高气温的map函数
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
程序通过程序块执行STDIN(一个IO类型的全局常量)中的每一行来迭代执行标准输入中的每一行。该程序块从输入的每一行中取出相关字段,如果气温有效,就将年份以及气温写到标准输出(使用puts),其中年份和气温之间有一个制表符\t。
值得一提的是Streaming和Java MapReduce API之间的设计差异。Java API控制的map函数一次只能处理一条记录。针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理,然而在Streaming中,map程序可以自己决定如何处理输入数据,例如,它可以轻松读取并同时处理若干行,因为它受读操作的控制。用户的Java map实现的是“推”记录方式,但它依旧可以同时处理多行,具体做法是通过mapper中实例变量将之前读取的多行汇聚在一起。[2]在这种情况下,需要实现close()方法,以便知道何时读到最后一条记录,进而完成对最后一组记录行的处理。
由于该脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078
例2-9显示的reduce函数更复杂一些。
例2-9. 用Ruby编写的查找最高气温的reduce函数
#!/usr/bin/env ruby
last_key, max_val = nil, 0
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
同样地,程序遍历标准输入中的行,但在我们处理每个键组时,要存储一些状态。在这种情况下,键是气象站的标识符,我们存储看到的最后一个键和迄今为止见到的该键对应的最高气温。MapReduce框架保证了键的有序性,由此我们知道,如果读到一个键与前一个键不同,就需要开始处理一个新的键组。相比之下,Java API系统提供一个针对每个键组的迭代器,而在Streaming中,需要在程序中找出键组的边界。
我们从每行取出键和值,然后如果正好完成一个键组的处理(last_key&last_key = key),就针对该键组写入该键及其最高气温,用一个制表符来进行分隔,最后开始处理新键组时我们需要重置最高气温值。如果尚未完成对一个键组的处理,那么就只有当前键的最高气温被更新。
程序的最后一行确保了处理完输入的最后一个键组后会有一行输出。
现在可以用Unix管线来模拟整个MapReduce管线,该管线与图2-1中显示的Unix管线是相同的:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \
sort | ch02/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22
输出结果和Java程序的一样,所以下一步是通过Hadoop运行它。
hadoop命令不支持Streaming函数,因此,我们需要在指定Streaming JAR文件流与jar选项时指定。Streaming程序的选项指定了输入和输出路径,以及map和reduce脚本。如下所示:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
在一个集群上运行一个庞大的数据集时,我们要使用-combiner选项来设置合并函数。
从0.21.0版开始,合并函数可以是任何一个Streaming命令。对于早期版本,合并函数只能用Java编写,所以一个变通的方法是在mapper中进行手动合并,进而避开Java语言。在这里,我们可以把mapper改成流水线:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb
还需注意-file选项的使用,在集群上运行Streaming程序时,我们会使用这个选项,从而将脚本传输到集群。
Python版本
Streaming支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python 版本。[3]map脚本参见例2-10,reduce脚本参见例2-11。
例2-10. 用Python编写用于查找最高气温的map函数
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
例2-11. 用Python编写用于查找最高气温的reduce函数
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, 0)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
我们可以像测试Ruby程序那样测试程序并运行作业。例如,可以像下面这样运行测试:
% cat input/ncdc/sample.txt | ch02/src/main/python/max_temperature_map.py | \
sort | ch02/src/main/python/max_temperature_reduce.py
1949 111
1950 22
Hadoop的Pipes
Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming,Pipes使用套接字作为tasktracker与C++版本map函数或reduce函数的进程之间的通道,而未使用JNI。
我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用Pipes来运行它。例 2-12 显示了用C++语言编写的map函数和reduce 函数的源代码。
例2-12. 用C++语言编写的MaxTemperature程序
#include
#include
#include
#include
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
class MaxTemperatureMapper : public HadoopPipes::Mapper {
public:
MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
}
void map(HadoopPipes::MapContext& context) {
std::string line = context.getInputValue();
std::string year = line.substr(15, 4);
std::string airTemperature = line.substr(87, 5);
std::string q = line.substr(92, 1);
if (airTemperature != "+9999" &&
(q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {
context.emit(year, airTemperature);
}
}
};
class MapTemperatureReducer : public HadoopPipes::Reducer {
public:
MapTemperatureReducer(HadoopPipes::TaskContext& context) {
}
void reduce(HadoopPipes::ReduceContext& context) {
int maxValue = INT_MIN;
while (context.nextValue()) {
maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));
}
context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
}
};
int main(int argc, char *argv[]) {
returnHadoopPipes::runTask(HadoopPipes::TemplateFactory());
}
应用程序对Hadoop C++库链接提供了一个与tasktracker 子进程进行通信的简单封装。通过扩展HadoopPipes命名空间中定义的mapper和reducer两个类,我们定义了map()和reduce()方法,同时我们提供各种情况下map()和reduce()方法的实现。这些方法采用了上下文对象(MapContext类型或ReduceContext类型),进而提供了读取输入数据和写入输出数据,以及通过JobConf类来访问作业配置信息的功能。本例中的处理过程类似于Java的处理方式。
与Java接口不同,C++接口中的键和值按字节缓冲,用标准模板库(Standard Template Library,STL)中的字符串表示。这样做简化了接口,但把更重的负担留给了应用程序开发人员,因为开发人员必须来回封送(marshall)字符串与特定应用领域内使用的具体类型。这一点在MapTemperatureReducer中有所体现,我们必须把输入值转换为整型值(通过HadoopUtils中定义的方法),然后将找到的最大值转化为字符串后再输出。在某些情况下,我们可以省略这类转化,如MaxTemperatureMapper 中的airTemperature值无需转换为整型,因为map()方法并不将它当作数值类型来处理。
这个应用程序的入口点是main()方法。它调用HadoopPipes::runTask,该函数连接到Java父进程,并在mapper和reducer之间来回封送数据。runTask()方法被传入一个Factory参数,由此新建mapper或reducer实例。新建mapper还是创建reducer,Java父进程可通过套接字连接进行控制。我们可以用重载模板factory来设置combiner、partitioner、record reader或record writer。
编译运行
现在我们可以用Makerfile编译连接例2-13中的程序。
例2-13. C++版本MapReduce程序的Makefile
CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
max_temperature: max_temperature.cpp
$ (CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib
-lhadooppipes \ -lhadooputils -lpthread -g -O2 -o $@
在Makefile中需要设置许多环境变量。除了HADOOP_INSTALL变量(如果遵循附录A 中的安装说明,应该已经设置好),还需要定义PLATFORM变量,该变量指定了操作系统、体系结构和数据模型(例如,32 位或64 位)。我在32位Linux系统的机器编译运行了如下内容:
% export PLATFORM=Linux-i386-32
% make
成功编译之后,可以在当前目录中找到名为max_temperature的可执行文件。
我们需要以伪分布式(pseudo_distrinuted)模式(其中所有守护进程运行在本地计算机上)运行Hadoop来运行Pipes作业,具体设置步骤请参见附录A。Pipes不能在独立模式(本地运行)下运行,因为它依赖于Hadoop的分布式缓存机制,而该机制只有在HDFS 运行时才起作用。
Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便在启动 map和reduce任务时,tasktracker能够找到关联的可执行程序:
% hadoop fs -put max_temperature bin/max_temperature
示例数据同样也需要从本地文件系统复制到HDFS。
现在可以运行这个作业。我们用hadoop Pipes命令使其运行,使用-program参数来传递在HDFS中可执行文件的URI:
% hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input sample.txt \
-output output \
-program bin/max_temperature
我们使用-D 选项来指定两个属性:hadoop.pipes.java.recordreader和hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并不指定C++记录读取函数或记录写入函数,而是使用默认的Java设置(用来设置文本输入和输出)。Pipes还允许我们设置一个Java mapper、reducer、合并函数或分区函数。事实上,在任何一个作业中,都可以混合使用Java类或C++类。结果和其他语言版本的结果一样。
[1] 就其速度而言,这比在单台计算机上依顺序运行的awk快7倍。这里之所以性能不是线性增加是由于输入数据不是均匀切分的。为了方便,输入数据时按年份压缩成gzip文件,这导致晚些年份的数据文件比较大,因为那些年记录的天气数据更多。
[2] 另一种方法是,可以在新增的MapReduce API中使用“拉”的方式来处理。详情参见第25页的“新增的Java MapReduce API”小节。
[3] 作为Streaming的替代方案,Python程序员可考虑Dumbo,它能使Streaming MapReduce接口更像Python、更好用。其网址为http://www.lost.fm/dumbo。