1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| public class Main { public static void main(String[] args) throws Exception { countWords(); countUrls(); }
private static void countWords() throws Exception { String input_dir = "./data/sogou.full.utf8"; String outputDir = "./result/words";
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); fs.deleteOnExit(new Path(outputDir)); fs.close();
Job job = new Job(conf, "CountWords"); job.setMapperClass(CountWordMapper.class); job.setReducerClass(CountWordReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
job.setInputFormatClass(TextInputFormat.class); TextInputFormat.setInputPaths(job, new Path(input_dir));
job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(outputDir)); job.waitForCompletion(true);
}
}
/**
 * Emits (keyword, partial count) pairs. A line is processed only when it splits
 * into exactly six tab-separated fields; the keyword is the third field
 * (index 2). Other lines are skipped.
 *
 * Counts are aggregated in memory (in-mapper combining) and written out in
 * cleanup(). They are also written early whenever the number of distinct buffered
 * keywords reaches FLUSH_THRESHOLD, which keeps memory bounded. The same keyword
 * may therefore be emitted more than once by one mapper, so the reducer must sum
 * the values per key.
 */
public class CountWordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /** Maximum number of distinct keywords buffered before partial counts are flushed. */
    private static final int FLUSH_THRESHOLD = 100_000;

    /** Buffered keyword -> count since the last flush. */
    private final HashMap<String, Long> counts = new HashMap<>();

    // Output objects reused across writes; context.write serializes them immediately.
    private final Text outKey = new Text();
    private final LongWritable outValue = new LongWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length != 6) {
            return; // malformed record
        }
        counts.merge(fields[2], 1L, Long::sum);
        if (counts.size() >= FLUSH_THRESHOLD) {
            flush(context);
        }
    }

    @Override
    protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        flush(context);
    }

    /** Writes every buffered (keyword, count) pair to the context and empties the buffer. */
    private void flush(Context context) throws IOException, InterruptedException {
        for (java.util.Map.Entry<String, Long> entry : counts.entrySet()) {
            outKey.set(entry.getKey());
            outValue.set(entry.getValue());
            context.write(outKey, outValue);
        }
        counts.clear();
    }
}
/**
 * Sums the partial counts for each keyword and keeps the K keywords with the
 * highest totals. In cleanup() it emits them in descending order of count.
 *
 * The top-K is tracked per reduce task. The job must run with a single reducer for
 * the output to be a global top-K. When counts tie at the cut-off, which keyword is
 * kept is not specified.
 */
public class CountWordReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    /** Number of keywords to keep. */
    public static int K = 30;

    // Min-heap ordered by count. The root is the smallest total among the current
    // top-K, so it is the entry evicted once the heap grows past K. Long.compare
    // avoids the truncation/overflow of casting a long difference to int.
    PriorityQueue<Pair<String, Long>> minHeap =
            new PriorityQueue<>((p1, p2) -> Long.compare(p1.getValue(), p2.getValue()));

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable count : values) {
            total += count.get();
        }
        minHeap.add(new Pair<>(key.toString(), total));
        if (minHeap.size() > K) {
            minHeap.poll();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // PriorityQueue iteration order is unspecified, so copy the entries and sort them.
        List<Pair<String, Long>> list = new ArrayList<>(minHeap);
        list.sort((p1, p2) -> Long.compare(p2.getValue(), p1.getValue())); // highest first
        for (Pair<String, Long> t : list) {
            context.write(new Text(t.getKey()), new LongWritable(t.getValue()));
        }
    }
}
|