1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.access.nutch;
26
27 import java.io.FileNotFoundException;
28 import java.io.IOException;
29 import java.lang.reflect.InvocationTargetException;
30 import java.lang.reflect.Method;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.List;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.io.Text;
42 import org.apache.hadoop.io.WritableComparable;
43 import org.apache.hadoop.mapred.JobConf;
44 import org.apache.nutch.crawl.CrawlDb;
45 import org.apache.nutch.crawl.Generator;
46 import org.apache.nutch.indexer.DeleteDuplicates;
47 import org.apache.nutch.indexer.IndexMerger;
48 import org.archive.util.ArchiveUtils;
49
/**
 * Script to run all indexing jobs, from import through merge of the final
 * index.
 */
53 public class Nutchwax {
54 public static final Log LOG =
55 LogFactory.getLog(Nutchwax.class.getName());
56
57 private static final String KEY_COLLECTION_PREFIX = "c=";
58 private static final String KEY_COLLECTION_SUFFIX = ",u=";
59 private static final Pattern COLLECTION =
60 Pattern.compile("^//s*c=([^,]+),u=(.*)//s*", Pattern.DOTALL);
61
62 private final static List JOBS = Arrays.asList(new String[] {
63 "import", "update", "invert", "index", "dedup", "merge", "all",
64 "class"});
65
66
67
68
69 private FileSystem fs = null;
70 private JobConf conf = null;
71
72 /***
73 * Default constructor.
74 * @throws IOException
75 */
76 public Nutchwax() throws IOException {
77 super();
78 }
79
80 public synchronized JobConf getJobConf() {
81 if (this.conf == null) {
82 this.conf = new JobConf(NutchwaxConfiguration.getConfiguration());
83 }
84 return this.conf;
85 }
86
87 public synchronized FileSystem getFS() throws IOException {
88 if (this.fs == null) {
89 this.fs = FileSystem.get(getJobConf());
90 }
91 return this.fs;
92 }
93
94 protected class OutputDirectories {
95 private final Path output;
96 private final Path crawlDb;
97 private final Path linkDb;
98 private final Path segments;
99 private final Path indexes;
100 private final Path index;
101 private final Path tmpDir;
102
103 public OutputDirectories(final Path output) throws IOException {
104 this.output = output;
105 this.crawlDb = new Path(output + "/crawldb");
106 this.linkDb = new Path(output + "/linkdb");
107 this.segments = new Path(output + "/segments");
108 this.indexes = new Path(output + "/indexes");
109 this.index = new Path(output + "/index");
110 this.tmpDir = getJobConf().getLocalPath("mapred.temp.dir",
111 Generator.generateSegmentName());
112 }
113
114 public Path getCrawlDb() {
115 return crawlDb;
116 }
117
118 public Path getIndexes() {
119 return indexes;
120 }
121
122 public Path getLinkDb() {
123 return linkDb;
124 }
125
126 public Path getSegments() {
127 return segments;
128 }
129
130 public Path getTmpDir() {
131 return tmpDir;
132 }
133
134 public Path getIndex() {
135 return index;
136 }
137
138 public Path getOutput() {
139 return output;
140 }
141 }
142
143 /***
144 * Run passed list of mapreduce indexing jobs. Jobs are always run in
145 * order: import, update, etc.
146 *
147 * @throws Exception
148 */
149 protected void doAll(final Path input, final String collectionName,
150 final OutputDirectories od)
151 throws Exception {
152 doImport(input, collectionName, od);
153 doUpdate(od);
154 doInvert(od);
155 doIndexing(od);
156 doDedup(od);
157 doMerge(od);
158 LOG.info("Nutchwax finished.");
159 }
160
161 protected void doImport(final Path input, final String collectionName,
162 final OutputDirectories od)
163 throws IOException {
164 Path segment = new Path(od.getSegments(),
165 Generator.generateSegmentName() +
166 ((collectionName == null || collectionName.length() <= 0)?
167 "": "-" + collectionName));
168 new ImportArcs(getJobConf()).importArcs(input, segment,
169 collectionName);
170 }
171
172 protected void doUpdate(final OutputDirectories od)
173 throws IOException {
174 doUpdate(od, null);
175 }
176
177 protected void doUpdate(final OutputDirectories od,
178 final String[] segments)
179 throws IOException {
180 LOG.info("updating crawldb " + od.getCrawlDb());
181
182 Path dbPath = new Path(od.getCrawlDb(), CrawlDb.CURRENT_NAME);
183 if (!getFS().exists(dbPath)) {
184 getFS().mkdirs(dbPath);
185 }
186 CrawlDb cdb = new NutchwaxCrawlDb(getJobConf());
187 if (segments != null) {
188 List<Path> paths = new ArrayList<Path>(segments.length);
189 for (int i = 0; i < segments.length; i++) {
190 Path p = new Path(segments[i]);
191 if (!getFS().exists(p)) {
192 throw new FileNotFoundException(p.toString());
193 }
194 paths.add(p);
195 }
196 cdb.update(od.getCrawlDb(), paths.toArray(new Path[paths.size()]),
197 true, true);
198 } else {
199 Path[] allSegments = getSegments(od);
200
201 cdb.update(od.getCrawlDb(),
202 new Path[] {allSegments[allSegments.length - 1]}, true, true);
203 }
204 }
205
206 protected Path [] getSegments(final OutputDirectories od)
207 throws IOException {
208 Path[] allSegments = getFS().listPaths(od.getSegments());
209 if (allSegments == null || allSegments.length <= 0) {
210 throw new FileNotFoundException(od.getSegments().toString());
211 }
212 return allSegments;
213 }
214
215 protected void doInvert(final OutputDirectories od, final Path [] segments)
216 throws IOException {
217 createLinkdb(od);
218 new NutchwaxLinkDb(getJobConf()).
219 invert(od.getLinkDb(), segments, true, true);
220 }
221
222 protected void doInvert(final OutputDirectories od)
223 throws IOException {
224 LOG.info("inverting links in " + od.getSegments());
225 new NutchwaxLinkDb(getJobConf()).
226 invert(od.getLinkDb(), getSegments(od), true, true);
227 }
228
229 protected boolean createLinkdb(final OutputDirectories od)
230 throws IOException {
231 boolean result = false;
232
233
234 if (getFS().mkdirs(new Path(od.getLinkDb(),
235 NutchwaxLinkDb.CURRENT_NAME))) {
236 LOG.info("Created " + od.getLinkDb());
237 result = true;
238 }
239 return result;
240 }
241
242 protected void doIndexing(final OutputDirectories od)
243 throws IOException {
244 doIndexing(od, getFS().listPaths(od.getSegments()));
245 }
246
247 protected void doIndexing(final OutputDirectories od,
248 final Path [] segments)
249 throws IOException {
250 LOG.info(" indexing " + segments);
251 new NutchwaxIndexer(getJobConf()).index(od.getIndexes(),
252 od.getCrawlDb(), od.getLinkDb(), segments);
253 }
254
255 protected void doDedup(final OutputDirectories od) throws IOException {
256 LOG.info("dedup " + od.getIndex());
257 new DeleteDuplicates(getJobConf()).dedup(new Path[] {od.getIndexes()});
258 }
259
260 protected void doMerge(final OutputDirectories od) throws IOException {
261 LOG.info("index merge " + od.getOutput() + " using tmpDir=" +
262 od.getTmpDir());
263 new IndexMerger(getJobConf()).merge(getFS().listPaths(od.getIndexes()),
264 od.getIndex(), od.getTmpDir());
265 }
266
267 protected void doClass(final String [] args) {
268
269 final int cmdOffset = 2;
270 final String [] newArgs = new String[args.length - cmdOffset];
271 final String className = args[1];
272 for (int i = 0; i < args.length; i++) {
273 if (i < cmdOffset) {
274 continue;
275 }
276 newArgs[i - cmdOffset] = args[i];
277 }
278
279 Class [] argTypes = new Class[1];
280 argTypes[0] = String[].class;
281 try {
282 Method mainMethod =
283 Class.forName(className).getDeclaredMethod("main", argTypes);
284 mainMethod.invoke(newArgs, new Object [] {newArgs});
285 } catch (SecurityException e) {
286 throw new RuntimeException(e);
287 } catch (NoSuchMethodException e) {
288 throw new RuntimeException(e);
289 } catch (ClassNotFoundException e) {
290 throw new RuntimeException(e);
291 } catch (IllegalArgumentException e) {
292 throw new RuntimeException(e);
293 } catch (IllegalAccessException e) {
294 throw new RuntimeException(e);
295 } catch (InvocationTargetException e) {
296 throw new RuntimeException(e);
297 }
298 }
299
300 protected void doJob(final String jobName, final String [] args)
301 throws Exception {
302 if (jobName.equals("import")) {
303
304 if (args.length != 4) {
305 ImportArcs.doImportUsage(
306 "ERROR: Wrong number of arguments passed.", 2);
307 }
308 final Path input = new Path(args[1]);
309 final Path output = new Path(args[2]);
310 final String collectionName = args[3];
311 checkArcsDir(input);
312 OutputDirectories od = new OutputDirectories(output);
313 doImport(input, collectionName, od);
314 } else if (jobName.equals("update")) {
315
316 if (args.length < 2) {
317 doUpdateUsage("ERROR: Wrong number of arguments passed.", 2);
318 }
319 OutputDirectories od = new OutputDirectories(new Path(args[1]));
320 if (args.length == 2) {
321 doUpdate(od);
322 } else {
323 for (int i = 2; i < args.length; i++) {
324 doUpdate(od, new String [] {args[i]});
325 }
326 }
327 } else if (jobName.equals("invert")) {
328
329 if (args.length < 2) {
330 doInvertUsage("ERROR: Wrong number of arguments passed.", 2);
331 }
332 OutputDirectories od = new OutputDirectories(new Path(args[1]));
333 if (args.length == 2) {
334 doInvert(od);
335 } else {
336 final int offset = 2;
337 Path [] segments = new Path[args.length - offset];
338 for (int i = offset; i < args.length; i++) {
339 Path f = new Path(args[i]);
340 if (!getFS().exists(f)) {
341 throw new FileNotFoundException(f.toString());
342 }
343 segments[i - offset] = f;
344 }
345 doInvert(od, segments);
346 }
347 } else if (jobName.equals("index")) {
348
349 if (args.length < 2) {
350 doIndexUsage("ERROR: Wrong number of arguments passed.", 2);
351 }
352 OutputDirectories od = new OutputDirectories(new Path(args[1]));
353 if (args.length == 2) {
354 doIndexing(od);
355 } else {
356 final int offset = 2;
357 Path [] segments = new Path[args.length - offset];
358 for (int i = offset; i < args.length; i++) {
359 Path f = new Path(args[i]);
360 if (!getFS().exists(f)) {
361 throw new FileNotFoundException(f.toString());
362 }
363 segments[i - offset] = f;
364 }
365 doIndexing(od, segments);
366 }
367 } else if (jobName.equals("dedup")) {
368
369 if (args.length != 2) {
370 doDedupUsage("Wrong number of arguments passed.", 2);
371 }
372 doDedup(new OutputDirectories(new Path(args[1])));
373 } else if (jobName.equals("merge")) {
374
375 if (args.length != 2) {
376 doMergeUsage("ERROR: Wrong number of arguments passed.", 2);
377 }
378 doMerge(new OutputDirectories(new Path(args[1])));
379 } else if (jobName.equals("all")) {
380
381 if (args.length != 4) {
382 doAllUsage("ERROR: Wrong number of arguments passed.", 2);
383 }
384 final Path input = new Path(args[1]);
385 final Path output = new Path(args[2]);
386 final String collectionName = args[3];
387 checkArcsDir(input);
388 OutputDirectories od = new OutputDirectories(output);
389 doAll(input, collectionName, od);
390 } else if (jobName.equals("class")) {
391 if (args.length < 2) {
392 doClassUsage("ERROR: Wrong number of arguments passed.", 2);
393 }
394 doClass(args);
395 } else {
396 usage("ERROR: No handler for job name " + jobName, 4);
397 System.exit(0);
398 }
399 }
400
401 /***
402 * Check the arcs dir exists and looks like it has files that list ARCs
403 * (rather than ARCs themselves).
404 *
405 * @param arcsDir Directory to examine.
406 * @throws IOException
407 */
408 protected void checkArcsDir(final Path arcsDir)
409 throws IOException {
410 if (!getFS().exists(arcsDir)) {
411 throw new IOException(arcsDir + " does not exist.");
412 }
413 if (!fs.isDirectory(arcsDir)) {
414 throw new IOException(arcsDir + " is not a directory.");
415 }
416
417 final Path [] files = getFS().listPaths(arcsDir);
418 for (int i = 0; i < files.length; i++) {
419 if (!getFS().isFile(files[i])) {
420 throw new IOException(files[i] + " is not a file.");
421 }
422 if (files[i].getName().toLowerCase().endsWith(".arc.gz")) {
423 throw new IOException(files[i] + " is an ARC file (ARCSDIR " +
424 "should contain text file listing ARCs rather than " +
425 "actual ARCs).");
426 }
427 }
428 }
429
430 public static Text generateWaxKey(WritableComparable key,
431 final String collection) {
432 return generateWaxKey(key.toString(), collection);
433 }
434
435 public static Text generateWaxKey(final String keyStr,
436 final String collection) {
437 if (collection == null) {
438 throw new NullPointerException("Collection is null for " + keyStr);
439 }
440 if (keyStr == null) {
441 throw new NullPointerException("keyStr is null");
442 }
443 if (keyStr.startsWith(KEY_COLLECTION_PREFIX)) {
444 LOG.warn("Key already has collection prefix: " + keyStr
445 + ". Skipping.");
446 return new Text(keyStr);
447 }
448
449 return new Text(KEY_COLLECTION_PREFIX + collection.trim() +
450 KEY_COLLECTION_SUFFIX + keyStr.trim());
451 }
452
453 public static String getCollectionFromWaxKey(final WritableComparable key)
454 throws IOException {
455 Matcher m = COLLECTION.matcher(key.toString());
456 if (m == null || !m.matches()) {
457 throw new IOException("Key doesn't have collection " +
458 "prefix <" + key.toString() + ">");
459 }
460 return m.group(1);
461 }
462
463 public static String getUrlFromWaxKey(final WritableComparable key)
464 throws IOException {
465 Matcher m = COLLECTION.matcher(key.toString());
466 if (m == null || !m.matches()) {
467 throw new IOException("Key doesn't have collection " +
468 " prefix: " + key);
469 }
470 return m.group(2);
471 }
472
473 public static long getDate(String d)
474 throws IOException {
475 long date = 0;
476 try {
477 date = ArchiveUtils.getDate(d).getTime();
478 } catch (final java.text.ParseException e) {
479 throw new IOException("Failed parse of date: " + d + ": " +
480 e.getMessage());
481 }
482
483 return date >= 0? date: 0;
484 }
485
486 public static void usage(final String message, final int exitCode) {
487 if (message != null && message.length() > 0) {
488 System.out.println(message);
489 }
490
491 System.out.println("Usage: hadoop jar nutchwax.jar <job> [args]");
492 System.out.println("Launch NutchWAX job(s) on a hadoop platform.");
493 System.out.println("Type 'hadoop jar nutchwax.jar help <job>' for" +
494 " help on a specific job.");
495 System.out.println("Jobs (usually) must be run in the order " +
496 "listed below.");
497 System.out.println("Available jobs:");
498 System.out.println(" import Import ARCs.");
499 System.out.println(" update Update dbs with recent imports.");
500 System.out.println(" invert Invert links.");
501 System.out.println(" index Index segments.");
502 System.out.println(" dedup Deduplicate by URL or content MD5.");
503 System.out.println(" merge Merge segment indices into one.");
504 System.out.println(" all Runs all above jobs in order.");
505 System.out.println(" class Run the passed class's main.");
506
507 System.exit(exitCode);
508 }
509
510 public static void doUpdateUsage(final String message,
511 final int exitCode) {
512 if (message != null && message.length() > 0) {
513 System.out.println(message);
514 }
515 System.out.println("Usage: hadoop jar nutchwax.jar update <output> " +
516 "[<segments>...]");
517 System.out.println("Arguments:");
518 System.out.println(" output Directory to write crawldb under.");
519 System.out.println("Options:");
520 System.out.println(" segments List of segments to update crawldb " +
521 "with. If none supplied, updates");
522 System.out.println(" using latest segment found.");
523 System.exit(exitCode);
524 }
525
526 public static void doInvertUsage(final String message,
527 final int exitCode) {
528 if (message != null && message.length() > 0) {
529 System.out.println(message);
530 }
531 System.out.println("Usage: hadoop jar nutchwax.jar invert <output> " +
532 "[<segments>...]");
533 System.out.println("Arguments:");
534 System.out.println(" output Directory to write linkdb under.");
535 System.out.println("Options:");
536 System.out.println(" segments List of segments to update linkdb " +
537 "with. If none supplied, all under");
538 System.out.println(" '<output>/segments/' " +
539 "are passed.");
540 System.exit(exitCode);
541 }
542
543 public static void doIndexUsage(final String message,
544 final int exitCode) {
545 if (message != null && message.length() > 0) {
546 System.out.println(message);
547 }
548 System.out.println("Usage: hadoop jar nutchwax.jar index <output> " +
549 "[<segments>...]");
550 System.out.println("Arguments:");
551 System.out.println(" output Directory to write indexes under.");
552 System.out.println("Options:");
553 System.out.println(" segments List of segments to index. " +
554 "If none supplied, all under");
555 System.out.println(" '<output>/segments/' " +
556 "are indexed.");
557 System.exit(exitCode);
558 }
559
560 public static void doDedupUsage(final String message,
561 final int exitCode) {
562 if (message != null && message.length() > 0) {
563 System.out.println(message);
564 }
565 System.out.println("Usage: hadoop jar nutchwax.jar dedup <output>");
566 System.out.println("Arguments:");
567 System.out.println(" output Directory in which indices" +
568 " to dedup reside.");
569 System.exit(exitCode);
570 }
571
572 public static void doMergeUsage(final String message,
573 final int exitCode) {
574 if (message != null && message.length() > 0) {
575 System.out.println(message);
576 }
577 System.out.println("Usage: hadoop jar nutchwax.jar merge <output>");
578 System.out.println("Arguments:");
579 System.out.println(" output Directory in which indices" +
580 " to merge reside.");
581 System.exit(exitCode);
582 }
583
584 public static void doAllUsage(final String message,
585 final int exitCode) {
586 if (message != null && message.length() > 0) {
587 System.out.println(message);
588 }
589 System.out.println("Usage: hadoop jar nutchwax.jar all <input> " +
590 "<output> <collection>");
591 System.out.println("Arguments:");
592 System.out.println(" input Directory of files" +
593 " listing ARC URLs to import.");
594 System.out.println(" output Directory write indexing product to.");
595 System.out.println(" collection Collection name. Added to" +
596 " each resource.");
597 System.exit(exitCode);
598 }
599
600 public static void doClassUsage(final String message,
601 final int exitCode) {
602 if (message != null && message.length() > 0) {
603 System.out.println(message);
604 }
605 System.out.println("Usage: hadoop jar nutchwax.jar class CLASS ...");
606 System.out.println("Arguments:");
607 System.out.println(" CLASS Name of class to run. Invokes main " +
608 "passing command-line arguments.");
609 System.out.println(" For example, use to run nutch " +
610 "commands. Below is list of command");
611 System.out.println(" name and implementing class. " +
612 "Pass name of class only and emits usage.");
613 System.out.println();
614 System.out.println(" readdb " +
615 "org.apache.nutch.crawl.CrawlDbReader");
616 System.out.println(" mergedb " +
617 "org.apache.nutch.crawl.CrawlDbMerger");
618 System.out.println(" readlinkdb " +
619 "org.apache.nutch.crawl.LinkDbReader");
620 System.out.println(" segread " +
621 "org.apache.nutch.segment.SegmentReader");
622 System.out.println(" mergesegs " +
623 "org.apache.nutch.segment.SegmentMerger");
624 System.out.println(" mergelinkdb " +
625 "org.apache.nutch.crawl.LinkDbMerger");
626 System.exit(exitCode);
627 }
628
629 static void doJobHelp(final String jobName) {
630 if (!JOBS.contains(jobName)) {
631 usage("ERROR: Unknown job " + jobName, 1);
632 }
633 if (jobName.equals("import")) {
634 ImportArcs.doImportUsage(null, 1);
635 } else if (jobName.equals("update")) {
636 doUpdateUsage(null, 1);
637 } else if (jobName.equals("invert")) {
638 doInvertUsage(null, 1);
639 } else if (jobName.equals("index")) {
640 doIndexUsage(null, 1);
641 } else if (jobName.equals("dedup")) {
642 doDedupUsage(null, 1);
643 } else if (jobName.equals("merge")) {
644 doMergeUsage(null, 1);
645 } else if (jobName.equals("all")) {
646 doAllUsage(null, 1);
647 } else if (jobName.equals("class")) {
648 doClassUsage(null, 1);
649 } else {
650 usage("ERROR: No help for job name " + jobName, 4);
651 }
652 }
653
654 public static void main(String args[]) throws Exception {
655 if (args.length < 1) {
656 usage(null, 0);
657 return;
658 }
659
660 if (args[0].toLowerCase().equals("help")) {
661 if (args.length == 1) {
662 usage("ERROR: Add command you need help on.", 0);
663 return;
664 }
665 doJobHelp(args[1].toLowerCase());
666 }
667
668 final String jobName = args[0].toLowerCase();
669 if (!JOBS.contains(jobName)) {
670 usage("ERROR: Unknown <job> " + jobName, 1);
671 }
672
673 Nutchwax ia = new Nutchwax();
674 ia.doJob(jobName, args);
675 }
676 }