云计算 频道

关于MapReduce-Hadoop权威指南连载

  运行分布式的MapReduce作业

  无需修改,便可以在一个完整的数据集上直接运行这个程序。这是MapReduce的优势之一:它可以根据数据量的大小和硬件规模进行扩展。这里有一个运行结果:在一个10节点EC2集群运行High-CPU Extra Large lnstance,程序执行时间只有6分钟。[1]

  我们将在第5章分析在集群上运行程序的机制。

  Hadoop的Streaming

  Hadoop提供了MapReduce的API,并允许你使用非Java的其他语言来写自己的map和reduce函数。Hadoop的Streaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写MapReduce程序。

  Streaming天生适合用于文本处理(尽管到0.21.0版本时,它也可以处理二进制流),在文本模式下使用时,它有一个数据的行视图。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,它以这样的形式写到标准输出。reduce 函数的输入格式相同——通过制表符来分隔的键/值对——并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。

  下面使用Streaming来重写按年份查找最高气温的MapReduce程序。

  Ruby版本

  例2-8显示了用Ruby编写的map函数。

  例2-8. 用Ruby编写查找最高气温的map函数

  #!/usr/bin/env ruby

  STDIN.each_line do |line|

  val = line

  year, temp, q = val[15,4], val[87,5], val[92,1]

  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)

  end

  程序通过程序块执行STDIN(一个IO类型的全局常量)中的每一行来迭代执行标准输入中的每一行。该程序块从输入的每一行中取出相关字段,如果气温有效,就将年份以及气温写到标准输出(使用puts),其中年份和气温之间有一个制表符\t。

  值得一提的是Streaming和Java MapReduce API之间的设计差异。Java API控制的map函数一次只能处理一条记录。针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理,然而在Streaming中,map程序可以自己决定如何处理输入数据,例如,它可以轻松读取并同时处理若干行,因为它受读操作的控制。用户的Java map实现的是“推”记录方式,但它依旧可以同时处理多行,具体做法是通过mapper中实例变量将之前读取的多行汇聚在一起。[2]在这种情况下,需要实现close()方法,以便知道何时读到最后一条记录,进而完成对最后一组记录行的处理。

  由于该脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:

  % cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb

  1950 +0000

  1950 +0022

  1950 -0011

  1949 +0111

  1949 +0078

  例2-9显示的reduce函数更复杂一些。

  例2-9. 用Ruby编写的查找最高气温的reduce函数

  #!/usr/bin/env ruby

  last_key, max_val = nil, 0

  STDIN.each_line do |line|

  key, val = line.split("\t")

  if last_key && last_key != key

  puts "#{last_key}\t#{max_val}"

  last_key, max_val = key, val.to_i

  else

  last_key, max_val = key, [max_val, val.to_i].max

  end

  end

  puts "#{last_key}\t#{max_val}" if last_key

  同样地,程序遍历标准输入中的行,但在我们处理每个键组时,要存储一些状态。在这种情况下,键是气象站的标识符,我们存储看到的最后一个键和迄今为止见到的该键对应的最高气温。MapReduce框架保证了键的有序性,由此我们知道,如果读到一个键与前一个键不同,就需要开始处理一个新的键组。相比之下,Java API系统提供一个针对每个键组的迭代器,而在Streaming中,需要在程序中找出键组的边界。

  我们从每行取出键和值,然后如果正好完成一个键组的处理(last_key&last_key = key),就针对该键组写入该键及其最高气温,用一个制表符来进行分隔,最后开始处理新键组时我们需要重置最高气温值。如果尚未完成对一个键组的处理,那么就只有当前键的最高气温被更新。

  程序的最后一行确保了处理完输入的最后一个键组后会有一行输出。

  现在可以用Unix管线来模拟整个MapReduce管线,该管线与图2-1中显示的Unix管线是相同的:

  % cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \

  sort | ch02/src/main/ruby/max_temperature_reduce.rb

  1949 111

  1950 22

  输出结果和Java程序的一样,所以下一步是通过Hadoop运行它。

  hadoop命令不支持Streaming函数,因此,我们需要在指定Streaming JAR文件流与jar选项时指定。Streaming程序的选项指定了输入和输出路径,以及map和reduce脚本。如下所示:

  % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \

  -input input/ncdc/sample.txt \

  -output output \

  -mapper ch02/src/main/ruby/max_temperature_map.rb \

  -reducer ch02/src/main/ruby/max_temperature_reduce.rb

  在一个集群上运行一个庞大的数据集时,我们要使用-combiner选项来设置合并函数。

  从0.21.0版开始,合并函数可以是任何一个Streaming命令。对于早期版本,合并函数只能用Java编写,所以一个变通的方法是在mapper中进行手动合并,进而避开Java语言。在这里,我们可以把mapper改成流水线:

  % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \

  -input input/ncdc/all \

  -output output \

  -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |

  ch02/src/main/ruby/max_temperature_reduce.rb" \

  -reducer ch02/src/main/ruby/max_temperature_reduce.rb \

  -file ch02/src/main/ruby/max_temperature_map.rb \

  -file ch02/src/main/ruby/max_temperature_reduce.rb

  还需注意-file选项的使用,在集群上运行Streaming程序时,我们会使用这个选项,从而将脚本传输到集群。

  Python版本

  Streaming支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python 版本。[3]map脚本参见例2-10,reduce脚本参见例2-11。

  例2-10. 用Python编写用于查找最高气温的map函数

  #!/usr/bin/env python

  import re

  import sys

  for line in sys.stdin:

  val = line.strip()

  (year, temp, q) = (val[15:19], val[87:92], val[92:93])

  if (temp != "+9999" and re.match("[01459]", q)):

  print "%s\t%s" % (year, temp)

  例2-11. 用Python编写用于查找最高气温的reduce函数

  #!/usr/bin/env python

  import sys

  (last_key, max_val) = (None, 0)

  for line in sys.stdin:

  (key, val) = line.strip().split("\t")

  if last_key and last_key != key:

  print "%s\t%s" % (last_key, max_val)

  (last_key, max_val) = (key, int(val))

  else:

  (last_key, max_val) = (key, max(max_val, int(val)))

  if last_key:

  print "%s\t%s" % (last_key, max_val)

  我们可以像测试Ruby程序那样测试程序并运行作业。例如,可以像下面这样运行测试:

  % cat input/ncdc/sample.txt | ch02/src/main/python/max_temperature_map.py | \

  sort | ch02/src/main/python/max_temperature_reduce.py

  1949 111

  1950 22

  Hadoop的Pipes

  Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming,Pipes使用套接字作为tasktracker与C++版本map函数或reduce函数的进程之间的通道,而未使用JNI。

  我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用Pipes来运行它。例 2-12 显示了用C++语言编写的map函数和reduce 函数的源代码。

  例2-12. 用C++语言编写的MaxTemperature程序

  #include

  #include

  #include

  #include

  #include "hadoop/Pipes.hh"

  #include "hadoop/TemplateFactory.hh"

  #include "hadoop/StringUtils.hh"

  class MaxTemperatureMapper : public HadoopPipes::Mapper {

  public:

  MaxTemperatureMapper(HadoopPipes::TaskContext& context) {

  }

  void map(HadoopPipes::MapContext& context) {

  std::string line = context.getInputValue();

  std::string year = line.substr(15, 4);

  std::string airTemperature = line.substr(87, 5);

  std::string q = line.substr(92, 1);

  if (airTemperature != "+9999" &&

  (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {

  context.emit(year, airTemperature);

  }

  }

  };

  class MapTemperatureReducer : public HadoopPipes::Reducer {

  public:

  MapTemperatureReducer(HadoopPipes::TaskContext& context) {

  }

  void reduce(HadoopPipes::ReduceContext& context) {

  int maxValue = INT_MIN;

  while (context.nextValue()) {

  maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));

  }

  context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));

  }

  };

  int main(int argc, char *argv[]) {

  returnHadoopPipes::runTask(HadoopPipes::TemplateFactory());

  }

  应用程序对Hadoop C++库链接提供了一个与tasktracker 子进程进行通信的简单封装。通过扩展HadoopPipes命名空间中定义的mapper和reducer两个类,我们定义了map()和reduce()方法,同时我们提供各种情况下map()和reduce()方法的实现。这些方法采用了上下文对象(MapContext类型或ReduceContext类型),进而提供了读取输入数据和写入输出数据,以及通过JobConf类来访问作业配置信息的功能。本例中的处理过程类似于Java的处理方式。

  与Java接口不同,C++接口中的键和值按字节缓冲,用标准模板库(Standard Template Library,STL)中的字符串表示。这样做简化了接口,但把更重的负担留给了应用程序开发人员,因为开发人员必须来回封送(marshall)字符串与特定应用领域内使用的具体类型。这一点在MapTemperatureReducer中有所体现,我们必须把输入值转换为整型值(通过HadoopUtils中定义的方法),然后将找到的最大值转化为字符串后再输出。在某些情况下,我们可以省略这类转化,如MaxTemperatureMapper 中的airTemperature值无需转换为整型,因为map()方法并不将它当作数值类型来处理。

  这个应用程序的入口点是main()方法。它调用HadoopPipes::runTask,该函数连接到Java父进程,并在mapper和reducer之间来回封送数据。runTask()方法被传入一个Factory参数,由此新建mapper或reducer实例。新建mapper还是创建reducer,Java父进程可通过套接字连接进行控制。我们可以用重载模板factory来设置combiner、partitioner、record reader或record writer。

  编译运行

  现在我们可以用Makerfile编译连接例2-13中的程序。

  例2-13. C++版本MapReduce程序的Makefile

  CC = g++

  CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

  max_temperature: max_temperature.cpp

  $ (CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib

  -lhadooppipes \ -lhadooputils -lpthread -g -O2 -o $@

  在Makefile中需要设置许多环境变量。除了HADOOP_INSTALL变量(如果遵循附录A 中的安装说明,应该已经设置好),还需要定义PLATFORM变量,该变量指定了操作系统、体系结构和数据模型(例如,32 位或64 位)。我在32位Linux系统的机器编译运行了如下内容:

  % export PLATFORM=Linux-i386-32

  % make

  成功编译之后,可以在当前目录中找到名为max_temperature的可执行文件。

  我们需要以伪分布式(pseudo_distrinuted)模式(其中所有守护进程运行在本地计算机上)运行Hadoop来运行Pipes作业,具体设置步骤请参见附录A。Pipes不能在独立模式(本地运行)下运行,因为它依赖于Hadoop的分布式缓存机制,而该机制只有在HDFS 运行时才起作用。

  Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便在启动 map和reduce任务时,tasktracker能够找到关联的可执行程序:

  % hadoop fs -put max_temperature bin/max_temperature

  示例数据同样也需要从本地文件系统复制到HDFS。

  现在可以运行这个作业。我们用hadoop Pipes命令使其运行,使用-program参数来传递在HDFS中可执行文件的URI:

  % hadoop pipes \

  -D hadoop.pipes.java.recordreader=true \

  -D hadoop.pipes.java.recordwriter=true \

  -input sample.txt \

  -output output \

  -program bin/max_temperature

  我们使用-D 选项来指定两个属性:hadoop.pipes.java.recordreader和hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并不指定C++记录读取函数或记录写入函数,而是使用默认的Java设置(用来设置文本输入和输出)。Pipes还允许我们设置一个Java mapper、reducer、合并函数或分区函数。事实上,在任何一个作业中,都可以混合使用Java类或C++类。结果和其他语言版本的结果一样。

  [1] 就其速度而言,这比在单台计算机上依顺序运行的awk快7倍。这里之所以性能不是线性增加是由于输入数据不是均匀切分的。为了方便,输入数据时按年份压缩成gzip文件,这导致晚些年份的数据文件比较大,因为那些年记录的天气数据更多。

  [2] 另一种方法是,可以在新增的MapReduce API中使用“拉”的方式来处理。详情参见第25页的“新增的Java MapReduce API”小节。

  [3] 作为Streaming的替代方案,Python程序员可考虑Dumbo,它能使Streaming MapReduce接口更像Python、更好用。其网址为http://www.lost.fm/dumbo。

0
相关文章