Using Hadoop and Amazon Elastic MapReduce to Process Your Data More Efficiently

If the amount of data in your enterprise is overwhelming, or you’re looking for ways to process that data more efficiently, then Hadoop and Amazon Elastic MapReduce may be your answer.

MapReduce frameworks allow developers without much knowledge of distributed computing to write applications that take advantage of distributed resources. Hadoop MapReduce is one implementation of this model.

Background:

Recently, we developed a web asset delivery service for one of our clients that would allow businesses to display high-quality assets from a CDN on their websites for a monthly fee. User accounts would have bandwidth limits based on the account levels in the pricing model. This meant we needed a way to give users information about monthly bandwidth utilization across their websites in order to support the pricing model. Our solution was to parse the web logs using Hadoop’s MapReduce framework in conjunction with Amazon’s cloud-based Elastic MapReduce service.

Here’s how Hadoop MapReduce works:

Hadoop MapReduce is a Java-based framework that allows you to write applications that process high volumes of data in parallel across clusters of machines. Hadoop uses a distributed file storage system called the Hadoop Distributed File System (HDFS) to store large amounts of data across multiple nodes. It runs on most major platforms, and MapReduce programs can be written in Python, Ruby, PHP, Pig, etc., in addition to Java. Using Hadoop, we were able to write a simple Java program that parses through the raw data in the log files collected by the CDN and filters out the relevant bandwidth utilization information.
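For example, getting raw log files into HDFS can be done from the command line or through Hadoop’s Java FileSystem API. Here’s a minimal sketch using the Java API, with the local and HDFS paths passed in as arguments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LogUploader {
  public static void main(String[] args) throws Exception {
    // picks up the HDFS namenode address from the Hadoop configuration on the classpath
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // copy a local CDN log file into HDFS so a MapReduce job can read it
    fs.copyFromLocalFile( new Path(args[0]), new Path(args[1]) );
    fs.close();
  }
}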

The basic idea behind the MapReduce model is that you write two functions, map() and reduce(), to divide up your processing tasks and let the framework manage most of the heavy lifting. Map and reduce functions take key-value pairs (using data types that implement Hadoop’s Writable interface) as their input and output. When you start a MapReduce job, you pass in a data file stored in HDFS as the input. Hadoop splits the input into smaller pieces that the map function can consume. Likewise, the outputs of the map function are grouped by key and sent to the reduce function for processing. Both map and reduce functions can run in parallel, and Hadoop distributes the tasks across the nodes of the cluster.
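As an illustration, a custom key or value type only needs to implement the Writable interface’s two serialization methods. (The program below sticks to Hadoop’s built-in types such as Text and MapWritable, so this BandwidthWritable class is purely a hypothetical example.)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class BandwidthWritable implements Writable {
  private long bytes;    // bytes served for a single log entry

  public BandwidthWritable() {}                       // Hadoop requires a no-arg constructor
  public BandwidthWritable(long bytes) { this.bytes = bytes; }

  public void write(DataOutput out) throws IOException {
    out.writeLong(bytes);                             // serialize before shuffling between nodes
  }

  public void readFields(DataInput in) throws IOException {
    bytes = in.readLong();                            // deserialize on the receiving node
  }

  public long getBytes() { return bytes; }
}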

In our case, we simply pass the log file(s) (copied to HDFS) as input to the MapReduce program. Hadoop merges all of the specified log files and serializes each log entry into the expected data type (Text) before passing it to the map tasks. Our map function then parses each log entry individually and stores the relevant data (bandwidth info) in a HashMap-like object (MapWritable), which is then emitted as another key-value pair (<asset path, MapWritable object>) for the reduce function to work with. The reduce function aggregates the data by user account, date, user agent, etc. and saves it to a database (Amazon RDS). We can then query the database for all kinds of utilization information and send out notifications to users, for example when an account goes over its monthly bandwidth cap.

Below is the structure of a sample MapReduce program written in Java:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogProcessor {

  public static class LogMap
            extends Mapper<LongWritable, Text, Text, MapWritable> {
    @Override
    public void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {
      MapWritable logEntry = new MapWritable();
      // parse the log entry held in 'value' and populate logEntry (elided)

      Text assetPath = new Text();
      // assetPath = resource path extracted from the log entry

      context.write( assetPath, logEntry );
    }
  }

  public static class LogReduce
            extends Reducer<Text, MapWritable, DBWritable, NullWritable> {
    @Override
    public void reduce( Text key, Iterable<MapWritable> values, Context context )
            throws IOException, InterruptedException {
      for ( MapWritable entry : values ) {
        // process entry and write the aggregated result to the database (elided)
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Set up a new MapReduce job
    Job job = new Job();
    job.setJarByClass(LogProcessor.class);    // register the main class

    FileInputFormat.addInputPath( job, new Path(args[0]) );      // <input file path> in HDFS
    FileOutputFormat.setOutputPath( job, new Path(args[1]) );    // <output file path>

    job.setMapperClass( LogMap.class );
    job.setReducerClass( LogReduce.class );

    // key/value types emitted by the mapper; the reducer's database output
    // configuration (e.g. DBOutputFormat) is omitted here
    job.setMapOutputKeyClass( Text.class );
    job.setMapOutputValueClass( MapWritable.class );

    System.exit( job.waitForCompletion(true) ? 0 : 1 );
  }
}
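To make the elided map-side parsing a little more concrete, here is a rough sketch of what the body of map() might look like. The space-delimited log format and field positions below are assumptions for illustration only; the actual CDN log format differs.

// inside LogMap.map() -- field positions are assumed for illustration
String[] fields = value.toString().split( " " );
String resourcePath = fields[4];                       // requested asset path
long bytesServed = Long.parseLong( fields[6] );        // response size in bytes

MapWritable logEntry = new MapWritable();
logEntry.put( new Text("date"), new Text(fields[0]) );
logEntry.put( new Text("userAgent"), new Text(fields[8]) );
logEntry.put( new Text("bytes"), new LongWritable(bytesServed) );

context.write( new Text(resourcePath), logEntry );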

The program is packaged in a jar file (with dependencies) that Hadoop can run.

And here’s how to utilize Amazon’s Elastic MapReduce service to run the program:

At Control Group, we lean heavily on Amazon’s cloud-based infrastructure in many of our projects. It lets us cost-effectively (pay by usage) deploy applications that need to scale. Amazon’s Elastic MapReduce service is a perfect fit for running the MapReduce application described above: it’s easy to set up, and it shields us from some of the infrastructure and maintenance issues of running Hadoop ourselves.

In a nutshell, the Elastic MapReduce service runs a hosted Hadoop installation on an EC2 instance (the master) and can instantly provision additional pre-configured EC2 instances (slave nodes) to distribute the MapReduce processing; all of these instances are terminated once the MapReduce tasks complete. By default, Amazon allows us to use up to 20 EC2 instances for data-intensive processing, and you can request an increase if you need a higher instance count.

So to run the MapReduce job, we create a new “Job Flow” via the AWS console, the command-line utility (Ruby-based), or an API provided by Amazon. A job flow is a set of steps that Elastic MapReduce runs. You basically provide some configuration information (the number of EC2 instances to use and any bootstrap actions) and the location of your MapReduce program (usually an Amazon S3 bucket path). Job flow records and logs can be viewed in the AWS console. You can also explicitly instruct Elastic MapReduce to keep the master EC2 instance alive for debugging purposes; you can then SSH into the instance to check the log files created by Hadoop.
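As a rough sketch, here’s what creating a job flow might look like through the Elastic MapReduce API in the AWS SDK for Java. The bucket names, instance types, and instance counts below are placeholders, not the values we actually use.

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

public class JobFlowLauncher {
  public static void main(String[] args) {
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(
        new BasicAWSCredentials( "<access key>", "<secret key>" ) );

    // the jar packaged above, uploaded to an S3 bucket (paths are placeholders)
    HadoopJarStepConfig jarStep = new HadoopJarStepConfig()
        .withJar( "s3://my-bucket/log-processor.jar" )
        .withArgs( "s3://my-bucket/logs/", "s3://my-bucket/output/" );

    StepConfig step = new StepConfig()
        .withName( "Process CDN logs" )
        .withHadoopJarStep( jarStep )
        .withActionOnFailure( "TERMINATE_JOB_FLOW" );

    JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
        .withInstanceCount( 5 )                        // one master + four slave nodes
        .withMasterInstanceType( "m1.small" )
        .withSlaveInstanceType( "m1.small" )
        .withKeepJobFlowAliveWhenNoSteps( false );     // set true to keep the master up for debugging

    RunJobFlowRequest request = new RunJobFlowRequest()
        .withName( "Log processing job flow" )
        .withLogUri( "s3://my-bucket/jobflow-logs/" )
        .withSteps( step )
        .withInstances( instances );

    RunJobFlowResult result = emr.runJobFlow( request );
    System.out.println( "Started job flow: " + result.getJobFlowId() );
  }
}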

In summary, Hadoop’s MapReduce framework allows us to write simple applications that process high volumes of data in a distributed computing environment, while Amazon’s Elastic MapReduce service provides a cost-effective way to run them.