1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.archive.access.nutch;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.net.URI;
30 import java.net.URISyntaxException;
31 import java.net.URL;
32 import java.text.NumberFormat;
33 import java.util.Iterator;
34 import java.util.StringTokenizer;
35 import java.util.regex.Matcher;
36 import java.util.regex.Pattern;
37
38 import org.apache.commons.httpclient.Header;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FileSystem;
43 import org.apache.hadoop.fs.Path;
44 import org.apache.hadoop.io.MD5Hash;
45 import org.apache.hadoop.io.MapFile;
46 import org.apache.hadoop.io.SequenceFile;
47 import org.apache.hadoop.io.Text;
48 import org.apache.hadoop.io.Writable;
49 import org.apache.hadoop.io.WritableComparable;
50 import org.apache.hadoop.io.SequenceFile.CompressionType;
51 import org.apache.hadoop.mapred.JobClient;
52 import org.apache.hadoop.mapred.JobConf;
53 import org.apache.hadoop.mapred.Mapper;
54 import org.apache.hadoop.mapred.OutputCollector;
55 import org.apache.hadoop.mapred.RecordWriter;
56 import org.apache.hadoop.mapred.Reporter;
57 import org.apache.hadoop.util.Progressable;
58 import org.apache.hadoop.util.StringUtils;
59 import org.apache.hadoop.util.ToolBase;
60 import org.apache.nutch.crawl.CrawlDatum;
61 import org.apache.nutch.crawl.MapWritable;
62 import org.apache.nutch.fetcher.Fetcher;
63 import org.apache.nutch.fetcher.FetcherOutput;
64 import org.apache.nutch.fetcher.FetcherOutputFormat;
65 import org.apache.nutch.metadata.Metadata;
66 import org.apache.nutch.metadata.Nutch;
67 import org.apache.nutch.net.URLFilters;
68 import org.apache.nutch.net.URLNormalizers;
69 import org.apache.nutch.parse.Outlink;
70 import org.apache.nutch.parse.Parse;
71 import org.apache.nutch.parse.ParseData;
72 import org.apache.nutch.parse.ParseImpl;
73 import org.apache.nutch.parse.ParseOutputFormat;
74 import org.apache.nutch.parse.ParseStatus;
75 import org.apache.nutch.parse.ParseText;
76 import org.apache.nutch.parse.ParseUtil;
77 import org.apache.nutch.protocol.Content;
78 import org.apache.nutch.scoring.ScoringFilterException;
79 import org.apache.nutch.scoring.ScoringFilters;
80 import org.apache.nutch.util.StringUtil;
81 import org.apache.nutch.util.mime.MimeType;
82 import org.apache.nutch.util.mime.MimeTypeException;
83 import org.apache.nutch.util.mime.MimeTypes;
84 import org.archive.io.ArchiveReader;
85 import org.archive.io.ArchiveReaderFactory;
86 import org.archive.io.arc.ARCConstants;
87 import org.archive.io.arc.ARCRecord;
88 import org.archive.io.arc.ARCRecordMetaData;
89 import org.archive.util.Base32;
90 import org.archive.util.MimetypeUtils;
91 import org.archive.util.TextUtils;
92
93 /***
94 * Ingests ARCs writing ARC Record parse as Nutch FetcherOutputFormat.
95 * FOF has four outputs (It used to have 5 but got rid of the
96 * empty content):
97 * <ul><li>crawl_fetch holds a fat CrawlDatum of all vitals including metadata.
 * It's written below by our {@link WaxFetcherOutputFormat} (in Nutch by
99 * {@link FetcherOutputFormat}). Here is an example CD: <pre> Version: 4
100 * Status: 5 (fetch_success)
101 * Fetch time: Wed Mar 15 12:38:49 PST 2006
102 * Modified time: Wed Dec 31 16:00:00 PST 1969
103 * Retries since fetch: 0
104 * Retry interval: 0.0 days
105 * Score: 1.0
106 * Signature: null
107 * Metadata: collection:test arcname:IAH-20060315203614-00000-debord arcoffset:5127
108 * </pre></li>
109 * <li>crawl_parse has CrawlDatum of MD5s. Used making CrawlDB.
 * It's obtained from the above fat crawl_fetch CrawlDatum and written
111 * out as part of the parse output done by {@link WaxParseOutputFormat}.
112 * This latter class writes three files. This crawl_parse and both
113 * of the following parse_text and parse_data.</li>
114 * <li>parse_text has text from parse.</li>
115 * <li>parse_data has other metadata found by parse (Depends on
116 * parser). This is only input to linkdb. The html parser
117 * adds found out links here and content-type and discovered
118 * encoding as well as advertised encoding, etc.</li>
119 * </ul>
120 */
121 public class ImportArcs extends ToolBase implements Mapper {
122 public final Log LOG = LogFactory.getLog(ImportArcs.class);
123
124 private static final String WAX_SUFFIX = "wax.";
125 private static final String WHITESPACE = "//s+";
126
127 public static final String ARCFILENAME_KEY = "arcname";
128 public static final String ARCFILEOFFSET_KEY = "arcoffset";
129 public static final String ARCCOLLECTION_KEY = "collection";
130 private static final String CONTENT_TYPE_KEY = "content-type";
131 private static final String TEXT_TYPE = "text/";
132 private static final String APPLICATION_TYPE = "application/";
133 public static final String WAX_COLLECTION_KEY =
134 ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY;
135
136 private static final String PDF_TYPE = "application/pdf";
137
138 private boolean indexAll;
139 private boolean indexRedirects;
140 private int contentLimit;
141 private int pdfContentLimit;
142 private MimeTypes mimeTypes;
143 private String segmentName;
144 private String collectionName;
145
146 private final NumberFormat numberFormatter = NumberFormat.getInstance();
147
148 private int parseThreshold = -1;
149
150 private static final Pattern ARCHIVEIT_COLLECTION =
151 Pattern.compile("(?:ARCHIVEIT-)?([^-//._]+)[-|_|//.]([^//s]+)");
152
153 /***
154 * Usually the URL in first record looks like this:
155 * filedesc://IAH-20060315203614-00000-debord.arc. But in old
156 * ARCs, it can look like this: filedesc://19961022/IA-000001.arc.
157 */
158 private static final Pattern FILEDESC_ARCNAME =
159 Pattern.compile("^(?:filedesc://)(?:[0-9]+///)?(.+)(?://.arc)$");
160
161 /***
162 * Buffer to reuse on each ARCRecord indexing.
163 */
164 private final byte[] buffer = new byte[1024 * 16];
165
166 /***
167 *
168 */
169 private final ByteArrayOutputStream contentBuffer =
170 new ByteArrayOutputStream(1024 * 16);
171
172 /***
173 * How long to spend indexing.
174 */
175 private long maxtime;
176
177 private URLNormalizers urlNormalizers;
178 private URLFilters filters;
179
180 private boolean sha1 = false;
181
/**
 * Creates an instance without a configuration; one is supplied later
 * through {@code setConf} or {@link #configure(JobConf)}.
 */
public ImportArcs() {
}

/**
 * Creates an instance bound to the passed configuration.
 * @param conf Configuration handed to {@code setConf}.
 */
public ImportArcs(Configuration conf) {
    super();
    this.setConf(conf);
}
189
190 public void configure(final JobConf job) {
191 setConf(job);
192 this.indexAll = job.getBoolean("wax.index.all", false);
193 this.indexRedirects = job.getBoolean("wax.index.redirects", false);
194 this.contentLimit = job.getInt("http.content.limit", 1024 * 100);
195 final int pdfMultiplicand = job.getInt("wax.pdf.size.multiplicand", 10);
196 this.pdfContentLimit = (this.contentLimit == -1) ? this.contentLimit
197 : pdfMultiplicand * this.contentLimit;
198 this.mimeTypes = MimeTypes.get(job.get("mime.types.file"));
199 this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
200
201 this.maxtime = job.getLong("wax.index.timeout", 60) * 60 * 1000;
202
203
204 System.setProperty("java.protocol.handler.pkgs", "org.archive.net");
205
206
207 this.numberFormatter.setMaximumFractionDigits(2);
208 this.numberFormatter.setMinimumFractionDigits(2);
209 this.parseThreshold = job.getInt("wax.parse.rate.threshold", -1);
210
211 this.urlNormalizers =
212 new URLNormalizers(job, URLNormalizers.SCOPE_FETCHER);
213 this.filters = new URLFilters(job);
214
215 this.sha1 = job.getBoolean("wax.digest.sha1", false);
216 }
217
218 private class ImportArcsReporter implements Reporter {
219 private final Reporter wrappedReporter;
220 private long nextUpdate = 0;
221 private long time = System.currentTimeMillis();
222
223 private static final long FIVE_MINUTES = 1000 * 60 * 5;
224
225 public ImportArcsReporter(final Reporter r) {
226 this.wrappedReporter = r;
227 }
228
229 public void setStatus(final String msg) throws IOException {
230 setStatus(msg, false);
231 }
232
233 public void setStatus(final String msg, final boolean writeThrough)
234 throws IOException {
235 LOG.info(msg);
236
237 long now = System.currentTimeMillis();
238 if (writeThrough || now > this.nextUpdate) {
239 this.wrappedReporter.setStatus(msg);
240 this.nextUpdate = now + 1000;
241 this.time = now;
242 }
243 }
244
245 /***
246 * Update reporter if its a long time since last log only.
247 * @param msg Message to report IF we haven't reported in a long time.
248 * @throws IOException
249 */
250 public void setStatusIfElapse(final String msg)
251 throws IOException {
252 long now = System.currentTimeMillis();
253 if ((now - this.time) > FIVE_MINUTES) {
254 setStatus(msg);
255 }
256 }
257
258 public void progress() throws IOException {
259 this.wrappedReporter.progress();
260 }
261 };
262
263 public void map(final WritableComparable key, final Writable value,
264 final OutputCollector output, final Reporter reporter)
265 throws IOException {
266 final ImportArcsReporter importArcsReporter =
267 new ImportArcsReporter(reporter);
268 final String arcurl = value.toString();
269 if ((arcurl == null) || arcurl.endsWith("work")) {
270 importArcsReporter.setStatus("skipping " + arcurl);
271 return;
272 }
273
274
275 final Thread t = new IndexingThread(arcurl, output, importArcsReporter);
276 t.setDaemon(true);
277 t.start();
278 final long start = System.currentTimeMillis();
279 try {
280 for (long period = this.maxtime; t.isAlive() && (period > 0);
281 period = this.maxtime - (System.currentTimeMillis() - start)) {
282 try {
283 t.join(period);
284 } catch (final InterruptedException e) {
285 e.printStackTrace();
286 }
287 }
288 } finally {
289 cleanup(t, importArcsReporter);
290 }
291 }
292
293 protected void cleanup(final Thread t, final Reporter reporter)
294 throws IOException {
295 if (!t.isAlive()) {
296 return;
297 }
298 reporter.setStatus("Killing indexing thread " + t.getName());
299 t.interrupt();
300 try {
301
302 t.join(1000);
303 } catch (final InterruptedException e) {
304 e.printStackTrace();
305 }
306 if (t.isAlive()) {
307 LOG.info(t.getName() + " will not die");
308 }
309 }
310
/**
 * Thread that opens one ARC and passes each record that
 * {@link ImportArcs#isIndex(ARCRecord)} accepts to
 * {@link ImportArcs#processRecord}. Errors are logged and reported
 * rather than thrown.
 */
private class IndexingThread extends Thread {
    private final String arcLocation;
    private final OutputCollector output;
    private final ImportArcsReporter reporter;

    /**
     * @param arcloc Location of the ARC to index; also used as thread name.
     * @param o Collector that processed records are written to.
     * @param r Reporter to keep informed of progress.
     */
    public IndexingThread(final String arcloc, final OutputCollector o,
            final ImportArcsReporter r) {
        // Name the thread after the ARC it works on.
        super(arcloc);
        this.arcLocation = arcloc;
        this.output = o;
        this.reporter = r;
    }

    /**
     * Opens the ARC, iterates its records until done or interrupted,
     * then closes the ARC.
     */
    public void run() {
        ArchiveReader arc = null;

        // Helper thread that sets a "downloading" status once a minute
        // while the ARC is being opened below.
        Thread reportingDuringDownload = null;
        try {
            this.reporter.setStatus("opening " + this.arcLocation, true);
            reportingDuringDownload = new Thread("reportingDuringDownload") {
                public void run() {
                    while (!this.isInterrupted()) {
                        try {
                            synchronized (this) {
                                sleep(1000 * 60);
                            }
                            reporter.setStatus("downloading " +
                                arcLocation);
                        } catch (final IOException e) {
                            e.printStackTrace();
                            // Reporting failed; stop reporting.
                            break;
                        } catch (final InterruptedException e) {
                            // Interrupted by the finally block below once
                            // the open has finished or failed.
                            break;
                        }
                    }
                }
            };
            reportingDuringDownload.setDaemon(true);
            reportingDuringDownload.start();
            arc = ArchiveReaderFactory.get(this.arcLocation);
        } catch (final Throwable e) {
            // Could not open the ARC: report it and give up on this ARC.
            try {
                final String msg = "Error opening " + this.arcLocation
                    + ": " + e.toString();
                this.reporter.setStatus(msg, true);
                LOG.info(msg);
            } catch (final IOException ioe) {
                LOG.warn(this.arcLocation, ioe);
            }
            return;
        } finally {
            // Stop the once-a-minute reporting whether the open worked or not.
            if ((reportingDuringDownload != null)
                    && reportingDuringDownload.isAlive()) {
                reportingDuringDownload.interrupt();
            }
        }

        arc.setDigest(sha1);
        String arcName = null;
        try {
            // Start from the configured collection name; if none is
            // configured it is derived from the ARC name below.
            ImportArcs.this.collectionName = getConf().
                get(ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY);

            final ParseUtil pu = new ParseUtil(getConf());

            for (final Iterator i = arc.iterator();
                    i.hasNext() && !currentThread().isInterrupted();) {
                final ARCRecord rec = (ARCRecord) i.next();

                if (arcName == null) {
                    // First record seen: its URL supplies the ARC name.
                    arcName = trimARCName(rec.getMetaData().getUrl());
                    if ((ImportArcs.this.collectionName == null) ||
                            (ImportArcs.this.collectionName.length() <= 0)) {
                        ImportArcs.this.collectionName =
                            getCollectionFromArcname(arcName);
                    }
                    if ((ImportArcs.this.collectionName == null) ||
                            (ImportArcs.this.collectionName.length() == 0)) {
                        throw new NullPointerException("Collection name can't "
                            + "be empty");
                    }
                }
                if (!isIndex(rec)) {
                    continue;
                }
                try {
                    final long recordLength = processRecord(arcName, rec,
                        this.output, this.reporter, pu);
                    if (recordLength >
                            ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE) {
                        // A record longer than the default maximum ARC
                        // file size: abandon the rest of this ARC.
                        // NOTE(review): presumably such a length means
                        // the ARC is corrupt or unreadable -- confirm.
                        this.reporter.setStatus("skipping "
                            + this.arcLocation
                            + " -- very long record "
                            + rec.getMetaData());
                        break;
                    }
                } catch (final Throwable e) {
                    // A failure on one record is logged; the loop moves on
                    // to the next record.
                    LOG.warn("Error processing " + rec.getMetaData(), e);
                }
            }
            if (currentThread().isInterrupted()) {
                LOG.info(currentThread().getName() + " interrupted");
            }
            this.reporter.setStatus("closing " + this.arcLocation, true);
        } catch (final Throwable e) {
            // Failure outside of per-record processing (e.g. iterating the
            // ARC or resolving the collection name).
            final String msg = "Error parsing " + this.arcLocation;
            try {
                this.reporter.setStatus(msg, true);
            } catch (final IOException ioe) {
                ioe.printStackTrace();
            }
            LOG.warn(msg, e);
        } finally {
            try {
                arc.close();
            } catch (final IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reduces the URL of an ARC's first record to the bare ARC name.
     * First record looks like this:
     * filedesc://IAH-20060315203614-00000-debord.arc
     * Strip extraneous prefix and suffix (At least WERA expects an ARC
     * name without scheme and suffix: i.e.
     * IAH-20060315203614-00000-debord).
     * @param arcname URL of the ARC's first record.
     * @return Group 1 of {@link ImportArcs#FILEDESC_ARCNAME} when the
     * pattern matches the whole input, else the input unchanged.
     */
    private String trimARCName(String arcname) {
        final Matcher m = ImportArcs.FILEDESC_ARCNAME.matcher(arcname);
        if ((m != null) && m.matches()) {
            arcname = m.group(1);
        }
        return arcname;
    }
}
465
466 /***
467 * @param rec ARC Record to test.
468 * @return True if we are to index this record.
469 */
470 protected boolean isIndex(final ARCRecord rec) {
471 return ((rec.getStatusCode() >= 200) && (rec.getStatusCode() < 300))
472 || (this.indexRedirects && ((rec.getStatusCode() >= 300) &&
473 (rec.getStatusCode() < 400)));
474 }
475
476 long processRecord(final String arcName, final ARCRecord rec,
477 final OutputCollector output, final ImportArcsReporter reporter,
478 final ParseUtil parseUtil) throws IOException {
479 final ARCRecordMetaData arcData = rec.getMetaData();
480
481
482 String url = arcData.getUrl();
483 String oldUrl = url;
484 try {
485 url = urlNormalizers.normalize(url,
486 URLNormalizers.SCOPE_FETCHER);
487 url = filters.filter(url);
488 } catch (Exception e) {
489 LOG.warn("Skipping record. Didn't pass normalization/filter " +
490 oldUrl + ": " + e.toString());
491 return 0;
492 }
493
494 final long b = arcData.getContentBegin();
495 final long l = arcData.getLength();
496 final long recordLength = (l > b)? (l - b): l;
497
498
499
500
501 String mimetype =
502 getMimetype(arcData.getMimetype(), this.mimeTypes, url);
503 if (skip(mimetype)) {
504 return recordLength;
505 }
506
507
508 final Metadata metaData = new Metadata();
509 final Header[] headers = rec.getHttpHeaders();
510 for (int j = 0; j < headers.length; j++) {
511 final Header header = headers[j];
512 if (mimetype == null) {
513
514
515
516 if ((header.getName() != null) &&
517 header.getName().toLowerCase().equals(
518 ImportArcs.CONTENT_TYPE_KEY)) {
519 mimetype = getMimetype(header.getValue(), null, null);
520 if (skip(mimetype)) {
521 return recordLength;
522 }
523 }
524 }
525 metaData.set(header.getName(), header.getValue());
526 }
527
528
529
530
531 final String noSpacesMimetype =
532 TextUtils.replaceAll(ImportArcs.WHITESPACE,
533 ((mimetype == null || mimetype.length() <= 0)?
534 "TODO": mimetype),
535 "-");
536 final String recordLengthAsStr = Long.toString(recordLength);
537 reporter.setStatus(getStatus(url, oldUrl, recordLengthAsStr,
538 noSpacesMimetype));
539
540
541 metaData.set("contentLength", recordLengthAsStr);
542
543 rec.skipHttpHeader();
544 reporter.setStatusIfElapse("read headers on " + url);
545
546
547 int total = 0;
548
549 int len = rec.read(this.buffer, 0, this.buffer.length);
550 if (mimetype == null) {
551 MimeType mt = this.mimeTypes.getMimeType(this.buffer);
552 if (mt == null || mt.getName() == null) {
553 LOG.warn("Failed to get mimetype for: " + url);
554 return recordLength;
555 }
556 mimetype = mt.getName();
557 }
558 metaData.set(ImportArcs.CONTENT_TYPE_KEY, mimetype);
559
560
561
562 int readLimit = (ImportArcs.PDF_TYPE.equals(mimetype))?
563 this.pdfContentLimit : this.contentLimit;
564
565
566 this.contentBuffer.reset();
567 while ((len != -1) && ((readLimit == -1) || (total < readLimit))) {
568 total += len;
569 this.contentBuffer.write(this.buffer, 0, len);
570 len = rec.read(this.buffer, 0, this.buffer.length);
571 reporter.setStatusIfElapse("reading " + url);
572 }
573
574
575 rec.close();
576 reporter.setStatusIfElapse("closed " + url);
577
578 final byte[] contentBytes = this.contentBuffer.toByteArray();
579 final CrawlDatum datum = new CrawlDatum();
580 datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS);
581
582
583 metaData.set(Nutch.SIGNATURE_KEY, (this.sha1)?
584 rec.getDigestStr():
585 MD5Hash.digest(contentBytes).toString());
586
587 metaData.set(Nutch.SEGMENT_NAME_KEY, this.segmentName);
588
589 metaData.set(Nutch.SCORE_KEY, Float.toString(datum.getScore()));
590
591 final long startTime = System.currentTimeMillis();
592 final Content content = new Content(url, url, contentBytes, mimetype,
593 metaData, getConf());
594
595 datum.setFetchTime(Nutchwax.getDate(arcData.getDate()));
596
597 MapWritable mw = datum.getMetaData();
598 if (mw == null) {
599 mw = new MapWritable();
600 }
601 mw.put(new Text(ImportArcs.ARCCOLLECTION_KEY),
602 new Text(collectionName));
603 mw.put(new Text(ImportArcs.ARCFILENAME_KEY), new Text(arcName));
604 mw.put(new Text(ImportArcs.ARCFILEOFFSET_KEY),
605 new Text(Long.toString(arcData.getOffset())));
606 datum.setMetaData(mw);
607
608 Parse parse = null;
609 ParseStatus parseStatus;
610 try {
611 parse = parseUtil.parse(content);
612 reporter.setStatusIfElapse("parsed " + url);
613 parseStatus = parse.getData().getStatus();
614 } catch (final Exception e) {
615 parseStatus = new ParseStatus(e);
616 }
617 if (!parseStatus.isSuccess()) {
618 final String status = formatToOneLine(parseStatus.toString());
619 LOG.warn("Error parsing: " + mimetype + " " + url + ": " + status);
620 parse = null;
621 } else {
622
623 final double kbPerSecond = getParseRate(startTime,
624 (contentBytes != null) ? contentBytes.length : 0);
625 if (LOG.isDebugEnabled()) {
626 LOG.debug(getParseRateLogMessage(url,
627 noSpacesMimetype, kbPerSecond));
628 } else if (kbPerSecond < this.parseThreshold) {
629 LOG.warn(getParseRateLogMessage(url, noSpacesMimetype,
630 kbPerSecond));
631 }
632 }
633 Writable value = new FetcherOutput(datum, null,
634 parse != null ? new ParseImpl(parse) : null);
635 output.collect(Nutchwax.generateWaxKey(url, this.collectionName),
636 value);
637 return recordLength;
638 }
639
640 protected String getStatus(final String url, String oldUrl,
641 final String recordLengthAsStr, final String noSpacesMimetype) {
642
643
644 if (oldUrl.equals(url)) {
645 oldUrl = "-";
646 }
647 StringBuilder sb = new StringBuilder(128);
648 sb.append("adding ");
649 sb.append(url);
650 sb.append(" ");
651 sb.append(oldUrl);
652 sb.append(" ");
653 sb.append(recordLengthAsStr);
654 sb.append(" ");
655 sb.append(noSpacesMimetype);
656 return sb.toString();
657 }
658
659 protected String formatToOneLine(final String s) {
660 final StringBuffer sb = new StringBuffer(s.length());
661 for (final StringTokenizer st = new StringTokenizer(s, "\t\n\r");
662 st.hasMoreTokens(); sb.append(st.nextToken())) {
663 ;
664 }
665 return sb.toString();
666 }
667
668 protected static String getCollectionFromArcname(final String arcurl)
669 throws URISyntaxException {
670 final URI u = new URI(arcurl);
671 final String p = u.getPath();
672 if (p.length() <= 0) {
673 throw new URISyntaxException("Path is empty.", arcurl);
674 }
675 final int index = p.lastIndexOf('/');
676 String arcname = p;
677 if (index >= 0) {
678 arcname = p.substring(index + 1);
679 }
680 final Matcher m = ImportArcs.ARCHIVEIT_COLLECTION.matcher(arcname);
681 if ((m == null) || !m.matches()) {
682 throw new URISyntaxException("Can't find collection in arcname",
683 arcname);
684 }
685 return m.group(1);
686 }
687
688 protected String getParseRateLogMessage(final String url,
689 final String mimetype, final double kbPerSecond) {
690 return url + " " + mimetype + " parse KB/Sec "
691 + this.numberFormatter.format(kbPerSecond);
692 }
693
694 protected double getParseRate(final long startTime, final long len) {
695
696 long elapsedTime = System.currentTimeMillis() - startTime;
697 elapsedTime = (elapsedTime == 0) ? 1 : elapsedTime;
698 return (len != 0) ? ((double) len / 1024)
699 / ((double) elapsedTime / 1000) : 0;
700 }
701
702 protected boolean skip(final String mimetype) {
703 boolean decision = false;
704
705 if (!this.indexAll) {
706 if ((mimetype == null)
707 || (!mimetype.startsWith(ImportArcs.TEXT_TYPE) && !mimetype
708 .startsWith(ImportArcs.APPLICATION_TYPE))) {
709
710 decision = true;
711 }
712 }
713 return decision;
714 }
715
716 protected String getMimetype(final String mimetype, final MimeTypes mts,
717 final String url) {
718 if (mimetype != null && mimetype.length() > 0) {
719 return checkMimetype(mimetype.toLowerCase());
720 }
721 if (mts != null && url != null) {
722 final MimeType mt = mts.getMimeType(url);
723 if (mt != null) {
724 return checkMimetype(mt.getName().toLowerCase());
725 }
726 }
727 return null;
728 }
729
730 protected static String checkMimetype(String mimetype) {
731 if ((mimetype == null) || (mimetype.length() <= 0) ||
732 mimetype.startsWith(MimetypeUtils.NO_TYPE_MIMETYPE)) {
733 return null;
734 }
735
736
737 try {
738 new MimeType(mimetype);
739 } catch (final MimeTypeException e) {
740 mimetype = null;
741 }
742 return mimetype;
743 }
744
745 public void importArcs(final Path arcUrlsDir, final Path segment,
746 final String collection) throws IOException {
747 LOG.info("ImportArcs segment: " + segment + ", src: " + arcUrlsDir);
748
749 final JobConf job = new JobConf(getConf(), this.getClass());
750
751 job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
752
753 job.setInputPath(arcUrlsDir);
754 job.setMapperClass(ImportArcs.class);
755
756 job.setOutputPath(segment);
757 job.setOutputFormat(WaxFetcherOutputFormat.class);
758 job.setOutputKeyClass(Text.class);
759 job.setOutputValueClass(FetcherOutput.class);
760
761 if ((collection != null) && (collection.length() > 0)) {
762 job.set(ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY,
763 collection);
764 }
765 job.setJobName("import " + arcUrlsDir + " " + segment);
766
767 JobClient.runJob(job);
768 LOG.info("ImportArcs: done");
769 }
770
771 /***
772 * Override of nutch FetcherOutputFormat so I can substitute my own
773 * ParseOutputFormat, {@link WaxParseOutputFormat}. While I'm here,
774 * removed content references. NutchWAX doesn't save content.
775 * @author stack
776 */
777 public static class WaxFetcherOutputFormat extends FetcherOutputFormat {
778 public RecordWriter getRecordWriter(final FileSystem fs,
779 final JobConf job, final String name, Progressable progress)
780 throws IOException {
781 Path f = new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME);
782 final Path fetch = new Path(f, name);
783 final MapFile.Writer fetchOut = new MapFile.Writer(fs,
784 fetch.toString(), Text.class, CrawlDatum.class);
785
786 return new RecordWriter() {
787 private RecordWriter parseOut;
788 {
789 if (Fetcher.isParsing(job)) {
790
791
792 this.parseOut = new WaxParseOutputFormat().
793 getRecordWriter(fs, job, name, null);
794 }
795 }
796
797 public void write(WritableComparable key, Writable value)
798 throws IOException {
799 FetcherOutput fo = (FetcherOutput)value;
800 fetchOut.append(key, fo.getCrawlDatum());
801 if (fo.getParse() != null) {
802 parseOut.write(key, fo.getParse());
803 }
804 }
805
806 public void close(Reporter reporter) throws IOException {
807 fetchOut.close();
808 if (parseOut != null) {
809 parseOut.close(reporter);
810 }
811 }
812 };
813 }
814 }
815
816 /***
817 * Copy so I can add collection prefix to produced signature and link
818 * CrawlDatums.
819 * @author stack
820 */
821 public static class WaxParseOutputFormat extends ParseOutputFormat {
822 public final Log LOG = LogFactory.getLog(WaxParseOutputFormat.class);
823
824 private URLNormalizers urlNormalizers;
825 private URLFilters filters;
826 private ScoringFilters scfilters;
827
828
829 public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
830 String name, Progressable progress)
831 throws IOException {
832
833
834
835 this.urlNormalizers =
836 new URLNormalizers(job, URLNormalizers.SCOPE_OUTLINK);
837 this.filters = new URLFilters(job);
838 this.scfilters = new ScoringFilters(job);
839 final float interval =
840 job.getFloat("db.default.fetch.interval", 30f);
841 final boolean ignoreExternalLinks =
842 job.getBoolean("db.ignore.external.links", false);
843 final boolean sha1 = job.getBoolean("wax.digest.sha1", false);
844
845
846 Path text = new Path(new Path(job.getOutputPath(),
847 ParseText.DIR_NAME), name);
848 Path data = new Path(new Path(job.getOutputPath(),
849 ParseData.DIR_NAME), name);
850 Path crawl = new Path(new Path(job.getOutputPath(),
851 CrawlDatum.PARSE_DIR_NAME), name);
852
853 final MapFile.Writer textOut = new MapFile.Writer(job, fs,
854 text.toString(), Text.class, ParseText.class,
855 CompressionType.RECORD);
856
857 final MapFile.Writer dataOut = new MapFile.Writer(job, fs,
858 data.toString(), Text.class, ParseData.class);
859
860 final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs,
861 job, crawl, Text.class, CrawlDatum.class);
862
863 return new RecordWriter() {
864
865 public void write(WritableComparable key, Writable value)
866 throws IOException {
867
868
869 String collection = null;
870 String fromUrl = null;
871 String fromHost = null;
872 String toHost = null;
873 try {
874 collection = Nutchwax.getCollectionFromWaxKey(key);
875 fromUrl = Nutchwax.getUrlFromWaxKey(key);
876 } catch (IOException ioe) {
877 LOG.warn("Skipping record. Can't parse " + key, ioe);
878 return;
879 }
880 if (fromUrl == null || collection == null) {
881 LOG.warn("Skipping record. Null from or collection " +
882 key);
883 return;
884 }
885
886 Parse parse = (Parse) value;
887
888 textOut.append(key, new ParseText(parse.getText()));
889
890 ParseData parseData = parse.getData();
891
892 String sig = parseData.getContentMeta().get(
893 Nutch.SIGNATURE_KEY);
894 if (sig != null) {
895 byte[] signature = (sha1)?
896 Base32.decode(sig): StringUtil.fromHexString(sig);
897 if (signature != null) {
898
899 CrawlDatum d = new CrawlDatum(
900 CrawlDatum.STATUS_SIGNATURE, 0.0f);
901 d.setSignature(signature);
902 crawlOut.append(key, d);
903 }
904 }
905
906
907 Outlink[] links = parseData.getOutlinks();
908 if (ignoreExternalLinks) {
909 try {
910 fromHost = new URL(fromUrl).getHost().toLowerCase();
911 } catch (MalformedURLException e) {
912 fromHost = null;
913 }
914 } else {
915 fromHost = null;
916 }
917
918 String[] toUrls = new String[links.length];
919 int validCount = 0;
920 for (int i = 0; i < links.length; i++) {
921 String toUrl = links[i].getToUrl();
922 try {
923 toUrl = urlNormalizers.normalize(toUrl,
924 URLNormalizers.SCOPE_OUTLINK);
925 toUrl = filters.filter(toUrl);
926 } catch (Exception e) {
927 toUrl = null;
928 }
929
930 if (fromUrl.equals(toUrl)) toUrl = null;
931 if (toUrl != null)
932 validCount++;
933 toUrls[i] = toUrl;
934 }
935
936 CrawlDatum adjust = null;
937
938
939 for (int i = 0; i < toUrls.length; i++) {
940 if (toUrls[i] == null)
941 continue;
942 if (ignoreExternalLinks) {
943 try {
944 toHost = new URL(toUrls[i]).getHost().
945 toLowerCase();
946 } catch (MalformedURLException e) {
947 toHost = null;
948 }
949 if (toHost == null || !toHost.equals(fromHost)) {
950
951 continue;
952 }
953 }
954
955 CrawlDatum target = new CrawlDatum(
956 CrawlDatum.STATUS_LINKED, interval);
957 Text fromURLUTF8 = new Text(fromUrl);
958 Text targetUrl = new Text(toUrls[i]);
959 adjust = null;
960 try {
961
962
963
964 adjust = scfilters.distributeScoreToOutlink(
965 fromURLUTF8, targetUrl, parseData,
966 target, null, links.length, validCount);
967 } catch (ScoringFilterException e) {
968 if (LOG.isWarnEnabled()) {
969 LOG.warn("Cannot distribute score from " + key
970 + " to " + target + " - skipped ("
971 + e.getMessage());
972 }
973 continue;
974 }
975 Text targetKey =
976 Nutchwax.generateWaxKey(targetUrl, collection);
977 crawlOut.append(targetKey, target);
978 if (adjust != null)
979 crawlOut.append(key, adjust);
980 }
981 dataOut.append(key, parseData);
982 }
983
984 public void close(Reporter reporter) throws IOException {
985 textOut.close();
986 dataOut.close();
987 crawlOut.close();
988 }
989 };
990 }
991 }
992
993 public void close() {
994
995 }
996
997 public static void doImportUsage(final String message,
998 final int exitCode) {
999 if (message != null && message.length() > 0) {
1000 System.out.println(message);
1001 }
1002 System.out.println("Usage: hadoop jar nutchwax.jar import <input>" +
1003 " <output> <collection>");
1004 System.out.println("Arguments:");
1005 System.out.println(" input Directory of files" +
1006 " listing ARC URLs to import.");
1007 System.out.println(" output Directory to import to. Inport is " +
1008 "written to a subdir named");
1009 System.out.println(" for current date under " +
1010 "'<output>/segments/'.");
1011 System.out.println(" collection Collection name. Added to" +
1012 " each resource.");
1013 System.exit(exitCode);
1014 }
1015
1016 public static void main(String[] args) throws Exception {
1017 int res = new ImportArcs().
1018 doMain(NutchwaxConfiguration.getConfiguration(), args);
1019 System.exit(res);
1020 }
1021
1022 public int run(final String[] args) throws Exception {
1023 if (args.length != 3) {
1024 doImportUsage("ERROR: Wrong number of arguments passed.", 2);
1025 }
1026
1027 try {
1028 importArcs(new Path(args[0]), new Path(args[1]), args[2]);
1029 return 0;
1030 } catch(Exception e) {
1031 LOG.fatal("ImportARCs: " + StringUtils.stringifyException(e));
1032 return -1;
1033 }
1034 }
1035 }