云计算 频道

Hadoop应用实例

  Rackspace的日志处理

  Rackspace Hosting一直为企业提供管理系统,以同样的方式,Mailtrust在2007秋变成Rackspace的邮件分部。Rackspace目前在几百台服务器上为100多万用户和几千家公司提供邮件服务。

  要求/问题

  通过系统传输Rackspace用户的邮件产生了相当大的“文件”路径信息,它们以各种格式的日志文件的形式存放,每天大约有150 GB。聚集这些数据对系统发展规划以及了解用户如何使用我们的系统是非常有帮助的,并且,这些记录对系统故障排查也有好处。

  假如一封邮件发送失败或用户无法登陆系统,这时非常重要的事是让我们的客服能找到足够的问题相关信息开始调试。为了能够快速发现这些信息,我们不能把日志文件放在产生它们的机器上或以其原始格式存放。相反,我们使用Hadoop来做大量的日志处理工作,而其结果被Lucene索引之后用来支持客服的查询需求。

  日志

  数量级最大的两种日志格式是由Postfix邮件发送代理和Microsoft Exchange Server产生的。所有通过我们系统的邮件都要在某个地方使用Postfix邮件代理服务器,并且大部分消息都要穿越多个Postfix服务器。Exchange是必须独立的系统,但是其中有一类profix服务器充当一个附加保护层,它们使用SMTP协议在各个环境下的托管邮箱之间传递消息。

  消息要穿越很多机器,但是每个服务器只知道邮件的目的地,然后发送邮件到下一个负责的服务器。因此,为了给消息建立完整的历史信息,我们的日志处理系统需要拥有系统的全局视图。Hadoop给予我们的最大帮助是:随着我们的系统发展壮大,系统日志量也随之增长。为了使我们的日志处理逻辑仍然可行,我们必须确保它能扩展。MapReduce就是一个可以处理这种数据增长的完美系统架构。

  简史

  我们日志处理系统的前几个版本都基于MySQL的,但随着我们拥有越来越多的日志机器,我们达到了一个MySQL服务器能够处理的极限。虽然该数据库模式已经进行了适度的非规范化处理,使其能够较轻松地进行数据切片,但目前MySQL对数据分区的支持仍然很脆弱。我们没有在MySQL上去实现自己的切片和处理方案,而是选择使用Hadoop。

  选择Hadoop

  一旦选择在RDBMS(关系型数据库管理系统)上对数据进行分片存储,你就丧失了SQL在数据集分析处理方面的很多优势。Hadoop使我们能够使用针对小型数据集使用的同样的算法来轻松地并行处理所有数据。

  收集和存储

  日志收集

  产生日志的服务器分布在多个数据中心,但目前我们只有一个单独的Hadoop集群,位于其中一个数据中心(见图16-8)。为了汇总日志数据并把它们放入集群,我们使用syslog-ng(Unix syslog机制的替代机制)和一些简单的脚本来控制在如何Hadoop上新建文件。

  图16-8. Rackspace的Hadoop数据流

  在一个数据中心里,syslog-ng用于从source(源)机器传送日志数据到一组负载均衡的collector(收集器)机器。在这些收集器上,每种类型的日志数据被汇成一个单独的数据流,并且用gzip格式进行轻量级的压缩(图16-8步骤A)。远程收集器的数据通过SSH通道跨数据中心传送到Hadoop集群所在的“本地收集器”(local collector)上(步骤B)。

  一旦压缩的日志流到达本地收集器,数据就会被写入Hadoop(步骤C)。目前我们使用简单的Python脚本把输入数据缓存到本地硬盘,并且定期使用Hadoop命令行界面把数据放入Hadoop集群。当缓存日志数据量达到Hadoop数据块大小的倍数或是缓存已经经过了足够长的时间时,脚本程序开始复制日志缓存数据到Hadoop的各个输入文件夹。

  这种从不同数据中心安全地汇总日志数据的方法在Hadoop支持SOCKS之前就已经有人开发使用了,SOCKS是通过hadoop.rpc.socket.factory.class.default参数和SocksSocketFactory类实现的。通过直接使用远程收集器对SOCKS的支持和HDFS(分布式Hadoop文件系统)的API(应用程序编程接口),我们能够从系统中消除一个磁盘的写入操作和降低系统的复杂性。我们计划在将来的开发中实现一个使用这些特性的替代品。

  一旦原始日志被存放到Hadoop上,这些日志就已经准备好交给我们的MapReduce作业处理了。

  日志存储

  我们的Hadoop集群目前包含15个datanode(数据节点),每个节点都使用普通商用CPU和3个500 GB的硬盘。我们对文件使用默认的复本因子3,这些文件有6个月的存档期限,其中两个复本用于其他用途。

  Hadoop的namenode(域名节点)使用的硬件和datanode相同。为了提供比较高的可用性,我们使用两个辅助namenode和一个虚拟IP,该IP可以很容易地指向3台机器中具有HDFS快照的硬盘。这表明在故障转移情形下,根据辅助namenode的快照时间,我们可能会丢失最多30分钟的数据。虽然这对于我们的日志处理应用来说是可接受的,但是其他Hadoop应用可能要求通过为namenode镜像提供共享存储的能力来实现无损的故障转移。

  日志的MapReduce模型

  处理

  在分布式系统中,唯一标识符令人失望的是它们极少是真正唯一的。所有的电子邮件消息都拥有一个(所谓的)唯一标识符,叫message-id,它由消息发起的主机产生,但是一个不良客户端能够轻松发送重复消息副本。另外,因为Postfix设计者并不相信message-id可以唯一地标识消息,所以他们不得不提出设计一个独立的ID(标识)叫queue-id,在本地机器的生命周期内唯一。

  尽管message-id趋向于成为消息的权威标识,但在Postfix日志中,需要使用queue-id来查找message-id。看例16-1第二行(为了适合页面大小,日志行的格式做了调整),你将发现十六进制字符串1DBD21B48AE,它就是该行消息的queue-id。因为日志收集的时候(可能每隔几小时进行一次),每个消息(包括它的message-id)的信息都输出到单独的行,所以让我们的解析代码保留消息状态是必要的。

  例16-1. Postfix 日志行

  Nov 12 17:36:54 gate8.gate.sat.mlsrvr.com postfix/smtpd[2552]: connect from hostname

  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/qmgr[9489]: 1DBD21B48AE:

  from=, size=5950, nrcpt=1 (queue active)

  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtpd[28085]: disconnect from

  hostname

  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: too many errors

  after DATA from hostname

  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: disconnect from

  hostname

  Nov 12 17:36:54 gate10.gate.sat.mlsrvr.com postfix/smtpd[10311]: connect from

  hostname

  Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtp[28107]: D42001B48B5:

  to=, relay=hostname[ip], delay=0.32, delays=0.28/0/0/0.04,

  dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 1DBD21B48AE)

  Nov 12 17:36:54 gate20.gate.sat.mlsrvr.com postfix/smtpd[27168]: disconnect from

  hostname

  Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/qmgr[1209]: 645965A0224: removed

  Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/smtp[15928]: 732196384ED: to=

  apreduce@rackspace.com>, relay=hostname[ip], conn_use=2, delay=0.69, delays=0.04/

  0.44/0.04/0.17, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 02E1544C005)

  Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/qmgr[13764]: 732196384ED: removed

  Nov 12 17:36:54 gate1.gate.sat.mlsrvr.com postfix/smtpd[26394]: NOQUEUE: reject: RCP

  T from hostname 554 5.7.1 : Client host rejected: The

  sender's mail server is blocked; from= to=

  uce@rackspace.com> proto=ESMTP helo=

  从MapReduce的角度看,日志的每一行是一个单独的键/值。第一步,我们需要把所有的行和一个单独的queue-id键联系起来,然后执行reduce过程判断日志消息值数据是否能表明这个queue-id对应的数据是完整的。

  类似地,一旦我们拥有一个消息完整的queue-id,在第二步,我们需要根据message-id对消息进行分组。我们把每个完整的queue-id和message-id对应(Map)起来,让它们作为键(key),而它对应的日志行作为值(value)。在Reduce阶段,我们判断针对某个message-id的所有的queue-id是否都表明消息已经离开我们的系统。

  邮件日志的MapReduce作业的两阶段处理和它们的InputFormat与OutputFormat形成了一种“分阶段事件驱动架构”(staged event-driven archilecture,SEDA)应用类型。在SEDA里,一个应用被分解为若干个“阶段”,“阶段”通过数据队列区分。在Hadoop环境下,队列可能是MapReduce作业使用的HDFS中的一个输入文件夹或MapReduce作业在Map和Reduce处理步骤之间形成的隐性的数据队列。

  在图16-9中,各个阶段之间的箭头代表数据队列,虚线箭头表示隐性的MapReduce数据队列。每个阶段都能通过这些队列发送键值对(SEDA称之为事件或消息)给其他处理阶段。

  图16-9. MapReduce链

  阶段1:Map 在我们的邮件日志处理作业的第一阶段,Map阶段的输入或是以行号为键、以对应的日志消息为值的数据,或是以queue-id为键、以对应的日志消息数组作为值的数据。当我们处理来自输入文件数据队列的源日志文件的时候,产生第一种类型的输入,而第二种类型是一种中间格式,它用来表示一个我们已经试图处理但因为queue-id不完整而重新进行数据排队的queue-id的状态信息。

  为了能处理这两种格式的输入,我们实现了Hadoop的InputFormat类,它根据FileSplit输入文件的扩展名把工作委托给底层的SequenceFileRecordReader类或LineRecordReader类处理。这两种输入格式的文件来自HDFS中不同的输入文件夹(数据队列)。

  阶段1:Reduce 在这一阶段,Reduce根据queue-id是否拥有足够的日志行来判定它是否完整。假如queue-id已经完整,便输出以message-id作为键、以HopWritable对象为值的数据对。否则,queue-id被设置为键,日志行数组重新列队并和下一组原始日志进行Map处理。这个过程将持续到queue-id已经完整或操作超时。

  HopWritable 对象是POJO对象(Plain Old Java Objects,简单Java对象),实现了Hadoop的Writable接口。它从一台单独服务器的视角完整地描述一条消息,包括发送地址和IP,消息发送给其他服务器的尝试记录,标准的消息头信息。

  通过实现OutputFormat类完成输出不同的结果,这一过程对应于我们的两个InputFormat对象输入格式。在Hadoop API在版本r0.17.0添加MultipleSequenceFileOutputFormat类之前,我们已经实现MultipleSequenceFileOutputFormat类,它们实现同样的目标:我们需要Reduce作业的输出对根据其键的特点存储到不同的文件。

  阶段2:Map 在邮件日志处理作业的第二个步骤,输入是从上个阶段得到的数据,它是以message-id为键、以HopWritable类对象数据为值的数据对。这一步骤并不包含任何逻辑处理:而是使用标准的SequenceFileInputFormat类和IdentityMapper类简单地合并来自第一阶段的输入数据。

  阶段2:Reduce 在最终的reduce步骤,我们想判断针对某个通过系统的message-id,收集到的所有HopWritable对象是否能表示它经过系统的整个消息路径。一条消息路径实际上是一个有向图(通常是没有循环的,但如果服务器被错误设置,有可能会包含循环)。在这个图里,点代表服务器,可标记多个queue-id,服务器之间消息的传送形成了边。对这个应用,我们使用的是JGraphT图库。

  对于输出,我们又一次使用MultiSequenceFileOutputFormat类对象。如果reducer判定对于某个message-id的所有queue-id能够创建一条完整的消息路径,消息就会被序列化,并排队等候SolrOutputFormat类的处理。否则,消息的HopWritable对象会被列入阶段2:Map阶段,然后使用下一批queue-id等待重新处理。

  SolrOutputFormat类包含一个嵌入式Apache Solr实例对象——Solr wiki(http://wiki.apache. org/solr/EmbeddedSolr)最初提出的一种流行的方法——来产生本地硬盘的索引信息。关闭OutputFormat类包括把硬盘索引压缩到输出文件的最终地址。与使用Solr’s HTTP接口或直接使用Lucene相比,这种方法有以下几个优点:

  l 我们能实施Solr 模式(http://wiki.apache.org/solr/SchemaXml);

  l Map和Reduce保持幂等性;

  l 搜索节点不承担索引负载。

  我们目前使用默认的HashPartitioner类来决定Reduce任务和特定键之间的对应关系,就是说键是半随机分布的。在以后的新版系统中,我们将实现一个新的Partitioner,它通过发送地址(我们最通用的搜索词)来切分数据。一旦索引以发送者为单位分割,我们就能够使用地址的哈希值来判断在哪里合并或查询索引,并且我们的搜索API也只需要和相关的对地址的哈希值节点进行通信交流。

  合并相近词搜索

  在一系列的MapReduce阶段完成之后,一系列不同计算机会得知新的索引的信息,进而可以进行索引合并。这些搜索节点它们还运行Apache Tomcat和Solr来托管已经完成的索引信息,这些搜索节点不仅具有把索引合并置于本地磁盘的服务(见图16.8步骤D),它们还运行Apache Tomeate和Solr来托管已完成的索引信息。

  来自SolrOutputFormat类的每个压缩文件都是一个完整的Lucene索引,Lucene提供IndexWriter.addIndexes()方法支持快速合并多个索引。我们的MergeAgent服务把每个新索引解压到Lucene RAMDirectory或FSDirectory(根据文件的大小),把它们合并到本地硬盘,然后发送一个请求给Solr实例,后者负责提供索引服务并使更新后的索引能够用于查询处理。

  切片 Query/Management(查询/管理) API 是一个PHP代码层,它主要是处理输出索引在所有搜索节点上的“切片”(sharding)。我们使用一个简单的“一致性哈希”(consistent hashing)来判定搜索节点和索引文件之间的对应关系。目前,索引首先按照创建时间切片,然后再根据其文件名的哈希值切片,但是我们计划将来用对发送地址的哈希值来取代对文件名的哈希值(见阶段2:Reduce)。

  因为HDFS已经处理了Lucene索引的复制问题,所以没有必要在Solr实例中保留多个副本。相反,在故障转移时,相应的搜索节点会被完全删除,然后由其他节点负责合并索引。

  搜索结果 使用这个系统,从产生日志到获得搜索结果供客服团队使用,我们获得了15分钟的周转时间。

  我们的搜索API支持Lucene的全部查询语法,因此我们常常可以看到下面这样的复杂查询:

  sender:"mapreduce@rackspace.com" -recipient:"hadoop@rackspace.com"

  recipient:"@rackspace.com" short-status:deferred timestamp:[1228140900 TO 2145916799]

  查询返回的每个结果都是一个完整的序列化消息路径,它表明了各个服务器和接收者收是否收到了这个消息。现在我们把这个路径用一个2D图展示出来(图16-10),用户可以通过扩展自己感兴趣的节点来和这个图互动,但是在这个数据的可视化方面还有很多需要改进的地方。

  图16-10. 数据树

  为分析进行存档

  除了为客服提供简短词语的搜索功能之外,我们也对日志数据的分析感兴趣。

  每晚,我们运行一系列的MapReduce作业,它们的输入是白天产生的索引数据。我们实现了SolrInputFormat类对象,它可以拖回并解压索引,然后用对的形式输出每个文档。使用这种InputFormat类,我们可以遍历一天产生的所有消息路径,可以回答我们邮件系统的几乎任何问题,包括:

  l 每个域的数据(病毒程序,垃圾邮件,连接状况,收件人)

  l 最有效的垃圾邮件规则

  l 特定用户产生的负载

  l 消息量反弹的原因

  l 连接的地理分布信息

  l 特定机器之间的平均时间延迟

  因为在Hadoop上,我们拥有好几个月的压缩索引信息,所以还能够回顾性地回答夜间日志概要工作忽略的问题。例如,我们近期想确定每个月消息发送量最大的IP地址,这个任务我们可以通过一个简单的一次性MapReduce作业来完成。

  (作者:Stu Hood)

0
相关文章