1 package org.archive.access.nutch;
2
3 import java.io.IOException;
4 import java.net.MalformedURLException;
5 import java.net.URL;
6 import java.util.Random;
7 import java.util.Arrays;
8
9 import org.apache.hadoop.conf.Configuration;
10 import org.apache.hadoop.fs.FileSystem;
11 import org.apache.hadoop.fs.Path;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.io.Writable;
14 import org.apache.hadoop.io.WritableComparable;
15 import org.apache.hadoop.mapred.JobClient;
16 import org.apache.hadoop.mapred.JobConf;
17 import org.apache.hadoop.mapred.MapFileOutputFormat;
18 import org.apache.hadoop.mapred.OutputCollector;
19 import org.apache.hadoop.mapred.Reporter;
20 import org.apache.hadoop.mapred.SequenceFileInputFormat;
21 import org.apache.nutch.crawl.Inlink;
22 import org.apache.nutch.crawl.Inlinks;
23 import org.apache.nutch.crawl.LinkDb;
24 import org.apache.nutch.crawl.LinkDbFilter;
25 import org.apache.nutch.net.URLFilters;
26 import org.apache.nutch.net.URLNormalizers;
27 import org.apache.nutch.parse.Outlink;
28 import org.apache.nutch.parse.ParseData;
29 import org.apache.nutch.util.NutchJob;
30
31 /***
32 * Subclass of nutch indexer that writes out LinkDb keys that include the
33 * collection name.
34 * Bulk of code is a copy and paste from LinkDb. LinkDb is not amenable to
35 * subclassing.
36 * @author stack
37 */
38 public class NutchwaxLinkDb extends LinkDb {
39 private int nwMaxAnchorLength;
40 private boolean nwIgnoreInternalLinks;
41 private URLFilters nwUrlFilters;
42 private URLNormalizers nwUrlNormalizers;
43
44
45 public NutchwaxLinkDb() {
46 super(null);
47 }
48
49 /*** Construct an LinkDb. */
50 public NutchwaxLinkDb(Configuration conf) {
51 super(conf);
52 }
53
54 public void configure(JobConf job) {
55 super.configure(job);
56
57
58 this.nwMaxAnchorLength = job.getInt("db.max.anchor.length", 100);
59 this.nwIgnoreInternalLinks =
60 job.getBoolean("db.ignore.internal.links", true);
61 if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
62 this.nwUrlFilters = new URLFilters(job);
63 }
64 if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
65 this.nwUrlNormalizers =
66 new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB);
67 }
68 }
69
70 public void map(WritableComparable key, Writable value,
71 OutputCollector output, Reporter reporter)
72 throws IOException {
73 String collection = Nutchwax.getCollectionFromWaxKey(key);
74 if (collection == null) {
75 LOG.info("Collection is null in key -- skipping " + key);
76 }
77
78 String fromUrl = Nutchwax.getUrlFromWaxKey(key);
79 String fromHost = getHost(fromUrl);
80
81 if (this.nwUrlNormalizers != null) {
82 try {
83 fromUrl = this.nwUrlNormalizers.
84 normalize(fromUrl, URLNormalizers.SCOPE_LINKDB);
85 } catch (Exception e) {
86 LOG.warn("Skipping " + fromUrl + ":" + e);
87 fromUrl = null;
88 }
89 }
90 if (fromUrl != null && this.nwUrlFilters != null) {
91 try {
92 fromUrl = this.nwUrlFilters.filter(fromUrl);
93 } catch (Exception e) {
94 LOG.warn("Skipping " + fromUrl + ":" + e);
95 fromUrl = null;
96 }
97 }
98 if (fromUrl == null) return;
99
100
101 ParseData parseData = (ParseData)value;
102 Outlink[] outlinks = parseData.getOutlinks();
103 Inlinks inlinks = new Inlinks();
104 for (int i = 0; i < outlinks.length; i++) {
105 Outlink outlink = outlinks[i];
106 String toUrl = outlink.getToUrl();
107 if (this.nwIgnoreInternalLinks) {
108 String toHost = getHost(toUrl);
109 if (toHost == null || toHost.equals(fromHost)) {
110 continue;
111 }
112 }
113
114 if (this.nwUrlNormalizers != null) {
115 try {
116 toUrl = this.nwUrlNormalizers.
117 normalize(toUrl, URLNormalizers.SCOPE_LINKDB);
118 } catch (Exception e) {
119 LOG.warn("Skipping " + toUrl + ":" + e);
120 toUrl = null;
121 }
122 }
123 if (toUrl != null && this.nwUrlFilters != null) {
124 try {
125 toUrl = this.nwUrlFilters.filter(toUrl);
126 } catch (Exception e) {
127 LOG.warn("Skipping " + toUrl + ":" + e);
128 toUrl = null;
129 }
130 }
131 if (toUrl == null) continue;
132
133 inlinks.clear();
134 String anchor = outlink.getAnchor();
135 if (anchor.length() > this.nwMaxAnchorLength) {
136 anchor = anchor.substring(0, this.nwMaxAnchorLength);
137 }
138 inlinks.add(new Inlink(fromUrl, anchor));
139 output.collect(new Text(Nutchwax.generateWaxKey(toUrl, collection)),
140 inlinks);
141 }
142 }
143
144 private String getHost(String url) {
145 try {
146 return new URL(url).getHost().toLowerCase();
147 } catch (MalformedURLException e) {
148 return null;
149 }
150 }
151
152 public void invert(Path linkDb, final Path[] segments,
153 final boolean normalize, final boolean filter)
154 throws IOException {
155 if (LOG.isInfoEnabled()) {
156 LOG.info("NutchwaxLinkDb: starting");
157 LOG.info("NutchwaxLinkDb: linkdb: " + linkDb);
158 LOG.info("LinkDb: URL normalize: " + normalize);
159 LOG.info("LinkDb: URL filter: " + filter);
160 }
161 JobConf job = createJob(getConf(), linkDb, normalize, filter);
162
163 for (int i = 0; i < segments.length; i++) {
164 if (LOG.isInfoEnabled()) {
165 LOG.info("LinkDb: adding segment: " + segments[i]);
166 }
167 job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
168 }
169 JobClient.runJob(job);
170 FileSystem fs = FileSystem.get(getConf());
171 if (fs.exists(linkDb)) {
172 if (LOG.isInfoEnabled()) {
173 LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
174 }
175
176 Path newLinkDb = job.getOutputPath();
177 job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
178 job.setJobName("NutchwaxLinkDb merge " + linkDb + " " +
179 Arrays.asList(segments));
180 job.setMapperClass(NutchwaxLinkDbFilter.class);
181 job.addInputPath(new Path(linkDb, CURRENT_NAME));
182 job.addInputPath(newLinkDb);
183 JobClient.runJob(job);
184 fs.delete(newLinkDb);
185 }
186 LinkDb.install(job, linkDb);
187 if (LOG.isInfoEnabled()) { LOG.info("LinkDb: done"); }
188 }
189
190 /***
191 * Copied from parent because method is private there (Its public in
192 * crawldb). Additions are on end just before return.
193 * @param config
194 * @param linkDb
195 * @param normalize
196 * @param filter
197 * @return A jobconf.
198 */
199 private static JobConf createJob(Configuration config, Path linkDb,
200 final boolean normalize, final boolean filter) {
201 Path newLinkDb =
202 new Path("linkdb-" +
203 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
204
205 JobConf job = new NutchJob(config);
206 job.setJobName("linkdb " + linkDb);
207
208 job.setInputFormat(SequenceFileInputFormat.class);
209
210 job.setMapperClass(LinkDb.class);
211
212 if (normalize || filter) {
213 try {
214 FileSystem fs = FileSystem.get(config);
215 if (!fs.exists(linkDb)) {
216 job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
217 job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
218 }
219 } catch (Exception e) {
220 LOG.warn("LinkDb createJob: " + e);
221 }
222 }
223 job.setReducerClass(LinkDb.class);
224
225 job.setOutputPath(newLinkDb);
226 job.setOutputFormat(MapFileOutputFormat.class);
227 job.setBoolean("mapred.output.compress", true);
228 job.setOutputKeyClass(Text.class);
229 job.setOutputValueClass(Inlinks.class);
230
231
232
233 job.setJobName("nutchwaxLinkdb " + linkDb);
234 job.setMapperClass(NutchwaxLinkDb.class);
235
236 return job;
237 }
238
239 public static void main(String[] args) throws Exception {
240 int res = new NutchwaxLinkDb().
241 doMain(NutchwaxConfiguration.getConfiguration(), args);
242 System.exit(res);
243 }
244 }