前面依次看了nutch的准备工作inject和generate部分,抓取的fetch部分的代码,趁热打铁,我们下面来一睹parse即页面解析部分的代码,这块代码主要是集中在ParseSegment类里面,Let‘s go~~~
上期回顾:上回主要讲的是nutch的fetch部分的功能代码实现,主要是先将segments目录下的指定文件夹作为输入,读取里面将要爬取的url信息存入爬取队列,再根据用户输入的爬取的线程个数thread决定消费者的个数,线程安全地取出爬取队列里的url,然后在执行爬取页面,解析页面源码得出url等操作,最终在segments目录下生成content和crawl_fetch三个文件夹,下面来瞧瞧nutch的parse是个怎么回事……
1.parse部分的入口从代码 parseSegment.parse(segs[0]);开始,进入到ParseSegment类下的parse方法后,首先设置一个当前时间(方便后面比较结束时间之差来得到整个parse所需的时间)。然后就是一个mapreduce过程,初始化了一个job,具体代码如下:
JobConf job = new NutchJob(getConf()); job.setJobName("parse " + segment); FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(ParseSegment.class); job.setReducerClass(ParseSegment.class); FileOutputFormat.setOutputPath(job, segment); job.setOutputFormat(ParseOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(ParseImpl.class); JobClient.runJob(job);
可以看出设置的输入为segment文件夹下的文件,输出也是segment文件夹,当然变化的是segment下生成了新的文件夹,提交的mapper和reducer都是parsesegment类。
2.下面就来分别看看ParseSegment类的map和reducer方法。map()方法中首先是一些判定的代码,该函数的主要功能还是集中在以下代码中:
ParseResult parseResult = null; try { parseResult = new ParseUtil(getConf()).parse(content); } catch (Exception e) { LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e)); return; } for (Entry<Text, Parse> entry : parseResult) { Text url = entry.getKey();//http://www.ahu.edu.cn/ Parse parse = entry.getValue(); ParseStatus parseStatus = parse.getData().getStatus();//success(1,0) long start = System.currentTimeMillis(); reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[parseStatus.getMajorCode()], 1); if (!parseStatus.isSuccess()) { LOG.warn("Error parsing: " + key + ": " + parseStatus); parse = parseStatus.getEmptyParse(getConf()); } // pass segment name to parse data parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, getConf().get(Nutch.SEGMENT_NAME_KEY)); // compute the new signature byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse); parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature)); try { scfilters.passScoreAfterParsing(url, content, parse); } catch (ScoringFilterException e) { if (LOG.isWarnEnabled()) { LOG.warn("Error passing score: "+ url +": "+e.getMessage()); } } long end = System.currentTimeMillis(); LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url); output.collect(url, new ParseImpl(new ParseText(parse.getText()), parse.getData(), parse.isCanonical())); }
其中parseResult 是通过new ParseUtil(getConf()).parse(content);产生的,进入ParseUtil我们可以看出该函数全貌如下:
public ParseUtil(Configuration conf) { this.parserFactory = new ParserFactory(conf); MAX_PARSE_TIME=conf.getInt("parser.timeout", 30); }
而ParserFactory就是调用一个插件来解决页面解析这部分问题的,ParseFactory的代码如下:
public ParserFactory(Configuration conf) { this.conf = conf; ObjectCache objectCache = ObjectCache.get(conf); this.extensionPoint = PluginRepository.get(conf).getExtensionPoint( Parser.X_POINT_ID); this.parsePluginList = (ParsePluginList)objectCache.getObject(ParsePluginList.class.getName()); if (this.parsePluginList == null) { this.parsePluginList = new ParsePluginsReader().parse(conf); objectCache.setObject(ParsePluginList.class.getName(), this.parsePluginList); } if (this.extensionPoint == null) { throw new RuntimeException("x point " + Parser.X_POINT_ID + " not found."); } if (this.parsePluginList == null) { throw new RuntimeException( "Parse Plugins preferences could not be loaded."); } }
当然了,如何调用插件来解决这个问题作者还不是很清楚,但是隐约从代码中已经看到了PluginRepository(插件仓库)、extensionPoint (扩展点)这样的名词了。
让我们再回到map方法,通过调试我们可以看到ParseResult包含了以下信息:
Version: -1 url: http://www.ahu.edu.cn/ base: http://www.ahu.edu.cn/ contentType: application/xhtml+xml metadata: Date=Sat, 02 Aug 2014 13:46:36 GMT nutch.crawl.score=1.0 _fst_=33 nutch.segment.name=20140802214742 Content-Type=text/html Connection=close Accept-Ranges=bytes Server=Apache/2.2.8 (Unix) mod_ssl/2.2.8 OpenSSL/0.9.8e-fips-rhel5 DAV/2 Resin/3.0.25 Content: <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />……
随后再通过一个for循环,遍历出其中的解析的详细内容,我们可以看到 Text url = entry.getKey();就是得到当前要解析的url,紧接着执行Parse parse = entry.getValue();其中的Text属性就是解析后的网页的主体信息即过滤了一些网页标签后的结果。剩下的代码主要实现将解析的内容collect出去。
3.执行完map方法后就是reduce,reducer的代码很简洁就一行: output.collect(key, (Writable)values.next()); // collect first value,自带的注解“collect first value”大概的意思就是map中每次只针对某一个url进行处理,所以收集到的解析的<text,parse>也就是唯一一个,自己的拙见啦~~~至此整个parse的过程就执行完毕了。
4.关于segment文件夹下的crawl_parse,parse_data,parse_text三个文件夹是如何生成的,我们可以看看上面job的输出ParseOutputFormat类。进入该类的主体方法getRecordWriter(),首先是一些初始化和变量的赋值,比如url过滤器、url规格化对象的生成,时间间隔、解析的上限等变量的赋值。然后通过以下三行代码定义输出目录:
Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // parse_text
Path data = new Path(new Path(out, ParseData.DIR_NAME), name);//parse_data Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//crawl_parse
然后再通过以下三个方法生成这三个目录
final MapFile.Writer textOut = new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class, CompressionType.RECORD, progress); final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class, compType, progress); final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress);
以上就是对于parse过程的一个简单解析,相比前面的三个流程来说,parse模块的实现逻辑相对简单。。。
(备注:涉及到ParseOutputFormat部分还有一些东西没有搞懂,下面的参考博文给了详细的解释,有兴趣可以拜读下)
参考博文:http://blog.csdn.net/amuseme_lu/article/details/6727516
友情赞助
如果你觉得博主的文章对你那么一点小帮助,恰巧你又有想打赏博主的小冲动,那么事不宜迟,赶紧扫一扫,小额地赞助下,攒个奶粉钱,也是让博主有动力继续努力,写出更好的文章^^。
1. 支付宝 2. 微信