1   /*
2    * $Id: ImportArcs.java 1445 2007-01-19 05:10:40Z stack-sf $
3    * 
4    * Copyright (C) 2003 Internet Archive.
5    * 
6    * This file is part of the archive-access tools project
7    * (http://sourceforge.net/projects/archive-access).
8    * 
9    * The archive-access tools are free software; you can redistribute them and/or
10   * modify them under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or any
12   * later version.
13   * 
14   * The archive-access tools are distributed in the hope that they will be
15   * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
17   * Public License for more details.
18   * 
19   * You should have received a copy of the GNU Lesser Public License along with
20   * the archive-access tools; if not, write to the Free Software Foundation,
21   * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22   */
23  
24  package org.archive.access.nutch;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  import java.net.MalformedURLException;
29  import java.net.URI;
30  import java.net.URISyntaxException;
31  import java.net.URL;
32  import java.text.NumberFormat;
33  import java.util.Iterator;
34  import java.util.StringTokenizer;
35  import java.util.regex.Matcher;
36  import java.util.regex.Pattern;
37  
38  import org.apache.commons.httpclient.Header;
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.io.MD5Hash;
45  import org.apache.hadoop.io.MapFile;
46  import org.apache.hadoop.io.SequenceFile;
47  import org.apache.hadoop.io.Text;
48  import org.apache.hadoop.io.Writable;
49  import org.apache.hadoop.io.WritableComparable;
50  import org.apache.hadoop.io.SequenceFile.CompressionType;
51  import org.apache.hadoop.mapred.JobClient;
52  import org.apache.hadoop.mapred.JobConf;
53  import org.apache.hadoop.mapred.Mapper;
54  import org.apache.hadoop.mapred.OutputCollector;
55  import org.apache.hadoop.mapred.RecordWriter;
56  import org.apache.hadoop.mapred.Reporter;
57  import org.apache.hadoop.util.Progressable;
58  import org.apache.hadoop.util.StringUtils;
59  import org.apache.hadoop.util.ToolBase;
60  import org.apache.nutch.crawl.CrawlDatum;
61  import org.apache.nutch.crawl.MapWritable;
62  import org.apache.nutch.fetcher.Fetcher;
63  import org.apache.nutch.fetcher.FetcherOutput;
64  import org.apache.nutch.fetcher.FetcherOutputFormat;
65  import org.apache.nutch.metadata.Metadata;
66  import org.apache.nutch.metadata.Nutch;
67  import org.apache.nutch.net.URLFilters;
68  import org.apache.nutch.net.URLNormalizers;
69  import org.apache.nutch.parse.Outlink;
70  import org.apache.nutch.parse.Parse;
71  import org.apache.nutch.parse.ParseData;
72  import org.apache.nutch.parse.ParseImpl;
73  import org.apache.nutch.parse.ParseOutputFormat;
74  import org.apache.nutch.parse.ParseStatus;
75  import org.apache.nutch.parse.ParseText;
76  import org.apache.nutch.parse.ParseUtil;
77  import org.apache.nutch.protocol.Content;
78  import org.apache.nutch.scoring.ScoringFilterException;
79  import org.apache.nutch.scoring.ScoringFilters;
80  import org.apache.nutch.util.StringUtil;
81  import org.apache.nutch.util.mime.MimeType;
82  import org.apache.nutch.util.mime.MimeTypeException;
83  import org.apache.nutch.util.mime.MimeTypes;
84  import org.archive.io.ArchiveReader;
85  import org.archive.io.ArchiveReaderFactory;
86  import org.archive.io.arc.ARCConstants;
87  import org.archive.io.arc.ARCRecord;
88  import org.archive.io.arc.ARCRecordMetaData;
89  import org.archive.util.Base32;
90  import org.archive.util.MimetypeUtils;
91  import org.archive.util.TextUtils;
92  
93  /***
94   * Ingests ARCs writing ARC Record parse as Nutch FetcherOutputFormat.
95   * FOF has four outputs (It used to have 5 but got rid of the
96   * empty content):
97   * <ul><li>crawl_fetch holds a fat CrawlDatum of all vitals including metadata.
98   * It's written below by our {@link WaxFetcherOutputFormat} (in Nutch by
99   * {@link FetcherOutputFormat}).  Here is an example CD: <pre>  Version: 4
100  *  Status: 5 (fetch_success)
101  *  Fetch time: Wed Mar 15 12:38:49 PST 2006
102  *  Modified time: Wed Dec 31 16:00:00 PST 1969
103  *  Retries since fetch: 0
104  *  Retry interval: 0.0 days
105  *  Score: 1.0
106  *  Signature: null
107  *  Metadata: collection:test arcname:IAH-20060315203614-00000-debord arcoffset:5127 
108  * </pre></li>
109  * <li>crawl_parse has CrawlDatum of MD5s.  Used making CrawlDB.
110  * It's obtained from the fat crawl_fetch CrawlDatum above and written
111  * out as part of the parse output done by {@link WaxParseOutputFormat}.
112  * This latter class writes three files.  This crawl_parse and both
113  * of the following parse_text and parse_data.</li>
114  * <li>parse_text has text from parse.</li>
115  * <li>parse_data has other metadata found by parse (Depends on
116  * parser).  This is only input to linkdb.  The html parser
117  * adds found out links here and content-type and discovered
118  * encoding as well as advertised encoding, etc.</li>
119  * </ul>
120  */
121 public class ImportArcs extends ToolBase implements Mapper {
122     public final Log LOG = LogFactory.getLog(ImportArcs.class);
123 
124     private static final String WAX_SUFFIX = "wax.";
125     private static final String WHITESPACE = "//s+";
126 
127     public static final String ARCFILENAME_KEY = "arcname";
128     public static final String ARCFILEOFFSET_KEY = "arcoffset";
129     public static final String ARCCOLLECTION_KEY = "collection";
130     private static final String CONTENT_TYPE_KEY = "content-type";
131     private static final String TEXT_TYPE = "text/";
132     private static final String APPLICATION_TYPE = "application/";
133     public static final String WAX_COLLECTION_KEY =
134         ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY;
135 
136     private static final String PDF_TYPE = "application/pdf";
137 
138     private boolean indexAll;
139     private boolean indexRedirects;
140     private int contentLimit;
141     private int pdfContentLimit;
142     private MimeTypes mimeTypes;
143     private String segmentName;
144     private String collectionName;
145 
146     private final NumberFormat numberFormatter = NumberFormat.getInstance();
147 
148     private int parseThreshold = -1;
149 
150     private static final Pattern ARCHIVEIT_COLLECTION =
151         Pattern.compile("(?:ARCHIVEIT-)?([^-//._]+)[-|_|//.]([^//s]+)");
152     
153     /***
154      * Usually the URL in first record looks like this:
155      * filedesc://IAH-20060315203614-00000-debord.arc.  But in old
156      * ARCs, it can look like this: filedesc://19961022/IA-000001.arc.
157      */
158     private static final Pattern FILEDESC_ARCNAME =
159         Pattern.compile("^(?:filedesc://)(?:[0-9]+///)?(.+)(?://.arc)$");
160 
161     /***
162      * Buffer to reuse on each ARCRecord indexing.
163      */
164     private final byte[] buffer = new byte[1024 * 16];
165     
166     /***
167      * 
168      */
169     private final ByteArrayOutputStream contentBuffer =
170         new ByteArrayOutputStream(1024 * 16);
171 
172     /***
173      * How long to spend indexing.
174      */
175     private long maxtime;
176     
177     private URLNormalizers urlNormalizers;
178     private URLFilters filters;
179 
180     private boolean sha1 = false;
181     
182     public ImportArcs() {
183     	super();
184     }
185     
/**
 * Creates an instance and hands it the given configuration.
 *
 * @param conf configuration passed to {@code setConf}
 */
public ImportArcs(Configuration conf) {
    super();
    this.setConf(conf);
}
189     
190     public void configure(final JobConf job) {
191         setConf(job);
192         this.indexAll = job.getBoolean("wax.index.all", false);
193         this.indexRedirects = job.getBoolean("wax.index.redirects", false);
194         this.contentLimit = job.getInt("http.content.limit", 1024 * 100);
195         final int pdfMultiplicand = job.getInt("wax.pdf.size.multiplicand", 10);
196         this.pdfContentLimit = (this.contentLimit == -1) ? this.contentLimit
197                 : pdfMultiplicand * this.contentLimit;
198         this.mimeTypes = MimeTypes.get(job.get("mime.types.file"));
199         this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
200         // Value is in minutes.
201         this.maxtime = job.getLong("wax.index.timeout", 60) * 60 * 1000;
202 
203         // Get the rsync protocol handler into the mix.
204         System.setProperty("java.protocol.handler.pkgs", "org.archive.net");
205 
206         // Format numbers output by parse rate logging.
207         this.numberFormatter.setMaximumFractionDigits(2);
208         this.numberFormatter.setMinimumFractionDigits(2);
209         this.parseThreshold = job.getInt("wax.parse.rate.threshold", -1);
210         
211         this.urlNormalizers =
212         	new URLNormalizers(job, URLNormalizers.SCOPE_FETCHER);
213         this.filters = new URLFilters(job);
214         
215         this.sha1 = job.getBoolean("wax.digest.sha1", false);
216     }
217     
218     private class ImportArcsReporter implements Reporter {
219         private final Reporter wrappedReporter;
220         private long nextUpdate = 0;
221         private long time = System.currentTimeMillis();
222 
223         private static final long FIVE_MINUTES = 1000 * 60 * 5;
224         
225         public ImportArcsReporter(final Reporter r) {
226             this.wrappedReporter = r;
227         }
228         
229         public void setStatus(final String msg) throws IOException {
230             setStatus(msg, false);
231         }
232         
233         public void setStatus(final String msg, final boolean writeThrough)
234         throws IOException {
235             LOG.info(msg);
236             // Only update tasktracker every second -- not for every record.
237             long now = System.currentTimeMillis();
238             if (writeThrough || now > this.nextUpdate) {
239                 this.wrappedReporter.setStatus(msg);
240                 this.nextUpdate = now + 1000;
241                 this.time = now;
242             }
243         }
244         
245         /***
246          * Update reporter if its a long time since last log only.
247          * @param msg Message to report IF we haven't reported in a long time.
248          * @throws IOException
249          */
250         public void setStatusIfElapse(final String msg)
251         throws IOException {
252             long now = System.currentTimeMillis();
253             if ((now - this.time) > FIVE_MINUTES) {
254                 setStatus(msg);
255             }
256         }
257 
258         public void progress() throws IOException {
259             this.wrappedReporter.progress();
260         }
261     };
262 
263     public void map(final WritableComparable key, final Writable value,
264             final OutputCollector output, final Reporter reporter)
265             throws IOException {
266         final ImportArcsReporter importArcsReporter =
267             new ImportArcsReporter(reporter);
268         final String arcurl = value.toString();
269         if ((arcurl == null) || arcurl.endsWith("work")) {
270             importArcsReporter.setStatus("skipping " + arcurl);
271             return;
272         }
273 
274         // Set off indexing in a thread so I can cover it with a timer.
275         final Thread t = new IndexingThread(arcurl, output, importArcsReporter);
276         t.setDaemon(true);
277         t.start();
278         final long start = System.currentTimeMillis();
279         try {
280             for (long period = this.maxtime; t.isAlive() && (period > 0);
281                 period = this.maxtime - (System.currentTimeMillis() - start)) {
282                 try {
283                     t.join(period);
284                 } catch (final InterruptedException e) {
285                     e.printStackTrace();
286                 }
287             }
288         } finally {
289             cleanup(t, importArcsReporter);
290         }
291     }
292 
293     protected void cleanup(final Thread t, final Reporter reporter)
294             throws IOException {
295         if (!t.isAlive()) {
296             return;
297         }
298         reporter.setStatus("Killing indexing thread " + t.getName());
299         t.interrupt();
300         try {
301             // Give it some time to die.
302             t.join(1000);
303         } catch (final InterruptedException e) {
304             e.printStackTrace();
305         }
306         if (t.isAlive()) {
307             LOG.info(t.getName() + " will not die");
308         }
309     }
310 
311     private class IndexingThread extends Thread {
312         private final String arcLocation;
313         private final OutputCollector output;
314         private final ImportArcsReporter reporter;
315 
316         public IndexingThread(final String arcloc, final OutputCollector o,
317                 final ImportArcsReporter r) {
318             // Name this thread same as ARC location.
319             super(arcloc);
320             this.arcLocation = arcloc;
321             this.output = o;
322             this.reporter = r;
323         }
324 
325         public void run() {
326             ArchiveReader arc = null;
327             // Need a thread that will keep updating TaskTracker during long
328             // downloads else tasktracker will kill us.
329             Thread reportingDuringDownload = null;
330             try {
331                 this.reporter.setStatus("opening " + this.arcLocation, true);
332                 reportingDuringDownload = new Thread("reportingDuringDownload") {
333                     public void run() {
334                         while (!this.isInterrupted()) {
335                             try {
336                                 synchronized (this) {
337                                     sleep(1000 * 60); // Sleep a minute.
338                                 }
339                                 reporter.setStatus("downloading " +
340                                     arcLocation);
341                             } catch (final IOException e) {
342                                 e.printStackTrace();
343                                 // No point hanging around if we're failing
344                                 // status.
345                                 break;
346                             } catch (final InterruptedException e) {
347                                 // Interrupt flag is cleared. Just fall out.
348                                 break;
349                             }
350                         }
351                     }
352                 };
353                 reportingDuringDownload.setDaemon(true);
354                 reportingDuringDownload.start();
355                 arc = ArchiveReaderFactory.get(this.arcLocation);
356             } catch (final Throwable e) {
357                 try {
358                     final String msg = "Error opening " + this.arcLocation
359                             + ": " + e.toString();
360                     this.reporter.setStatus(msg, true);
361                     LOG.info(msg);
362                 } catch (final IOException ioe) {
363                     LOG.warn(this.arcLocation, ioe);
364                 }
365                 return;
366             } finally {
367                 if ((reportingDuringDownload != null)
368                         && reportingDuringDownload.isAlive()) {
369                     reportingDuringDownload.interrupt();
370                 }
371             }
372 
373             arc.setDigest(sha1);
374             String arcName = null;
375             try {
376                 // If empty collection name, take arc prefix later below.
377                 ImportArcs.this.collectionName = getConf().
378                     get(ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY);
379 
380                 final ParseUtil pu = new ParseUtil(getConf());
381                 // Iterate over each ARCRecord.
382                 for (final Iterator i = arc.iterator();
383                         i.hasNext() && !currentThread().isInterrupted();) {
384                     final ARCRecord rec = (ARCRecord) i.next();
385                     // First entry has arc name, usually.
386                     if (arcName == null) {
387                         
388                         arcName = trimARCName(rec.getMetaData().getUrl());
389                         if ((ImportArcs.this.collectionName == null) ||
390                                 (ImportArcs.this.collectionName.length() <= 0)) {
391                             ImportArcs.this.collectionName =
392                                 getCollectionFromArcname(arcName);
393                         }
394                         if ((ImportArcs.this.collectionName == null) ||
395                                 (ImportArcs.this.collectionName.length() == 0)) {
396                             throw new NullPointerException("Collection name can't "
397                                     + "be empty");
398                         }
399                     }
400                     if (!isIndex(rec)) {
401                         continue;
402                     }
403                     try {
404                         final long recordLength = processRecord(arcName, rec,
405                             this.output, this.reporter, pu);
406                         if (recordLength >
407                                 ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE) {
408                             // Now, if the content length is larger than a
409                             // standard ARC, then it is most likely the last
410                             // record in the ARC because ARC is closed after we
411                             // exceed 100MB (DEFAULT_MAX_ARC...). Calling
412                             // hasNext above will make us read through the
413                             // whole record, even if its a 1.7G video. On a
414                             // loaded machine, this might cause us timeout with
415                             // tasktracker -- so, just skip out here.
416                             this.reporter.setStatus("skipping "
417                                     + this.arcLocation
418                                     + " -- very long record "
419                                     + rec.getMetaData());
420                             break;
421                         }
422                     } catch (final Throwable e) {
423                         // Failed parse of record. Keep going.
424                         LOG.warn("Error processing " + rec.getMetaData(), e);
425                     }
426                 }
427                 if (currentThread().isInterrupted()) {
428                     LOG.info(currentThread().getName() + " interrupted");
429                 }
430                 this.reporter.setStatus("closing " + this.arcLocation, true);
431             } catch (final Throwable e) {
432                 // Problem parsing arc file.
433                 final String msg = "Error parsing " + this.arcLocation;
434                 try {
435                     this.reporter.setStatus(msg, true);
436                 } catch (final IOException ioe) {
437                     ioe.printStackTrace();
438                 }
439                 LOG.warn(msg, e);
440             } finally {
441                 try {
442                     arc.close();
443                 } catch (final IOException e) {
444                     e.printStackTrace();
445                 }
446             }
447         }
448 
449         /***
450          *  Strip scheme prefix and arc or arc.gz suffix if present.
451          *  First record looks like this:
452          *      filedesc://IAH-20060315203614-00000-debord.arc
453          * Strip extraneous prefix and suffix (At least WERA expects an ARC 
454          * name without scheme and suffix: i.e.
455          * IAH-20060315203614-00000-debord).
456          */
457         private String trimARCName(String arcname) {
458             final Matcher m = ImportArcs.FILEDESC_ARCNAME.matcher(arcname);
459             if ((m != null) && m.matches()) {
460                 arcname = m.group(1);
461             }
462             return arcname;
463         }
464     }
465 
466     /***
467      * @param rec ARC Record to test.
468      * @return True if we are to index this record.
469      */
470     protected boolean isIndex(final ARCRecord rec) {
471         return ((rec.getStatusCode() >= 200) && (rec.getStatusCode() < 300))
472             || (this.indexRedirects && ((rec.getStatusCode() >= 300) &&
473                 (rec.getStatusCode() < 400)));
474     }
475 
476     long processRecord(final String arcName, final ARCRecord rec,
477             final OutputCollector output, final ImportArcsReporter reporter,
478             final ParseUtil parseUtil) throws IOException {
479         final ARCRecordMetaData arcData = rec.getMetaData();
480         
481         // Get URL.
482         String url = arcData.getUrl();
483         String oldUrl = url;
484         try {
485             url = urlNormalizers.normalize(url,
486                 URLNormalizers.SCOPE_FETCHER);
487             url = filters.filter(url); // filter the url
488         } catch (Exception e) {
489         	LOG.warn("Skipping record. Didn't pass normalization/filter " +
490                 oldUrl + ": " + e.toString());
491             return 0;
492         }
493         
494         final long b = arcData.getContentBegin();
495         final long l = arcData.getLength();
496         final long recordLength = (l > b)? (l - b): l;
497 
498         // Look at ARCRecord meta data line mimetype. It can be empty.  If so,
499         // two more chances at figuring it either by looking at HTTP headers or
500         // by looking at first couple of bytes of the file.  See below.
501         String mimetype =
502             getMimetype(arcData.getMimetype(), this.mimeTypes, url);
503         if (skip(mimetype)) {
504             return recordLength;
505         }
506 
507         // Copy http headers to nutch metadata.
508         final Metadata metaData = new Metadata();
509         final Header[] headers = rec.getHttpHeaders();
510         for (int j = 0; j < headers.length; j++) {
511             final Header header = headers[j];
512             if (mimetype == null) {
513                 // Special handling. If mimetype is still null, try getting it
514                 // from the http header. I've seen arc record lines with empty
515                 // content-type and a MIME unparseable file ending; i.e. .MID.
516                 if ((header.getName() != null) &&
517                         header.getName().toLowerCase().equals(
518                             ImportArcs.CONTENT_TYPE_KEY)) {
519                     mimetype = getMimetype(header.getValue(), null, null);
520                     if (skip(mimetype)) {
521                         return recordLength;
522                     }
523                 }
524             }
525             metaData.set(header.getName(), header.getValue());
526         }
527         
528         // This call to reporter setStatus pings the tasktracker telling it our
529         // status and telling the task tracker we're still alive (so it doesn't
530         // time us out).
531         final String noSpacesMimetype =
532             TextUtils.replaceAll(ImportArcs.WHITESPACE,
533                 ((mimetype == null || mimetype.length() <= 0)?
534                         "TODO": mimetype),
535                     "-");
536         final String recordLengthAsStr = Long.toString(recordLength);
537         reporter.setStatus(getStatus(url, oldUrl, recordLengthAsStr,
538             noSpacesMimetype));
539         
540         // This is a nutch 'more' field.
541         metaData.set("contentLength", recordLengthAsStr);
542 
543         rec.skipHttpHeader();
544         reporter.setStatusIfElapse("read headers on " + url);
545 
546         // TODO: Skip if unindexable type.
547         int total = 0;
548         // Read in first block. If mimetype still null, look for MAGIC.
549         int len = rec.read(this.buffer, 0, this.buffer.length);
550         if (mimetype == null) {
551             MimeType mt = this.mimeTypes.getMimeType(this.buffer);
552             if (mt == null || mt.getName() == null) {
553                 LOG.warn("Failed to get mimetype for: " + url);
554                 return recordLength;
555             }
556             mimetype = mt.getName();
557         }
558         metaData.set(ImportArcs.CONTENT_TYPE_KEY, mimetype);
559         
560         // How much do we read total? If pdf, we will read more. If equal to -1,
561         // read all.
562         int readLimit = (ImportArcs.PDF_TYPE.equals(mimetype))?
563             this.pdfContentLimit : this.contentLimit;
564         // Reset our contentBuffer so can reuse.  Over the life of an ARC
565         // processing will grow to maximum record size.
566         this.contentBuffer.reset();
567         while ((len != -1) && ((readLimit == -1) || (total < readLimit))) {
568             total += len;
569             this.contentBuffer.write(this.buffer, 0, len);
570             len = rec.read(this.buffer, 0, this.buffer.length);
571             reporter.setStatusIfElapse("reading " + url);
572         }
573         // Close the Record.  We're done with it.  Side-effect is calculation
574         // of digest -- if we're digesting.
575         rec.close();
576         reporter.setStatusIfElapse("closed " + url);
577         
578         final byte[] contentBytes = this.contentBuffer.toByteArray();
579         final CrawlDatum datum = new CrawlDatum();
580         datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS);
581         
582         // Calculate digest or use precalculated sha1.
583         metaData.set(Nutch.SIGNATURE_KEY, (this.sha1)?
584             rec.getDigestStr():
585             MD5Hash.digest(contentBytes).toString());
586         
587         metaData.set(Nutch.SEGMENT_NAME_KEY, this.segmentName);
588         // Score at this stage is 1.0f.
589         metaData.set(Nutch.SCORE_KEY, Float.toString(datum.getScore()));
590 
591         final long startTime = System.currentTimeMillis();
592         final Content content = new Content(url, url, contentBytes, mimetype,
593                 metaData, getConf());
594 
595         datum.setFetchTime(Nutchwax.getDate(arcData.getDate()));
596         
597         MapWritable mw = datum.getMetaData();
598         if (mw == null) { 
599            mw = new MapWritable();
600         }
601         mw.put(new Text(ImportArcs.ARCCOLLECTION_KEY),
602             new Text(collectionName));
603         mw.put(new Text(ImportArcs.ARCFILENAME_KEY), new Text(arcName));
604         mw.put(new Text(ImportArcs.ARCFILEOFFSET_KEY),
605             new Text(Long.toString(arcData.getOffset())));
606         datum.setMetaData(mw);
607 
608         Parse parse = null;
609         ParseStatus parseStatus;
610         try {
611             parse = parseUtil.parse(content);
612             reporter.setStatusIfElapse("parsed " + url);
613             parseStatus = parse.getData().getStatus();
614         } catch (final Exception e) {
615             parseStatus = new ParseStatus(e);
616         }
617         if (!parseStatus.isSuccess()) {
618             final String status = formatToOneLine(parseStatus.toString());
619             LOG.warn("Error parsing: " + mimetype + " " + url + ": " + status);
620             parse = null;
621         } else {
622             // Was it a slow parse?
623             final double kbPerSecond = getParseRate(startTime,
624                     (contentBytes != null) ? contentBytes.length : 0);
625             if (LOG.isDebugEnabled()) {
626                 LOG.debug(getParseRateLogMessage(url,
627                     noSpacesMimetype, kbPerSecond));
628             } else if (kbPerSecond < this.parseThreshold) {
629                 LOG.warn(getParseRateLogMessage(url, noSpacesMimetype,
630                         kbPerSecond));
631             }
632         }
633         Writable value = new FetcherOutput(datum, null,
634             parse != null ? new ParseImpl(parse) : null);
635         output.collect(Nutchwax.generateWaxKey(url, this.collectionName),
636             value);
637         return recordLength;
638     }
639     
640     protected String getStatus(final String url, String oldUrl,
641         final String recordLengthAsStr, final String noSpacesMimetype) {
642         // If oldUrl is same as url, don't log.  Otherwise, log original so we
643         // can keep url originally imported.
644         if (oldUrl.equals(url)) {
645             oldUrl = "-";
646         }
647         StringBuilder sb = new StringBuilder(128);
648         sb.append("adding ");
649         sb.append(url);
650         sb.append(" ");
651         sb.append(oldUrl);
652         sb.append(" ");
653         sb.append(recordLengthAsStr);
654         sb.append(" ");
655         sb.append(noSpacesMimetype);
656         return sb.toString();
657     }
658     
659     protected String formatToOneLine(final String s) {
660         final StringBuffer sb = new StringBuffer(s.length());
661         for (final StringTokenizer st = new StringTokenizer(s, "\t\n\r");
662                 st.hasMoreTokens(); sb.append(st.nextToken())) {
663             ;
664         }
665         return sb.toString();
666     }
667 
668     protected static String getCollectionFromArcname(final String arcurl)
669             throws URISyntaxException {
670         final URI u = new URI(arcurl);
671         final String p = u.getPath();
672         if (p.length() <= 0) {
673             throw new URISyntaxException("Path is empty.", arcurl);
674         }
675         final int index = p.lastIndexOf('/');
676         String arcname = p;
677         if (index >= 0) {
678             arcname = p.substring(index + 1);
679         }
680         final Matcher m = ImportArcs.ARCHIVEIT_COLLECTION.matcher(arcname);
681         if ((m == null) || !m.matches()) {
682             throw new URISyntaxException("Can't find collection in arcname",
683                     arcname);
684         }
685         return m.group(1);
686     }
687 
688     protected String getParseRateLogMessage(final String url,
689             final String mimetype, final double kbPerSecond) {
690         return url + " " + mimetype + " parse KB/Sec "
691                 + this.numberFormatter.format(kbPerSecond);
692     }
693 
694     protected double getParseRate(final long startTime, final long len) {
695         // Get indexing rate:
696         long elapsedTime = System.currentTimeMillis() - startTime;
697         elapsedTime = (elapsedTime == 0) ? 1 : elapsedTime;
698         return (len != 0) ? ((double) len / 1024)
699                 / ((double) elapsedTime / 1000) : 0;
700     }
701 
702     protected boolean skip(final String mimetype) {
703         boolean decision = false;
704         // Are we to index all content?
705         if (!this.indexAll) {
706             if ((mimetype == null)
707                     || (!mimetype.startsWith(ImportArcs.TEXT_TYPE) && !mimetype
708                             .startsWith(ImportArcs.APPLICATION_TYPE))) {
709                 // Skip any but basic types.
710                 decision = true;
711             }
712         }
713         return decision;
714     }
715     
716     protected String getMimetype(final String mimetype, final MimeTypes mts,
717             final String url) {
718         if (mimetype != null && mimetype.length() > 0) {
719             return checkMimetype(mimetype.toLowerCase());
720         }
721         if (mts != null && url != null) {
722             final MimeType mt = mts.getMimeType(url);
723             if (mt != null) {
724                 return checkMimetype(mt.getName().toLowerCase());
725             }
726         }
727         return null;
728     }
729     
730     protected static String checkMimetype(String mimetype) {
731         if ((mimetype == null) || (mimetype.length() <= 0) ||
732                 mimetype.startsWith(MimetypeUtils.NO_TYPE_MIMETYPE)) {
733             return null;
734         }
735         
736         // Test the mimetype makes sense. If not, clear it.
737         try {
738             new MimeType(mimetype);
739         } catch (final MimeTypeException e) {
740             mimetype = null;
741         }
742         return mimetype;
743     }
744 
745     public void importArcs(final Path arcUrlsDir, final Path segment,
746             final String collection) throws IOException {
747         LOG.info("ImportArcs segment: " + segment + ", src: " + arcUrlsDir);
748 
749         final JobConf job = new JobConf(getConf(), this.getClass());
750 
751         job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
752 
753         job.setInputPath(arcUrlsDir);
754         job.setMapperClass(ImportArcs.class);
755 
756         job.setOutputPath(segment);
757         job.setOutputFormat(WaxFetcherOutputFormat.class);
758         job.setOutputKeyClass(Text.class);
759         job.setOutputValueClass(FetcherOutput.class);
760         // Pass the collection name out to the tasks IF non-null.
761         if ((collection != null) && (collection.length() > 0)) {
762             job.set(ImportArcs.WAX_SUFFIX + ImportArcs.ARCCOLLECTION_KEY,
763                 collection);
764         }
765         job.setJobName("import " + arcUrlsDir + " " + segment);
766 
767         JobClient.runJob(job);
768         LOG.info("ImportArcs: done");
769     }
770     
771     /***
772      * Override of nutch FetcherOutputFormat so I can substitute my own
773      * ParseOutputFormat, {@link WaxParseOutputFormat}.  While I'm here,
774      * removed content references.  NutchWAX doesn't save content.
775      * @author stack
776      */
777     public static class WaxFetcherOutputFormat extends FetcherOutputFormat {
778         public RecordWriter getRecordWriter(final FileSystem fs,
779                 final JobConf job, final String name, Progressable progress)
780         throws IOException {
781             Path f = new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME);
782             final Path fetch = new Path(f, name);
783             final MapFile.Writer fetchOut = new MapFile.Writer(fs,
784                 fetch.toString(), Text.class, CrawlDatum.class);
785             
786             return new RecordWriter() {
787                 private RecordWriter parseOut;
788                 {
789                     if (Fetcher.isParsing(job)) {
790                     	// Here is nutchwax change, using WaxParseOutput
791                     	// instead of ParseOutputFormat.
792                         this.parseOut = new WaxParseOutputFormat().
793                             getRecordWriter(fs, job, name, null);
794                     }
795                 }
796 
797                 public void write(WritableComparable key, Writable value)
798                 throws IOException {
799                     FetcherOutput fo = (FetcherOutput)value;
800                     fetchOut.append(key, fo.getCrawlDatum());
801                     if (fo.getParse() != null) {
802                         parseOut.write(key, fo.getParse());
803                     }
804                 }
805 
806                 public void close(Reporter reporter) throws IOException {
807                     fetchOut.close();
808                     if (parseOut != null) {
809                         parseOut.close(reporter);
810                     }
811                 }
812             };
813         }
814     }
815     
816     /***
817      * Copy so I can add collection prefix to produced signature and link
818      * CrawlDatums.
819      * @author stack
820      */
821     public static class WaxParseOutputFormat extends ParseOutputFormat {
822         public final Log LOG = LogFactory.getLog(WaxParseOutputFormat.class);
823         
824         private URLNormalizers urlNormalizers;
825         private URLFilters filters;
826         private ScoringFilters scfilters;
827 
828 
829         public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
830                 String name, Progressable progress)
831         throws IOException {
832         	// Extract collection prefix from key to use later when adding
833         	// signature and link crawldatums.
834         	
835             this.urlNormalizers =
836                 new URLNormalizers(job, URLNormalizers.SCOPE_OUTLINK);
837             this.filters = new URLFilters(job);
838             this.scfilters = new ScoringFilters(job);
839             final float interval =
840                 job.getFloat("db.default.fetch.interval", 30f);
841             final boolean ignoreExternalLinks =
842                 job.getBoolean("db.ignore.external.links", false);
843             final boolean sha1 = job.getBoolean("wax.digest.sha1", false);
844 
845 
846             Path text = new Path(new Path(job.getOutputPath(),
847                     ParseText.DIR_NAME), name);
848             Path data = new Path(new Path(job.getOutputPath(),
849                     ParseData.DIR_NAME), name);
850             Path crawl = new Path(new Path(job.getOutputPath(),
851                     CrawlDatum.PARSE_DIR_NAME), name);
852 
853             final MapFile.Writer textOut = new MapFile.Writer(job, fs,
854                 text.toString(), Text.class, ParseText.class,
855                 CompressionType.RECORD);
856 
857             final MapFile.Writer dataOut = new MapFile.Writer(job, fs,
858             	data.toString(), Text.class, ParseData.class);
859 
860             final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs,
861                 job, crawl, Text.class, CrawlDatum.class);
862 
863             return new RecordWriter() {
864 
865                 public void write(WritableComparable key, Writable value)
866                         throws IOException {
867                     // Test that I can parse the key before I do anything
868                     // else. If not, write nothing for this record.
869                     String collection = null;
870                     String fromUrl = null;
871                     String fromHost = null;
872                     String toHost = null;
873                     try {
874                 		collection = Nutchwax.getCollectionFromWaxKey(key);
875                 		fromUrl = Nutchwax.getUrlFromWaxKey(key);
876                     } catch (IOException ioe) {
877                     	LOG.warn("Skipping record. Can't parse " + key, ioe);
878                         return;
879                     }
880                     if (fromUrl == null || collection == null) {
881                     	LOG.warn("Skipping record. Null from or collection " +
882                             key);
883                         return;
884                     }
885 
886                     Parse parse = (Parse) value;
887 
888                     textOut.append(key, new ParseText(parse.getText()));
889 
890                     ParseData parseData = parse.getData();
891                     // recover the signature prepared by Fetcher or ParseSegment
892                     String sig = parseData.getContentMeta().get(
893                             Nutch.SIGNATURE_KEY);
894                     if (sig != null) {
895                         byte[] signature = (sha1)?
896                             Base32.decode(sig): StringUtil.fromHexString(sig);
897                         if (signature != null) {
898                             // append a CrawlDatum with a signature
899                             CrawlDatum d = new CrawlDatum(
900                                 CrawlDatum.STATUS_SIGNATURE, 0.0f);
901                             d.setSignature(signature);
902                             crawlOut.append(key, d);
903                         }
904                     }
905 
906                     // collect outlinks for subsequent db update
907                     Outlink[] links = parseData.getOutlinks();
908                     if (ignoreExternalLinks) {
909                         try {
910                             fromHost = new URL(fromUrl).getHost().toLowerCase();
911                         } catch (MalformedURLException e) {
912                             fromHost = null;
913                         }
914                     } else {
915                         fromHost = null;
916                     }
917 
918                     String[] toUrls = new String[links.length];
919                     int validCount = 0;
920                     for (int i = 0; i < links.length; i++) {
921                         String toUrl = links[i].getToUrl();
922                         try {
923                             toUrl = urlNormalizers.normalize(toUrl,
924                                 URLNormalizers.SCOPE_OUTLINK);
925                             toUrl = filters.filter(toUrl); // filter the url
926                         } catch (Exception e) {
927                             toUrl = null;
928                         }
929                         // ignore links to self (or anchors within the page)
930                         if (fromUrl.equals(toUrl)) toUrl = null;
931                         if (toUrl != null)
932                             validCount++;
933                         toUrls[i] = toUrl;
934                     }
935                 	
936                     CrawlDatum adjust = null;
937                     // compute score contributions and adjustment to the
938                     // original score
939                     for (int i = 0; i < toUrls.length; i++) {
940                         if (toUrls[i] == null)
941                             continue;
942                         if (ignoreExternalLinks) {
943                             try {
944                                 toHost = new URL(toUrls[i]).getHost().
945                                     toLowerCase();
946                             } catch (MalformedURLException e) {
947                                 toHost = null;
948                             }
949                             if (toHost == null || !toHost.equals(fromHost)) {
950                                 // external links
951                                 continue; // skip it
952                             }
953                         }
954 
955                         CrawlDatum target = new CrawlDatum(
956                             CrawlDatum.STATUS_LINKED, interval);
957                         Text fromURLUTF8 = new Text(fromUrl);
958                         Text targetUrl = new Text(toUrls[i]);
959                         adjust = null;
960                         try {
961                             // Scoring now expects first two arguments to be
962                             // URLs (More reason to do our own scoring).
963                             // St.Ack
964                             adjust = scfilters.distributeScoreToOutlink(
965                                 fromURLUTF8, targetUrl, parseData,
966                                 target, null, links.length, validCount);
967                         } catch (ScoringFilterException e) {
968                             if (LOG.isWarnEnabled()) {
969                                 LOG.warn("Cannot distribute score from " + key
970                                         + " to " + target + " - skipped ("
971                                         + e.getMessage());
972                             }
973                             continue;
974                         }
975                         Text targetKey =
976                             Nutchwax.generateWaxKey(targetUrl, collection);
977                         crawlOut.append(targetKey, target);
978                         if (adjust != null)
979                             crawlOut.append(key, adjust);
980                     }
981                     dataOut.append(key, parseData);
982                 }
983 
984                 public void close(Reporter reporter) throws IOException {
985                     textOut.close();
986                     dataOut.close();
987                     crawlOut.close();
988                 }
989             };
990         }
991     }
992 
993     public void close() {
994         // Nothing to close.
995     }
996     
997     public static void doImportUsage(final String message,
998     		final int exitCode) {
999         if (message != null && message.length() > 0) {
1000             System.out.println(message);
1001         }
1002         System.out.println("Usage: hadoop jar nutchwax.jar import <input>" +
1003         	" <output> <collection>");
1004         System.out.println("Arguments:");
1005         System.out.println(" input       Directory of files" +
1006         	" listing ARC URLs to import.");
1007         System.out.println(" output      Directory to import to. Inport is " +
1008         	"written to a subdir named");
1009         System.out.println("             for current date under " +
1010         		"'<output>/segments/'.");
1011         System.out.println(" collection  Collection name. Added to" +
1012             " each resource.");
1013         System.exit(exitCode);
1014     }
1015     
1016     public static void main(String[] args) throws Exception {
1017         int res = new ImportArcs().
1018         	doMain(NutchwaxConfiguration.getConfiguration(), args);
1019         System.exit(res);
1020     }
1021 
1022     public int run(final String[] args) throws Exception {
1023     	if (args.length != 3) {
1024             doImportUsage("ERROR: Wrong number of arguments passed.", 2);
1025         }
1026         // Assume list of ARC urls is first arg and output dir the second.
1027         try {
1028         	importArcs(new Path(args[0]), new Path(args[1]), args[2]);
1029         	return 0;
1030         } catch(Exception e) {
1031             LOG.fatal("ImportARCs: " + StringUtils.stringifyException(e));
1032         	return -1;
1033         }
1034     }
1035 }