// Report abuse  (stray text carried over from the hosting web page; not part of the program)


			
package com.hongiiv.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class AccessLogFileAnalyzer extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper {

        private final static IntWritable ONE = new IntWritable(1);

    private static Pattern p = Pattern
        .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                        " ([^ ]*) ([^ ]*).*");

    private static DateTimeFormatter formatter = DateTimeFormat
        .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

    private IntWritable minute = new IntWritable();

    public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {

      String line = value.toString();
      Matcher matcher = p.matcher(line);
      if (matcher.matches()) {
        String timestamp = matcher.group(4);
        minute.set(getMinuteBucket(timestamp));
        output.collect(minute, ONE);
      }

    }

    private int getMinuteBucket(String timestamp) {
      DateTime dt = formatter.parseDateTime(timestamp);
      return dt.getMinuteOfDay() + (dt.getDayOfWeek() - 1)
          * DateTimeConstants.MINUTES_PER_DAY;
    }

  }

    public static class Reduce extends MapReduceBase implements Reducer {

        public void reduce(IntWritable key, Iterator values,
                OutputCollector output, Reporter reporter)
                throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable) values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

 static int printUsage() {
        System.out
                .println("Usage: AccessLogFileAnalyzer [-m ] [-r ]  ");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
 }


  public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(AccessLogFileAnalyzer.class);
    conf.setJobName("loganalyzer_by_hongiiv");

    // the keys are words (strings)
    conf.setOutputKeyClass(IntWritable.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    List other_args = new ArrayList();
    for (int i = 0; i < args.length; ++i) {
        try {
            if ("-m".equals(args[i])) {
                conf.setNumMapTasks(Integer.parseInt(args[++i]));
            } else if ("-r".equals(args[i])) {
                conf.setNumReduceTasks(Integer.parseInt(args[++i]));
            } else {
                other_args.add(args[i]);
            }
        } catch (NumberFormatException except) {
            System.out.println("ERROR: Integer expected instead of "
                    + args[i]);
            return printUsage();
        } catch (ArrayIndexOutOfBoundsException except) {
            System.out.println("ERROR: Required parameter missing from "
                    + args[i - 1]);
            return printUsage();
        }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
        System.out.println("ERROR: Wrong number of parameters: "
                + other_args.size() + " instead of 2.");
        return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
  }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AccessLogFileAnalyzer(), args);
        System.exit(res);
    }

}