使用Hadoop分析数据
为了充分发挥Hadoop 提供的并行处理优势,我们需要将查询表示成MapReduce 作业。经过一些本地的小规模测试,我们将能够在集群设备上运行Hadoop。
map阶段和reduce阶段
MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型。程序员还需具体定义两个函数:map函数和reduce 函数。
map阶段的输入是原始NCDC数据。我们选择文本格式作为输入格式,以便将数据集的每一行作为一个文本值进行输入。键为该行起始位置相对于文件起始位置的偏移量,但我们不需要这个信息,故将其忽略。
我们的map函数很简单。由于我们只对年份和气温这两个属性感兴趣,所以只需要取出这两个属性数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reduce函数能在该准备数据上继续处理:即找出每年的最高气温。map函数还是一个比较适合去除已损记录的地方:此处,我们将筛选掉缺失的、可疑的或错误的气温数据。
为了全面了解map 的工作方式,我们思考以下几行作为输入数据的示例数据 (考虑到页面篇幅,去除了一些未使用的列,并用省略号表示):
0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以键/值对的方式来表示map函数:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
键(key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
map函数的输出经由MapReduce框架处理后,最后被发送到reduce函数。这一处理过程中需要根据键对键/值对进行排序和分组。因此,我们的示例中,reduce函数会看到如下输入:
(1949, [111, 78])
(1950, [0, 22, −11])
每一年份后紧跟着一系列气温数据。所有reduce函数现在需要做的是遍历整个列表并从中找出最大的读数:
(1949, 111)
(1950, 22)
这是最终输出结果:每一年的全球最高气温记录。
整个数据流如图2-1所示。在图的底部是Unix的流水线(pipeline,也称管道或管线)命令,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时再次涉及。
▲