Nutch 搜索引擎
背景介绍
Nutch这个框架用于构建立可扩展的网络爬虫(crawler)和搜索引擎。它是Apache软件基金会(Apache Software Foundation)的一个项目,Lucene的一个子项目,遵循Apache许可(2.0)。
我们并不想多么深入地细究网络爬虫的知识——这个案例研究的目的是展示Hadoop是如何实现搜索引擎各种典型、复杂处理任务的。感兴趣的用户可以在Nutch官方主页(http://lucene.apache.org/nutch)找到项目相关的大量专门信息。可以这样说,为了创建和维护一个搜索引擎,必须要有下面的子系统。
网页数据库
这个数据库跟踪网络爬虫要抓取的所有网页和它们的状态,如上一次访问的时间,它的抓取状态信息,刷新间隔,内容校验和,等等。用Nutch的专用名词来说,这个数据库称为CrawlDb。
爬取网页清单
网络爬虫定期刷新其Web视图信息,然后下载新的网页(以前没有抓取的)或刷新它们认为已经过期的网页。这些准备爬取的候选网页清单,Nutch称为fetchlist。
原始网页数据
网页内容从远程网站下载,以原始的未解释的格式在本地存储成字节数组。Nutch称这种数据为page content。
解析的网页数据
网页内容用适合的解析器进行解析——Nutch为各种流行格式的文档提供了解析器,如HTML,PDF,Open Office和Microsoft Office,RSS等。
链接图数据库
对于计算基于链接(link)的网页排序(page rank)值来说,如PageRank,这个数据库是必须的。对于Nutch记录的每一个URL,它会包含一串指向它的其他的URL值以及这些URL关联的锚文本(在HTML文件的锚文本元素中得到)。这个数据库称为LinkDb。
全文检索索引
这是一个传统的倒排索引,基于搜集到的所有网页元数据与抽取到的纯文本内容而建立。它是使用卓越的Lucene库(http://lucene.apache.org/java)来实现的。
前面我们简略地提到Hadoop作为一个组件在Nutch系统上得到实现,试图用它提高Nutch系统的可扩展性以及解决那些由集中式数据处理模型引起的一系列瓶颈问题。Nutch也是第一个移植到Hadoop架构之上的公开的概念证明应用,后来它成为Hadoop的一部分,并且事实证明,把Nutch算法和数据结构移植到Hadoop架构所需的工作量惊人地少。这一特点有可能激励大家把Hadoop的开发作为一个子项目,为除Nutch之外的其他应用提供可重用的架构。
目前,几乎所有的Nutch工具都通过运行一个或多个MapReduce作业来处理数据。
数据结构
在Nutch系统中维护着几种主要的数据结构,它们都利用Hadoop I/O类和数据格式来构造。根据数据使用目的和数据创建之后的访问方式,这些数据可以使用Hadoop的映射(map)文件或顺序(sequence)文件进行保存。
因为数据是MapReduce的作业产生和处理的,而这一过程反过来又会执行几个map和reduce任务,所以它的硬盘存储格式符合常用的Hadoop输出格式,即MapFileOutputFormat和SequenceFileOutputFormat两种格式。精确地说,数据被保存成几个map文件或顺序文件,而文件数和创建数据作业中的reduce任务数相等。为了简单,在下面几节的介绍中,我们忽略格式差异。
CrawlDb
CrawlDb存储每个URL的当前状态信息,存储文件是map文件,形式是,这里键使用文本格式,值使用Nutch特定的CrawlDatum类型(它实现Writable接口)。
为了对这些记录提供快速的随机访问能力(用户想在CrawlDb里面检查个人记录信息的时候),这些数据被存储成map文件而不是顺序文件。
CrawlDb最初是通过Injector工具创建的,它只是简单地把初始URL列表(种子列表)的纯文本文件转换成一个map文件,格式如前所述。接着,用爬取和解析的网页信息来对它做更新。稍后将对此进行详细介绍。
LinkDb
这个数据库为Nutch记载的每个URL存储“入链接”(incoming link)信息。它采用map文件格式进行存储,其中Inlinks是URL列表和锚文本数据。注意,这些信息在网页数据收集阶段并不是立刻可以得到的,但是可以获取反向信息,就是这个页面的“出链接”(outlink)信息。反向链接的信息获取是通过一个MapReduce作业完成的,相关详情可参见后文。
分段
在Nutch定义中,“分段”(segment)指的是爬取和解析URL组。图16-5展示了分段的创建和处理过程。
一个分段(文件系统里的一个目录)包含以下几个部分(它们只不过是一些包含MapFileOutputFormat或SequenceFileOutputFormat格式数据的子目录)。
content
content包含下载页面的原始数据,存储为map文件,格式是。为了展示缓存页的视图,这里使用map文件存储数据,因为Nutch需要对文件做快速随机的访问。
crawl_generate
它包含将要爬取的URL列表以及从CrawlDb取到的与这些URL页相关的当前状态信息,对应的顺序文件的格式。这个数据采用顺序文件存储,原因有二:第一,这些数据是按顺序逐个处理的;第二,map文件排序键值的不变性不能满足我们的要求。我们需要尽量分散属于同一台主机的URL,以此减少每个目标主机的负载,这就意味着记录信息基本上是随机排列的。
图16-5. 分割
crawl_fetch
它包含数据爬取的状态信息,即爬取是否成功,响应码是什么,等等。这个数据存储在map文件里,格式是。
crawl_parse
每个成功爬取并解析的页面的出链接列表都保存在这里,因此Nutch通过学习新的URL可以扩展它的爬取前端页。
parse_data
解析过程中收集的元数据;其中还有页面的出链接(frontier)列表。这些信息对于后面介绍的建立反向图(入链接—ink)是相当关键的。
parse_text
页面的纯文本内容适合用Lucene进行索引。这些纯文本存储成map文件,格式是,因此要展示搜索结果列表的概要信息(摘要)的时候,Nutch可以快速地访问这些文件。
Generator工具(图16-5中编号1)运行的时候,CrawlDb里面的数据就会产生一些新的分段,并且开始只包括要爬取的URL列表(是crawl_generat下的子目录)。当这个列表经过几个步骤的处理之后,该分段就从处理工具那里收集输出数据并存放在一系列的子目录里面。
例如,content从Fetecher工具(2)接收数据,这个工具根据fetchlist的URL列表下载网页原始数据。这个工具也把URL的状态信息存储在crawl_fetch里面,因此这些数据后来可以用于更新CrawlDb的页面状态信息。
在分段工具中的其他小模块接收来自Parse分段工具(3)的数据,这个工具读入网页内容,然后基于声明的(或检测到的)MIME类型,选择合适的内容解析器,最后把解析结果存为三部分:
crawl_parse,parse_data和parse_text。然后这些数据被用于更新CrawlDb(4)和创建LinkDb(5)。
这些分段数据一直保留到它们包含的所有数据都过期为止。Nutch采用的是可配置的最大时间限制的方法,当页面保存的时间段超过这个时间限制后,这个页面会被强制进行重新获取;这将有助于操作员淘汰所有过期的分段数据(因为他能肯定超过这个时间限制之后,这个分段里面的所有页面都已经被重新爬取)。
分段数据用来创建Lucene索引(【6】——主要是parse_text和parse_data部分的数据),但是它也提供一种数据存储机制来支持对纯文本数据和原始内容数据的快速检索。当Nutch产生摘要信息的时候(和查询最匹配的文档文本片段),需要第一种纯文本数据;第二种原始数据提供了展现页面的缓存视图的能力。这两种用例下,或是要求产生摘要信息或是要求展现缓存页面,都是直接从map文件获取数据。实际上,即使是针对大规模数据,直接从map文件访问数据的效率都很高。
Nutch系统利用Hadoop进行数据处理的精选实例
下面几节描述了几种Nutch工具的相关详细信息,主要用于说明Nutch系统如何利用MapReduce模型来完成具体的数据处理任务。
链接逆转
爬取到的HTML页面包含HTML链接,这些链接可能指向它本身(内部链接)或指向其他网页。HTML链接从源网页指向目标网页,参见图16-6。
图16-6. 链接逆转
然而,许多计算网页重要性(或质量)的算法需要反向链接的信息,也就是那些具有链接指向当前页面的网页。进行网页爬取的时候,我们并不能得到这些信息。另外,如果能把入链接的锚文本也用于索引,索引也会受益,因为这些锚文本可以从语义上丰富当前页面的内容。
如前所述,Nutch收集出链接信息,然后用这些数据构造一个LinkDb,它包含这种反向链接数据,以入链接和锚文本的形式存放。
本小节概述一下LinkDb工具的实现过程——为了展现处理过程的清晰画面,忽略了很多细节描述(如URL规范化处理和过滤)。我们主要描述一个典型的实例来解释为什么MapReduce模型能够如此合适地地应用到一个搜索引擎所要求的这种关键数据转换处理任务。大规模搜索引擎需要处理大量的网络图数据(许多页面都有很多出/入链接),Hadoop提供的并行处理和容错机制使得这样的处理成为可能。另外,使用map-sort-reduce基本操作可以很容易地表达链接逆转这一过程,我们将在下面进行介绍。
下面的代码片段展示了LinkDb工具的作业初始化过程:
JobConf job = new JobConf(configuration);
FileInputFormat.addInputPath(job, new Path(segmentPath, "parse_data"));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(LinkDb.class);
job.setReducerClass(LinkDb.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
job.setOutputFormat(MapFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, newLinkDbPath);
可以看出,这个作业的输入源数据是爬取的URL列表(键)以及相应的ParseData记录,ParseData包含的其中一项数据是每个页面的出链接信息,它是一个数组。一个出链接记录包含目标URL以及相应的锚文本。
这个作业的输出也是一个URL列表(键),但是值是入链接,它其实只是一个包含目标URL和锚文本的特殊的入链接集合。
也许出乎我们意料,这些URL一般以纯文本形式存储和处理,而非以java.net.URL或java.net.URI实例的形式。这么做有几个原因:从下载内容里提取的URL通常需要做规范化处理(如把主机名变成小写形式,解析相对路径),或它们是已经坏掉或无效的URL,或它们引用的是不支持的协议。许多规范化和过滤操作能更好地表达成文本模式,它可以跨一个URL的多个组成部分。此外,考虑到链接分析,我们也许仍然想处理和计算这些无效的URL。
让我们进一步查看map()和reduce()的实现——在这个例子中,它们非常简单以至于这两个函数可以在同一个类里实现:
public void map(Text fromUrl, ParseData parseData,
OutputCollector output, Reporter reporter) {
...
Outlink[] outlinks = parseData.getOutlinks();
Inlinks inlinks = new Inlinks();
for (Outlink out : outlinks) {
inlinks.clear(); // instance reuse to avoid excessive GC
String toUrl = out.getToUrl();
String anchor = out.getAnchor();
inlinks.add(new Inlink(fromUrl, anchor));
output.collect(new Text(toUrl), inlinks);
}
}
从这个代码段可以看到,对每个出链接,我们的map()函数产生一对,其中Inlinks只包含一个Inlink,该Inlink是由fromUrl和它的锚文本组成。链接的指向实现了反转。
接着,这些只有一个元素的Inlinks用reduce()方法实现聚集处理:
public void reduce(Text toUrl, Iterator values,
OutputCollector output, Reporter reporter) {
Inlinks result = new Inlinks();
while (values.hasNext()) {
result.add(values.next());
}
output.collect(toUrl, result);
}
从这段代码来看,很明显我们已经得到了我们想要的数据——即指向toUrl变量的所有fromUrl列表以及相应的锚文本信息。逆转过程完成。
然后这些数据以MapFileOutputFormat格式保存,形成新的LinkDb数据库。
产生fetchlist
现在来看一个更加复杂的用例。fetchlist产生于CrawlDb的数据(map文件的格式是,其中crawlDatum包含URL的状态信息),它存放准备爬取的URL列表,然后Nutch Fetcher工具处理这个列表。Fetcher工具本身是一个MapReduce应用程序(后面会介绍)。也就是说输入数据(被分成N份)将由N个map任务处理——Fetcher工具强制执行这样的规则,SequenceFileInputFormat格式的数据不能继续切分。前面我们简单提过,fetchlist是通过一个特殊的方法产生的,因此fetchlist的每部分数据(随后由每个map任务处理)必须满足特定的要求。
1. 来自同一台主机的所有URL最后要放入同一个分区。这是必须的,以便Nutch可以轻松实现in-JVM(java虚拟机里)宿主级封锁来避免目标主机超载。
2. 为了减少发生宿主级的封锁,来自同一台主机的URL应该尽量分开存放(比如和其他主机的URL充分混合)。
3. 任何一个单独主机的URL链接数不能多于x个,从而使得具有很多URL的大网站相对于小网站来说,就不会占主导地位(来自小网站的URL仍然有机会被爬取)。
4. 具有高网页排序值的URL应该优先于低的那些URL。
5. 在fetchlist中,URL总数不能超过y。
6. 输出数据分区数应该和最优的爬取map任务数目一致。
本例中,需要实现两个MapReduce作业来满足所有这些要求,如图16-7所示。同样地,为了简洁,对下面的列表内容,我们将跳过对这些步骤的某些细节描述。
图16-7. 产生fetchlist
步骤1:选择,基于网页排序值排序,受限于每台主机的URL数 这一步骤,Nutch运行一个MapReduce作业来选择一些被认为有资格爬取的URL列表,并根据它们的网页排序值(赋给每个页面的浮点数,如PageRank值)对它们进行排序。输入数据来自CrawlDb,后者是一个格式的map文件。这一作业的输出是>格式的sequence文件,根据排序值降序排列。
首先,我们来看一下作业的设置:
FileInputFormat.addInputPath(job, crawlDbPath);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(Selector.class);
job.setPartitionerClass(Selector.class);
job.setReducerClass(Selector.class);
FileOutputFormat.setOutputPath(job, tempDir);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(FloatWritable.class);
job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
job.setOutputValueClass(SelectorEntry.class);
Selector类实现了3个函数:mapper,reducer和partitioner。最后一个函数非常有趣:Selector用了一个自定义的Partitioner把来自同一主机的URL分配给同一个reduce任务,这样我们就能满足前面列表的要求3-5。
如果我们不重写默认的partitioner,来自同一主机的URL最终会输出到不同的分区里面,这样我们就不能跟踪和限制URL总数,因为MapReduce任务彼此之间不做任何交流。那么现在的情况是,属于同一台主机的所有URL都会由同一个reduce任务处理,这意味着我们能控制每台主机可以选择多少个URL。
实现一个自定义的partitioner,从而把需要在同一个任务中处理的数据最终放入同一个分区,是很简单的。同一个任务处理的数据就会被放在同一个分区里。我们首先来看一下Selector类如何实现Partitioner接口(它只包含一个方法):
/** Partition by host. */
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
numReduceTasks);
}
这个方法返回在0到numReduceTasks – 1之间的一个整数,numReduceTasks是化简任务数。它简单地用原始的URL替换了键,URL数据从SelectorEntry获取,这样做就可以把URL(不是页面排序值)传递给PartitionUrlByHost类实例对象,并且在这里计算出URL属于的切分号:
/** Hash by hostname. */
public int getPartition(Text key, Writable value, int numReduceTasks) {
String urlString = key.toString();
URL url = null;
try {
url = new URL(urlString);
} catch (MalformedURLException e) {
LOG.warn("Malformed URL: '" + urlString + "'");
}
int hashCode = (url == null ? urlString : url.getHost()).hashCode();
// make hosts wind up in different partitions on different runs
hashCode ^= seed;
return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
}
从这个代码片断能看到,分区号的计算只针对URL的主机部分的地址,这意味着属于同一个主机的所有URL最终会被放入同一个分区。
这个作业的输出数据根据网页排序值降序排列。因为CrawlDB中有很多记录有同样的排序值,所以我们不能用MapFileOutputFormat来存储输出文件,否则会违反map文件严格基于主键排序的固定规则。
细心的读者会注意到一点,因为我们不直接使用初始键值,但是我们又想保留这种初始的键值对。这里使用一个SelectorEntry类把初始的键值对传递给下一步骤处理过程。
Selector.reduce()函数跟踪计算URL的总数和每个主机对应的最大URL数,然后简单地摒弃多余的记录。注意,必须对URL总个数的限制进行近似化处理。
我们用总的限制数除以ruduce任务的个数得到当前任务允许拥有的URL的个数的限制范围。但是我们并不能肯定每个任务都能够得到平均的分配数;实际上在大多数情况下很难实现,因为在各个主机中分布的URL数目是不均匀的。不管怎么样,对于Nutch来说,这种近似的控制已经足够了。
步骤2:逆转,基于主机分区,随机排序 在前面,我们用格式存储了一个顺序文件。现在我们必须产生格式的顺序文件来满足前面描述的要求1,2和6。这个处理步骤的输入数据是步骤1的输出数据。
下面的代码片断展示了这个作业过程的初始设置:
FileInputFormat.addInputPath(job, tempDir);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(SelectorInverseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SelectorEntry.class);
job.setPartitionerClass(PartitionUrlByHost.class);
job.setReducerClass(PartitionReducer.class);
job.setNumReduceTasks(numParts);
FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);
SelectorInverseMapper类简单地删除了当前键(排序值),抽取原始的URL并且把它设置为键,使用SelectorEntry作为值。细心的读者可能质疑:“为什么我们不再进一步,同时再抽取原始的CrawlDatum,把它作为值?”详情参见后文。
这个作业的最终输出是顺序文件,格式是,但是map阶段我们得到的输出是格式。我们必须指出为map输出采用不同的键/值类对象,必须用setMapOutputKeyClass()和setMapOutputValueClass()这两个类设置函数——否则,Hadoop会假定我们用的类与为reduce和reduce输出声明的类一样(这种矛盾通常会导致作业失败)。
map阶段的输出使用PartitionUrlByHost类对象进行切分,因此它又把来自同一主机的URL分配到同一个分区。这就满足要求1。
一旦数据从map任务移到reduce任务,Hadoop就会根据输出数据键comparator的结果对数据排序,这里的comparator是HashComparator类对象。这个类采用简单的哈希机制来混合URL,这个机制可保证来自同一主机的URL会被尽量放在一起。
为了满足要求6,我们把reduce任务的数量设置成希望的Fetcher map任务的数量(前面提到的numParts),记住,每个reduce分区稍后将用于创建一个单独的Fetcher map任务。
PartitionReducer类负责完成最后一步,即把数据转换成数据。使用HashComparator的一个令人惊讶的副作用是几个URL可能具有同样的哈希值,并且Hadoop调用reduce()函数时只传送遇到的第一个键对应的值,具有相等键值的记录被认为是一样的而被删除。现在能明白当初为什么我们必须在SelectorEntry类的记录中保留所有的URL值,因为我们可以从遍历的值中抽取URL。下面是这个方法的实现:
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
// when using HashComparator, we get only one input key in case of hash collisions
// so use only URLs extracted from values
while (values.hasNext()) {
SelectorEntry entry = values.next();
output.collect(entry.url, entry.datum);
}
}
最终,reduce任务的输出在Nutch分段目录中crawl_generate子目录下以SequenceFileOutputFormat格式保存。输出文件满足前面的1-6项全部要求。
Fetcher:正在运行的多线程类MapRunner
Nutch的Fetcher应用程序负责从远程站点下载网页内容。因此,为了尽量减少爬取fetchlist的时间,对于这个处理过程来说使用每个机会来做并行处理相当重要。
在Fetcher应用中,已经有一级并行机制——输入fetchlist的若干个分区被分配给多个map任务。然而,这么做实际上远远不够:顺序下载来自不同主机(见前一节对HashComparator的介绍)的URL相当浪费时间。因为,Fetcher的map任务使用多个工作线程同时处理这种数据。
Hadoop 使用MapRunner类来实现对输入数据记录的顺序处理。Fetcher类实现自己的MapRunner类,它使用若干个线程并行处理输入记录。
先从这个作业的设置开始:
job.setSpeculativeExecution(false);
FileInputFormat.addInputPath(job, "segment/crawl_generate");
job.setInputFormat(InputFormat.class);
job.setMapRunnerClass(Fetcher.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(FetcherOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
首先,我们关闭推测执行(speculative execution)。我们不能同时让几个map任务从同一个主机下载内容,因为这可能会打破宿主级的负载限制(如并发请求数和每秒请求数)。
其次,我们使用自定义的InputFormat对象来防止Hadoop将输入数据分区进一步切分为更小的块(分片)导致map任务的数量超过输入分区的数量。这么做又一次保证我们可以控制宿主级的访问限制。
输出数据用自定义的OutputFormat对象来存储,通过使用NutchWirtable类的数据值,它新建了几个输出map文件和顺序文件。NutchWritable类是GenericWritable的子类,它能传递几种不同Writable类的实例对象,但必须事先声明。
Fetcher类实现MapRunner接口,我们把这个类设置为作业的MapRunner实现。相关代码如下:
public void run(RecordReader input,
OutputCollector output,
Reporter reporter) throws IOException {
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
feeder.start();
for (int i = 0; i < threadCount; i++) { // spawn threads
new FetcherThread(getConf()).start();
}
do { // wait for threads to exit
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
reportStatus(reporter);
} while (activeThreads.get() > 0);
}
Fetcher类提前读取许多输入记录数据,使用QueueFeeder线程把输入记录放入为每个主机建立的队列中。然后启动几个FetcherThread实例对象,它们将读取每个主机对应的队列数据,这时QueueFeeder继续读取输入数据来填充这些队列。每个FetcherThread读取全部非空队列中的数据项。
与此同时,map任务的主线程也在不停运转等待所有的线程完成它们的作业。它定期向系统报告状态以保证Hadoop不会认为这个任务已经死掉并把它杀掉。一旦所有项目处理完,循环过程就结束,控制权返同Hadoop,然后Hadoop认为这个map任务即将完成。
索引器:使用自定义的OutputFormat类
这是一个MapReduce应用程序示例,它不会产生顺序文件或map文件,相反它的输出是一个Lucene索引。
再提一下,因为MapReduce应用可能由几个reduce任务组成,所以这个应用的输出可能包含几个不完整的Lucene索引。
Nutch Indxer工具使用CrawlDb,LinkDb和Nutch分段爬取状态信息,解析状态,页面元数据和纯文本数据)的信息,因此这个作业的设置部分将包括添加几个输入路径:
FileInputFormat.addInputPath(job, crawlDbPath);
FileInputFormat.addInputPath(job, linkDbPath);
// add segment data
FileInputFormat.addInputPath(job, "segment/crawl_fetch");
FileInputFormat.addInputPath(job, "segment/crawl_parse");
FileInputFormat.addInputPath(job, "segment/parse_data");
FileInputFormat.addInputPath(job, "segment/parse_text");
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(Indexer.class);
job.setReducerClass(Indexer.class);
FileOutputFormat.setOutputPath(job, indexDir);
job.setOutputFormat(OutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LuceneDocumentWrapper.class);
分散存储在这些输入位置的一个URL的所有相应记录需要合并起来新建Lucene文档(将被加入索引)。
Indexer的Mapper类把输入数据(无论是源数据或是实现类)简单地封装到NutchWritable中,这样reduce阶段可能要使用不同的类来接收不同的源数据,而且它仍然能够一致地为map和reduce步骤的输出值声明一个单独的输出值类(类似于NutchWritable类)。
Reducer方法遍历同一个键(URL)对应的所有值,解封数据(fetch CrawlDatum,CrawlDb CrawlDatum,LinkDb Inlinks,ParseData和ParseText),并用这些信息构建一个Lucene文档,后者被WritableLuceneDocumentWrapper对象封装并被收集。除了所有文本内容外(纯文本数据或是元数据),这个文档也包含类似PageRank值的信息(取自CrawlDb)。Nutch使用这种数值(score)来设置Lucene文档的权重值。
OutputFormat方法是这个工具最有意思的部分:
public static class OutputFormat extends
FileOutputFormat {
public RecordWriter
getRecordWriter(final FileSystem fs, JobConf job,
String name, final Progressable progress) throws IOException {
final Path out = new Path(FileOutputFormat.getOutputPath(job), name);
final IndexWriter writer = new IndexWriter(out.toString(),
new NutchDocumentAnalyzer(job), true);
return new RecordWriter() {
boolean closed;
public void write(WritableComparable key, LuceneDocumentWrapper value)
throws IOException { // unwrap & index doc
Document doc = value.get();
writer.addDocument(doc);
progress.progress();
}
public void close(final Reporter reporter) throws IOException {
// spawn a thread to give progress heartbeats
Thread prog = new Thread() {
public void run() {
while (!closed) {
try {
reporter.setStatus("closing");
Thread.sleep(1000);
} catch (InterruptedException e) { continue; }
catch (Throwable e) { return; }
}
}
};
try {
prog.start();
// optimize & close index
writer.optimize();
writer.close();
} finally {
closed = true;
}
}
};
}
当请求生成一个RecordWriter类的实例对象时,OutputFormat类通过打开一个IndexWriter对象新建一个Lucene索引。然后,针对reduce方法中收集的每个新的输出记录,它解封LuceneDocumentWrapper对象中的Lucene文档,并把它添加到索引。
reduce任务结束的时候,Hadoop会设法关闭RecordWriter对象。本例中,关闭的过程可能持续较长时间,因为我们想在关闭它之前进行索引优化工作。在这段时间中,因为已经没有任何进度更新,所以Hadoop可能会推断该任务已经被挂起,然后它可能会尝试杀死这个任务。因此,我们首先启动一个后台线程来传送让人安心的进度更新消息,然后才开始索引优化工作。一旦优化完成,我们便停止进度更新线程。现在输出索引得以创建、优化和停止更新,它已经准备好应用于任何搜索应用程序中。
总结
这里对Nutch系统的简短综述其实忽略了很多细节,比如错误处理、日志记录、URL过滤和规范化,处理重定向或其他形式的网页“别名”(如镜像),剔除重复
内容,计算PageRank值等。在这个项目的官方主页和wiki页面(http://wiki.apache.org/nutch),可以找到这些方面的介绍及其他更多信息。
当前,Nutch正在被很多组织或个人用户使用。然而,运作一个搜索引擎要求有大量的投资来支持硬件配备,系统集成,自定义开发和索引维护;因此,在大多数情况下,Nutch用于构建商业的垂直或针对领域的搜索引擎。
Nutch正处于积极的开发中,并且该项目紧跟Hadoop的最新版本。因此,它将继续成为使用Hadoop作为核心部件,并且具有良好产出的应用实例。
(作者:Andrzej Bia?ecki)