1   /* Nutchwax
2    * 
3    * $Id: Nutchwax.java 1433 2007-01-16 18:08:41Z stack-sf $
4    * 
5    * Created on Feb 14, 2006
6    *
7    * Copyright (C) 2006 Internet Archive.
8    * 
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   * 
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   * 
16   * Heritrix is distributed in the hope that it will be useful, 
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   * 
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.access.nutch;
26  
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.indexer.DeleteDuplicates;
import org.apache.nutch.indexer.IndexMerger;
import org.archive.util.ArchiveUtils;
49  
50  /***
51   * Script to run all indexing jobs from index through merge of final index.
52   */
53  public class Nutchwax {
54      public static final Log LOG =
55          LogFactory.getLog(Nutchwax.class.getName());
56      
57      private static final String KEY_COLLECTION_PREFIX = "c=";
58      private static final String KEY_COLLECTION_SUFFIX = ",u=";
59      private static final Pattern COLLECTION =
60          Pattern.compile("^//s*c=([^,]+),u=(.*)//s*", Pattern.DOTALL);
61  
62      private final static List JOBS = Arrays.asList(new String[] {
63          "import", "update", "invert", "index", "dedup", "merge", "all",
64          "class"});
65  
66      // Lazy initialize these two variables to delay complaint about hadoop not
67      // being present -- if its not.  Meantime I get command-line processing
68      // done.
69      private FileSystem fs = null;
70      private JobConf conf = null;
71      
72      /***
73       * Default constructor.
74       * @throws IOException 
75       */
76      public Nutchwax() throws IOException {
77          super();
78      }
79      
80      public synchronized JobConf getJobConf() {
81          if (this.conf == null) {
82              this.conf = new JobConf(NutchwaxConfiguration.getConfiguration());
83          }
84          return this.conf;
85      }
86      
87      public synchronized FileSystem getFS() throws IOException {
88          if (this.fs == null) {
89              this.fs = FileSystem.get(getJobConf());
90          }
91          return this.fs;
92      }
93      
94      protected class OutputDirectories {
95          private final Path output;
96          private final Path crawlDb;
97          private final Path linkDb;
98          private final Path segments;
99          private final Path indexes;
100         private final Path index;
101         private final Path tmpDir;
102 
103         public OutputDirectories(final Path output) throws IOException {
104             this.output = output;
105             this.crawlDb = new Path(output + "/crawldb");
106             this.linkDb = new Path(output + "/linkdb");
107             this.segments = new Path(output + "/segments");
108             this.indexes = new Path(output + "/indexes");
109             this.index = new Path(output + "/index");
110             this.tmpDir = getJobConf().getLocalPath("mapred.temp.dir",
111                 Generator.generateSegmentName());
112         }
113 
114         public Path getCrawlDb() {
115             return crawlDb;
116         }
117 
118         public Path getIndexes() {
119             return indexes;
120         }
121 
122         public Path getLinkDb() {
123             return linkDb;
124         }
125 
126         public Path getSegments() {
127             return segments;
128         }
129 
130         public Path getTmpDir() {
131             return tmpDir;
132         }
133 
134         public Path getIndex() {
135             return index;
136         }
137 
138         public Path getOutput() {
139             return output;
140         }
141     }
142 
143     /***
144      * Run passed list of mapreduce indexing jobs. Jobs are always run in
145      * order: import, update, etc.
146      * 
147      * @throws Exception
148      */
149     protected void doAll(final Path input, final String collectionName,
150             final OutputDirectories od)
151     throws Exception {
152         doImport(input, collectionName, od);
153         doUpdate(od);
154         doInvert(od);
155         doIndexing(od);
156         doDedup(od);
157         doMerge(od);
158         LOG.info("Nutchwax finished.");
159     }
160     
161     protected void doImport(final Path input, final String collectionName,
162             final OutputDirectories od)
163     throws IOException {
164         Path segment = new Path(od.getSegments(),
165             Generator.generateSegmentName() +
166                 ((collectionName == null || collectionName.length() <= 0)?
167                         "": "-" + collectionName));
168         new ImportArcs(getJobConf()).importArcs(input, segment,
169             collectionName);
170     }
171     
172     protected void doUpdate(final OutputDirectories od)
173     throws IOException {
174         doUpdate(od, null);
175     }
176     
177     protected void doUpdate(final OutputDirectories od,
178             final String[] segments)
179     throws IOException {
180         LOG.info("updating crawldb " + od.getCrawlDb());
181         // Need to make sure the db dir exists before progressing.
182         Path dbPath = new Path(od.getCrawlDb(), CrawlDb.CURRENT_NAME);
183         if (!getFS().exists(dbPath)) {
184             getFS().mkdirs(dbPath);
185         }
186         CrawlDb cdb = new NutchwaxCrawlDb(getJobConf());
187         if (segments != null) {
188             List<Path> paths = new ArrayList<Path>(segments.length);
189             for (int i = 0; i < segments.length; i++) {
190                 Path p = new Path(segments[i]);
191                 if (!getFS().exists(p)) {
192                     throw new FileNotFoundException(p.toString());
193                 }
194                 paths.add(p);
195             }
196             cdb.update(od.getCrawlDb(), paths.toArray(new Path[paths.size()]),
197                 true, true);
198         } else {
199             Path[] allSegments = getSegments(od);
200             // This just does the last segment created.
201             cdb.update(od.getCrawlDb(),
202                 new Path[] {allSegments[allSegments.length - 1]}, true, true);
203         }
204     }
205 
206     protected Path [] getSegments(final OutputDirectories od)
207     throws IOException {
208         Path[] allSegments = getFS().listPaths(od.getSegments());
209         if (allSegments == null || allSegments.length <= 0) {
210             throw new FileNotFoundException(od.getSegments().toString());
211         }
212         return allSegments;
213     }
214     
215     protected void doInvert(final OutputDirectories od, final Path [] segments)
216     throws IOException {
217         createLinkdb(od);
218         new NutchwaxLinkDb(getJobConf()).
219         	invert(od.getLinkDb(), segments, true, true);
220     }
221     
222     protected void doInvert(final OutputDirectories od)
223     throws IOException {
224         LOG.info("inverting links in " + od.getSegments());
225         new NutchwaxLinkDb(getJobConf()).
226         	invert(od.getLinkDb(), getSegments(od), true, true);
227     }
228     
229     protected boolean createLinkdb(final OutputDirectories od)
230     throws IOException {
231         boolean result = false;
232         // Make sure the linkdb exists.  Otherwise the install where
233         // the temporary location gets moved to the permanent fails.
234         if (getFS().mkdirs(new Path(od.getLinkDb(),
235                 NutchwaxLinkDb.CURRENT_NAME))) {
236             LOG.info("Created " + od.getLinkDb());
237             result = true;
238         }
239         return result;
240     }
241     
242     protected void doIndexing(final OutputDirectories od)
243     throws IOException {
244         doIndexing(od, getFS().listPaths(od.getSegments()));
245     }
246     
247     protected void doIndexing(final OutputDirectories od,
248         final Path [] segments)
249     throws IOException {
250         LOG.info(" indexing " + segments);
251         new NutchwaxIndexer(getJobConf()).index(od.getIndexes(),
252             od.getCrawlDb(), od.getLinkDb(), segments);
253     }
254     
255     protected void doDedup(final OutputDirectories od) throws IOException {
256         LOG.info("dedup " + od.getIndex());
257         new DeleteDuplicates(getJobConf()).dedup(new Path[] {od.getIndexes()});
258     }
259     
260     protected void doMerge(final OutputDirectories od) throws IOException {
261         LOG.info("index merge " + od.getOutput() + " using tmpDir=" +
262             od.getTmpDir());
263         new IndexMerger(getJobConf()).merge(getFS().listPaths(od.getIndexes()),
264             od.getIndex(), od.getTmpDir());
265     }
266 
267     protected void doClass(final String [] args) {
268         // Redo args so absent our nutchwax 'class' command.
269         final int cmdOffset = 2;
270         final String [] newArgs = new String[args.length - cmdOffset];
271         final String className = args[1];
272         for (int i = 0; i < args.length; i++) {
273             if (i < cmdOffset) {
274                 continue;
275             }
276             newArgs[i - cmdOffset] = args[i];
277         }
278         // From http://www.javaworld.com/javaworld/javaqa/1999-06/01-outside.html
279         Class [] argTypes = new Class[1];
280         argTypes[0] = String[].class;
281         try {
282             Method mainMethod =
283                 Class.forName(className).getDeclaredMethod("main", argTypes);
284             mainMethod.invoke(newArgs, new Object [] {newArgs});
285         } catch (SecurityException e) {
286             throw new RuntimeException(e);
287         } catch (NoSuchMethodException e) {
288             throw new RuntimeException(e);
289         } catch (ClassNotFoundException e) {
290             throw new RuntimeException(e);
291         } catch (IllegalArgumentException e) {
292             throw new RuntimeException(e);
293         } catch (IllegalAccessException e) {
294             throw new RuntimeException(e);
295         } catch (InvocationTargetException e) {
296             throw new RuntimeException(e);
297         }
298     }
299     
300     protected void doJob(final String jobName, final String [] args)
301     throws Exception {
302         if (jobName.equals("import")) {
303             // Usage: hadoop jar nutchwax.jar import input output name
304             if (args.length != 4) {
305                 ImportArcs.doImportUsage(
306                     "ERROR: Wrong number of arguments passed.", 2);
307             }
308             final Path input = new Path(args[1]);
309             final Path output = new Path(args[2]);
310             final String collectionName = args[3];
311             checkArcsDir(input);
312             OutputDirectories od = new OutputDirectories(output);
313             doImport(input, collectionName, od);
314         } else if (jobName.equals("update")) {
315             // Usage: hadoop jar nutchwax.jar update output
316             if (args.length < 2) {
317                 doUpdateUsage("ERROR: Wrong number of arguments passed.", 2);
318             }
319             OutputDirectories od = new OutputDirectories(new Path(args[1]));
320             if (args.length == 2) {
321                 doUpdate(od);
322             } else {
323                 for (int i = 2; i < args.length; i++) {
324                     doUpdate(od, new String [] {args[i]});
325                 }
326             }
327         } else if (jobName.equals("invert")) {
328             // Usage: hadoop jar nutchwax.jar invert output
329             if (args.length < 2) {
330                 doInvertUsage("ERROR: Wrong number of arguments passed.", 2);
331             }
332             OutputDirectories od = new OutputDirectories(new Path(args[1]));
333             if (args.length == 2) {
334                 doInvert(od);
335             } else {
336                 final int offset = 2;
337                 Path [] segments = new Path[args.length - offset];
338                 for (int i = offset; i < args.length; i++) {
339                     Path f = new Path(args[i]);
340                     if (!getFS().exists(f)) {
341                         throw new FileNotFoundException(f.toString());
342                     }
343                     segments[i - offset] = f;
344                 }
345                 doInvert(od, segments);
346             }
347         } else if (jobName.equals("index")) {
348             // Usage: hadoop jar nutchwax.jar index output
349             if (args.length < 2) {
350                 doIndexUsage("ERROR: Wrong number of arguments passed.", 2);
351             }
352             OutputDirectories od = new OutputDirectories(new Path(args[1]));
353             if (args.length == 2) {
354                 doIndexing(od);
355             } else {
356                 final int offset = 2;
357                 Path [] segments = new Path[args.length - offset];
358                 for (int i = offset; i < args.length; i++) {
359                     Path f = new Path(args[i]);
360                     if (!getFS().exists(f)) {
361                         throw new FileNotFoundException(f.toString());
362                     }
363                     segments[i - offset] = f;
364                 }
365                 doIndexing(od, segments);
366             }
367         } else if (jobName.equals("dedup")) {
368             // Usage: hadoop jar nutchwax.jar dedup output
369             if (args.length != 2) {
370                 doDedupUsage("Wrong number of arguments passed.", 2);
371             }
372             doDedup(new OutputDirectories(new Path(args[1])));
373         } else if (jobName.equals("merge")) {
374             // Usage: hadoop jar nutchwax.jar merge output");
375             if (args.length != 2) {
376                 doMergeUsage("ERROR: Wrong number of arguments passed.", 2);
377             }
378             doMerge(new OutputDirectories(new Path(args[1])));
379         } else if (jobName.equals("all")) {
380             // Usage: hadoop jar nutchwax.jar import input output name
381             if (args.length != 4) {
382                 doAllUsage("ERROR: Wrong number of arguments passed.", 2);
383             }
384             final Path input = new Path(args[1]);
385             final Path output = new Path(args[2]);
386             final String collectionName = args[3];
387             checkArcsDir(input);
388             OutputDirectories od = new OutputDirectories(output);
389             doAll(input, collectionName, od);
390         } else if (jobName.equals("class")) {
391             if (args.length < 2) {
392                 doClassUsage("ERROR: Wrong number of arguments passed.", 2);
393             }
394             doClass(args);
395         } else {
396             usage("ERROR: No handler for job name " + jobName, 4);
397             System.exit(0);
398         }
399     }
400 
401     /***
402      * Check the arcs dir exists and looks like it has files that list ARCs
403      * (rather than ARCs themselves).
404      * 
405      * @param arcsDir Directory to examine.
406      * @throws IOException
407      */
408     protected void checkArcsDir(final Path arcsDir)
409             throws IOException {
410         if (!getFS().exists(arcsDir)) {
411             throw new IOException(arcsDir + " does not exist.");
412         }
413         if (!fs.isDirectory(arcsDir)) {
414             throw new IOException(arcsDir + " is not a directory.");
415         }
416 
417         final Path [] files = getFS().listPaths(arcsDir);
418         for (int i = 0; i < files.length; i++) {
419             if (!getFS().isFile(files[i])) {
420                 throw new IOException(files[i] + " is not a file.");
421             }
422             if (files[i].getName().toLowerCase().endsWith(".arc.gz")) {
423                 throw new IOException(files[i] + " is an ARC file (ARCSDIR " +
424                     "should contain text file listing ARCs rather than " +
425                     "actual ARCs).");
426             }
427         }
428     }
429     
430     public static Text generateWaxKey(WritableComparable key,
431             final String collection) {
432         return generateWaxKey(key.toString(), collection);
433     }
434     
435     public static Text generateWaxKey(final String keyStr,
436             final String collection) {
437         if (collection == null) {
438             throw new NullPointerException("Collection is null for " + keyStr);
439         }
440         if (keyStr == null) {
441             throw new NullPointerException("keyStr is null");
442         }
443         if (keyStr.startsWith(KEY_COLLECTION_PREFIX)) {
444             LOG.warn("Key already has collection prefix: " + keyStr
445                     + ". Skipping.");
446             return new Text(keyStr);
447         }
448         
449         return new Text(KEY_COLLECTION_PREFIX + collection.trim() +
450             KEY_COLLECTION_SUFFIX + keyStr.trim());
451     }
452     
453     public static String getCollectionFromWaxKey(final WritableComparable key)
454     throws IOException {
455         Matcher m = COLLECTION.matcher(key.toString());
456         if (m == null || !m.matches()) {
457             throw new IOException("Key doesn't have collection " +
458                     "prefix <" + key.toString() + ">");
459         }
460         return m.group(1);
461     }
462     
463     public static String getUrlFromWaxKey(final WritableComparable key)
464     throws IOException {
465         Matcher m = COLLECTION.matcher(key.toString());
466         if (m == null || !m.matches()) {
467             throw new IOException("Key doesn't have collection " +
468                     " prefix: " + key);
469         }
470         return m.group(2);
471     }
472     
473     public static long getDate(String d)
474     throws IOException {
475         long date = 0;
476         try {
477             date = ArchiveUtils.getDate(d).getTime();
478         } catch (final java.text.ParseException e) {
479             throw new IOException("Failed parse of date: " + d + ": " +
480                 e.getMessage());
481         }
482         // Date can be < 0 if pre-1970 (Seen in some old ARCs).
483         return date >= 0? date: 0;
484     }
485 
486     public static void usage(final String message, final int exitCode) {
487         if (message != null && message.length() > 0) {
488             System.out.println(message);
489         }
490 
491         System.out.println("Usage: hadoop jar nutchwax.jar <job> [args]");
492         System.out.println("Launch NutchWAX job(s) on a hadoop platform.");
493         System.out.println("Type 'hadoop jar nutchwax.jar help <job>' for" +
494             " help on a specific job.");
495         System.out.println("Jobs (usually) must be run in the order " +
496             "listed below.");
497         System.out.println("Available jobs:");
498         System.out.println(" import  Import ARCs.");
499         System.out.println(" update  Update dbs with recent imports.");
500         System.out.println(" invert  Invert links.");
501         System.out.println(" index   Index segments.");
502         System.out.println(" dedup   Deduplicate by URL or content MD5.");
503         System.out.println(" merge   Merge segment indices into one.");
504         System.out.println(" all     Runs all above jobs in order.");
505         System.out.println(" class   Run the passed class's main.");
506         
507         System.exit(exitCode);
508     }
509     
510     public static void doUpdateUsage(final String message,
511             final int exitCode) {
512         if (message != null && message.length() > 0) {
513             System.out.println(message);
514         }
515         System.out.println("Usage: hadoop jar nutchwax.jar update <output> " +
516                 "[<segments>...]");
517         System.out.println("Arguments:");
518         System.out.println(" output    Directory to write crawldb under.");
519         System.out.println("Options:");
520         System.out.println(" segments  List of segments to update crawldb " +
521                 "with. If none supplied, updates");
522         System.out.println("            using latest segment found.");
523         System.exit(exitCode);
524     }
525     
526     public static void doInvertUsage(final String message,
527             final int exitCode) {
528         if (message != null && message.length() > 0) {
529             System.out.println(message);
530         }
531         System.out.println("Usage: hadoop jar nutchwax.jar invert <output> " +
532             "[<segments>...]");
533         System.out.println("Arguments:");
534         System.out.println(" output    Directory to write linkdb under.");
535         System.out.println("Options:");
536         System.out.println(" segments  List of segments to update linkdb " +
537             "with. If none supplied, all under");
538         System.out.println("           '<output>/segments/' " +
539             "are passed.");
540         System.exit(exitCode);
541     }
542     
543     public static void doIndexUsage(final String message,
544             final int exitCode) {
545         if (message != null && message.length() > 0) {
546             System.out.println(message);
547         }
548         System.out.println("Usage: hadoop jar nutchwax.jar index <output> " +
549             "[<segments>...]");
550         System.out.println("Arguments:");
551         System.out.println(" output    Directory to write indexes under.");
552         System.out.println("Options:");
553         System.out.println(" segments  List of segments to index. " +
554             "If none supplied, all under");
555         System.out.println("           '<output>/segments/' " +
556             "are indexed.");
557         System.exit(exitCode);
558     }
559     
560     public static void doDedupUsage(final String message,
561             final int exitCode) {
562         if (message != null && message.length() > 0) {
563             System.out.println(message);
564         }
565         System.out.println("Usage: hadoop jar nutchwax.jar dedup <output>");
566         System.out.println("Arguments:");
567         System.out.println(" output  Directory in which indices" +
568             " to dedup reside.");
569         System.exit(exitCode);
570     }
571     
572     public static void doMergeUsage(final String message,
573             final int exitCode) {
574         if (message != null && message.length() > 0) {
575             System.out.println(message);
576         }
577         System.out.println("Usage: hadoop jar nutchwax.jar merge <output>");
578         System.out.println("Arguments:");
579         System.out.println(" output  Directory in which indices" +
580             " to merge reside.");
581         System.exit(exitCode);
582     }
583     
584     public static void doAllUsage(final String message,
585             final int exitCode) {
586         if (message != null && message.length() > 0) {
587             System.out.println(message);
588         }
589         System.out.println("Usage: hadoop jar nutchwax.jar all <input> " +
590             "<output> <collection>");
591         System.out.println("Arguments:");
592         System.out.println(" input       Directory of files" +
593             " listing ARC URLs to import.");
594         System.out.println(" output      Directory write indexing product to.");
595         System.out.println(" collection  Collection name. Added to" +
596             " each resource.");
597         System.exit(exitCode);
598     }
599     
600     public static void doClassUsage(final String message,
601             final int exitCode) {
602         if (message != null && message.length() > 0) {
603             System.out.println(message);
604         }
605         System.out.println("Usage: hadoop jar nutchwax.jar class CLASS ...");
606         System.out.println("Arguments:");
607         System.out.println(" CLASS    Name of class to run. Invokes main " +
608             "passing command-line arguments.");
609         System.out.println("          For example, use to run nutch " +
610             "commands. Below is list of command");
611         System.out.println("          name and implementing class. " +
612                 "Pass name of class only and emits usage.");
613         System.out.println();
614         System.out.println("          readdb      " +
615             "org.apache.nutch.crawl.CrawlDbReader");
616         System.out.println("          mergedb     " +
617             "org.apache.nutch.crawl.CrawlDbMerger");
618         System.out.println("          readlinkdb  " +
619             "org.apache.nutch.crawl.LinkDbReader");
620         System.out.println("          segread     " +
621             "org.apache.nutch.segment.SegmentReader");
622         System.out.println("          mergesegs   " +
623             "org.apache.nutch.segment.SegmentMerger");
624         System.out.println("          mergelinkdb " +
625             "org.apache.nutch.crawl.LinkDbMerger");
626         System.exit(exitCode);
627     }
628 
629     static void doJobHelp(final String jobName) {
630         if (!JOBS.contains(jobName)) {
631             usage("ERROR: Unknown job " + jobName, 1);
632         }
633         if (jobName.equals("import")) {
634             ImportArcs.doImportUsage(null, 1);
635         } else if (jobName.equals("update")) {
636             doUpdateUsage(null, 1);
637         } else if (jobName.equals("invert")) {
638             doInvertUsage(null, 1);
639         } else if (jobName.equals("index")) {
640             doIndexUsage(null, 1);
641         } else if (jobName.equals("dedup")) {
642             doDedupUsage(null, 1);
643         } else if (jobName.equals("merge")) {
644             doMergeUsage(null, 1);
645         } else if (jobName.equals("all")) {
646             doAllUsage(null, 1);
647         } else if (jobName.equals("class")) {
648             doClassUsage(null, 1);
649         } else {
650             usage("ERROR: No help for job name " + jobName, 4);
651         }
652     }
653 
654     public static void main(String args[]) throws Exception {
655         if (args.length < 1) {
656             usage(null, 0);
657             return;
658         }
659 
660         if (args[0].toLowerCase().equals("help")) {
661             if (args.length == 1) {
662                 usage("ERROR: Add command you need help on.", 0);
663                 return;
664             }
665             doJobHelp(args[1].toLowerCase());
666         }
667         
668         final String jobName = args[0].toLowerCase();
669         if (!JOBS.contains(jobName)) {
670             usage("ERROR: Unknown <job> " + jobName, 1);
671         }
672         
673         Nutchwax ia = new Nutchwax();
674         ia.doJob(jobName, args);
675     }
676 }