1   package org.archive.access.nutch;
2   
3   import java.io.IOException;
4   import java.net.MalformedURLException;
5   import java.net.URL;
6   import java.util.Random;
7   import java.util.Arrays;
8   
9   import org.apache.hadoop.conf.Configuration;
10  import org.apache.hadoop.fs.FileSystem;
11  import org.apache.hadoop.fs.Path;
12  import org.apache.hadoop.io.Text;
13  import org.apache.hadoop.io.Writable;
14  import org.apache.hadoop.io.WritableComparable;
15  import org.apache.hadoop.mapred.JobClient;
16  import org.apache.hadoop.mapred.JobConf;
17  import org.apache.hadoop.mapred.MapFileOutputFormat;
18  import org.apache.hadoop.mapred.OutputCollector;
19  import org.apache.hadoop.mapred.Reporter;
20  import org.apache.hadoop.mapred.SequenceFileInputFormat;
21  import org.apache.nutch.crawl.Inlink;
22  import org.apache.nutch.crawl.Inlinks;
23  import org.apache.nutch.crawl.LinkDb;
24  import org.apache.nutch.crawl.LinkDbFilter;
25  import org.apache.nutch.net.URLFilters;
26  import org.apache.nutch.net.URLNormalizers;
27  import org.apache.nutch.parse.Outlink;
28  import org.apache.nutch.parse.ParseData;
29  import org.apache.nutch.util.NutchJob;
30  
31  /***
32   * Subclass of nutch indexer that writes out LinkDb keys that include the
33   * collection name.
34   * Bulk of code is a copy and paste from LinkDb. LinkDb is not amenable to
35   * subclassing.
36   * @author stack
37   */
38  public class NutchwaxLinkDb extends LinkDb {
39      private int nwMaxAnchorLength;
40      private boolean nwIgnoreInternalLinks;
41      private URLFilters nwUrlFilters;
42      private URLNormalizers nwUrlNormalizers;
43  
44      
45      public NutchwaxLinkDb() {
46          super(null);
47      }
48  
49      /*** Construct an LinkDb. */
50      public NutchwaxLinkDb(Configuration conf) {
51          super(conf);
52      }
53      
54      public void configure(JobConf job) {
55          super.configure(job);
56          // These config. are private in parent class.  Make copy here in this
57          // class with a 'nw' prefix.  St.Ack.
58          this.nwMaxAnchorLength = job.getInt("db.max.anchor.length", 100);
59          this.nwIgnoreInternalLinks =
60              job.getBoolean("db.ignore.internal.links", true);
61          if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
62              this.nwUrlFilters = new URLFilters(job);
63          }
64          if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
65              this.nwUrlNormalizers =
66                  new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB);
67          }
68      }
69  
70      public void map(WritableComparable key, Writable value,
71              OutputCollector output, Reporter reporter)
72      throws IOException {
73          String collection = Nutchwax.getCollectionFromWaxKey(key);
74          if (collection == null) {
75              LOG.info("Collection is null in key -- skipping " + key);
76          }
77  
78          String fromUrl = Nutchwax.getUrlFromWaxKey(key);
79          String fromHost = getHost(fromUrl);
80  
81          if (this.nwUrlNormalizers != null) {
82              try {
83                  fromUrl = this.nwUrlNormalizers.
84                      normalize(fromUrl, URLNormalizers.SCOPE_LINKDB);
85              } catch (Exception e) {
86                  LOG.warn("Skipping " + fromUrl + ":" + e);
87                  fromUrl = null;
88              }
89          }
90          if (fromUrl != null && this.nwUrlFilters != null) {
91              try {
92                  fromUrl = this.nwUrlFilters.filter(fromUrl);
93              } catch (Exception e) {
94                  LOG.warn("Skipping " + fromUrl + ":" + e);
95                  fromUrl = null;
96              }
97          }
98          if (fromUrl == null) return; // discard all outlinks
99  
100 
101         ParseData parseData = (ParseData)value;
102         Outlink[] outlinks = parseData.getOutlinks();
103         Inlinks inlinks = new Inlinks();
104         for (int i = 0; i < outlinks.length; i++) {
105             Outlink outlink = outlinks[i];
106             String toUrl = outlink.getToUrl();
107             if (this.nwIgnoreInternalLinks) {
108                 String toHost = getHost(toUrl);
109                 if (toHost == null || toHost.equals(fromHost)) { // internal link
110                     continue;                               // skip it
111                 }
112             }
113 
114             if (this.nwUrlNormalizers != null) {
115                 try {
116                     toUrl = this.nwUrlNormalizers.
117                         normalize(toUrl, URLNormalizers.SCOPE_LINKDB);
118                 } catch (Exception e) {
119                     LOG.warn("Skipping " + toUrl + ":" + e);
120                     toUrl = null;
121                 }
122             }
123             if (toUrl != null && this.nwUrlFilters != null) {
124                 try {
125                 toUrl = this.nwUrlFilters.filter(toUrl); // filter the url
126                 } catch (Exception e) {
127                     LOG.warn("Skipping " + toUrl + ":" + e);
128                     toUrl = null;
129                 }
130             }
131             if (toUrl == null) continue;
132 
133             inlinks.clear();
134             String anchor = outlink.getAnchor();        // truncate long anchors
135             if (anchor.length() > this.nwMaxAnchorLength) {
136                 anchor = anchor.substring(0, this.nwMaxAnchorLength);
137             }
138             inlinks.add(new Inlink(fromUrl, anchor));   // collect inverted link
139             output.collect(new Text(Nutchwax.generateWaxKey(toUrl, collection)),
140                 inlinks);
141         }
142     }
143 
144     private String getHost(String url) {
145         try {
146             return new URL(url).getHost().toLowerCase();
147         } catch (MalformedURLException e) {
148             return null;
149         }
150     }
151 
152     public void invert(Path linkDb, final Path[] segments,
153             final boolean normalize, final boolean filter)
154     throws IOException {
155         if (LOG.isInfoEnabled()) {
156             LOG.info("NutchwaxLinkDb: starting");
157             LOG.info("NutchwaxLinkDb: linkdb: " + linkDb);
158             LOG.info("LinkDb: URL normalize: " + normalize);
159             LOG.info("LinkDb: URL filter: " + filter);
160         }
161         JobConf job = createJob(getConf(), linkDb, normalize, filter);
162         
163         for (int i = 0; i < segments.length; i++) {
164             if (LOG.isInfoEnabled()) {
165                 LOG.info("LinkDb: adding segment: " + segments[i]);
166             }
167             job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
168         }
169         JobClient.runJob(job);
170         FileSystem fs = FileSystem.get(getConf());
171         if (fs.exists(linkDb)) {
172             if (LOG.isInfoEnabled()) {
173                 LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
174             }
175             // try to merge
176             Path newLinkDb = job.getOutputPath();
177             job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
178             job.setJobName("NutchwaxLinkDb merge " + linkDb + " " +
179                 Arrays.asList(segments));
180             job.setMapperClass(NutchwaxLinkDbFilter.class);
181             job.addInputPath(new Path(linkDb, CURRENT_NAME));
182             job.addInputPath(newLinkDb);
183             JobClient.runJob(job);
184             fs.delete(newLinkDb);
185         }
186         LinkDb.install(job, linkDb);
187         if (LOG.isInfoEnabled()) { LOG.info("LinkDb: done"); }
188     }
189 
190     /***
191      * Copied from parent because method is private there (Its public in
192      * crawldb). Additions are on end just before return.
193      * @param config
194      * @param linkDb
195      * @param normalize
196      * @param filter
197      * @return A jobconf.
198      */
199     private static JobConf createJob(Configuration config, Path linkDb,
200             final boolean normalize, final boolean filter) {
201         Path newLinkDb =
202             new Path("linkdb-" +
203                 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
204 
205         JobConf job = new NutchJob(config);
206         job.setJobName("linkdb " + linkDb);
207 
208         job.setInputFormat(SequenceFileInputFormat.class);
209 
210         job.setMapperClass(LinkDb.class);
211         // if we don't run the mergeJob, perform normalization/filtering now
212         if (normalize || filter) {
213             try {
214                 FileSystem fs = FileSystem.get(config);
215                 if (!fs.exists(linkDb)) {
216                     job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
217                     job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
218                 }
219             } catch (Exception e) {
220                 LOG.warn("LinkDb createJob: " + e);
221             }
222         }
223         job.setReducerClass(LinkDb.class);
224 
225         job.setOutputPath(newLinkDb);
226         job.setOutputFormat(MapFileOutputFormat.class);
227         job.setBoolean("mapred.output.compress", true);
228         job.setOutputKeyClass(Text.class);
229         job.setOutputValueClass(Inlinks.class);
230 
231         // Now do the NutchwaxLinkDb config. changing mapper -- we use LinkDb's
232         // reducer -- and job name.
233         job.setJobName("nutchwaxLinkdb " + linkDb);
234         job.setMapperClass(NutchwaxLinkDb.class);
235         
236         return job;
237     }
238     
239     public static void main(String[] args) throws Exception {
240     	int res = new NutchwaxLinkDb().
241     		doMain(NutchwaxConfiguration.getConfiguration(), args);
242     	System.exit(res);
243 	}
244 }