Monday, February 21, 2011

How to: create a hadoop map/reduce job tutorial

After we install and configure Hadoop to run in fully distributed mode on CentOS (see the earlier post), it's time to write a hello-world program to run on top of the Hadoop cluster in distributed mode.

I use the 1.8MB text file downloaded from http://www.census.gov/tiger/tms/gazetteer/zips.txt; here is the file layout.

"01","35004","AL","ACMAR",86.51557,33.584132,6055,0.001499
"01","35005","AL","ADAMSVILLE",86.959727,33.588437,10616,0.002627
"01","35006","AL","ADGER",87.167455,33.434277,3205,0.000793
"01","35007","AL","KEYSTONE",86.812861,33.236868,14218,0.003519
"01","35010","AL","NEW SITE",85.951086,32.941445,19942,0.004935
"01","35014","AL","ALPINE",86.208934,33.331165,3062,0.000758
"01","35016","AL","ARAB",86.489638,34.328339,13650,0.003378
"01","35019","AL","BAILEYTON",86.621299,34.268298,1781,0.000441
"01","35020","AL","BESSEMER",86.947547,33.409002,40549,0.010035
"01","35023","AL","HUEYTOWN",86.999607,33.414625,39677,0.00982
"01","35031","AL","BLOUNTSVILLE",86.568628,34.092937,9058,0.002242


We will write a map/reduce job to do a basic summary: list the number of cities in each state.
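Conceptually, the mapper emits a (state, 1) pair for every input line, and the reducer counts how many pairs arrive for each state. A rough sketch of the data flow (values are illustrative, not actual counts):

map:     "01","35004","AL","ACMAR",86.51557,33.584132,6055,0.001499  ->  ("AL", 1)
shuffle: group by key                                                ->  ("AL", [1, 1, 1, ...])
reduce:  ("AL", [1, 1, 1, ...])                                      ->  ("AL", #number of cities: N)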

First, a configuration change.
Since the file is only 1.8MB and we have two datanodes (task nodes), we will override the default block size down to 500KB, so the file splits into multiple blocks and both task nodes can run map tasks at the same time.
>>>hdfs-site.xml

<property>
  <name>dfs.block.size</name>
  <value>512000</value>
  <description>The default block size for new files.</description>
</property>
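Note that dfs.block.size is read at write time, so it only affects files created after the change. If you would rather not touch the cluster-wide default, passing the property as a generic option on the copy command should work as well (the fs shell accepts -D options):

hadoop fs -D dfs.block.size=512000 -copyFromLocal zip.txt raw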


Then we use the hadoop command line to push the file to the cluster:
hadoop fs -mkdir raw
hadoop fs -copyFromLocal zip.txt raw
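You can also verify how the file was split with fsck (the relative path raw resolves under your HDFS home directory; /user/hadoop is an assumption here, adjust it to your username):

hadoop fsck /user/hadoop/raw/zip.txt -files -blocks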

Then, from the NameNode web interface, we can tell that zip.txt has 4 blocks.

Now it's time to write a simple map/reduce Java program. Use your favorite IDE, e.g. Eclipse: create a Java project and add the hadoop-*.jar libraries to the project build path. Here is my basic program; I put some comments in it.


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StateCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args == null || args.length != 2) {
            System.out.println("StateCount <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }

        Configuration conf = this.getConf();
        JobConf jobconf = new JobConf(conf, StateCount.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        jobconf.setJobName("Hello world/City count by State");
        // one class plays both roles, see SMapClass below
        jobconf.setMapperClass(SMapClass.class);
        jobconf.setReducerClass(SMapClass.class);

        FileInputFormat.setInputPaths(jobconf, in);
        FileOutputFormat.setOutputPath(jobconf, out);

        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);

        // the mapper emits (state, 1) pairs...
        jobconf.setMapOutputKeyClass(Text.class);
        jobconf.setMapOutputValueClass(LongWritable.class);

        // ...and the reducer emits (state, summary text) pairs
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(Text.class);

        JobClient.runJob(jobconf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new StateCount(), args);
        System.exit(res);
    }

    public static class SMapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, LongWritable>,
            Reducer<Text, LongWritable, Text, Text> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException {
            // "01","35004","AL","ACMAR",86.51557,33.584132,6055,0.001499
            // the third comma-separated field is the state, so collect ("AL", 1) here
            String s = value.toString();
            output.collect(new Text(s.split(",")[2]), new LongWritable(1));
        }

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            long i = 0;
            while (values.hasNext()) {
                i += 1;
                values.next(); // if you forget this line, the iterator never advances and the reduce loops forever :(
            }
            output.collect(key, new Text("#number of cities: " + i));
        }
    }
}
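If you prefer the command line to the Eclipse export wizard, packaging can look roughly like this (the Hadoop core jar name varies by version; hadoop-core-1.0.0.jar is just an assumption here):

mkdir classes
javac -classpath $HADOOP_HOME/hadoop-core-1.0.0.jar -d classes StateCount.java
jar cvf helloworld.jar -C classes .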


Then click the Export menu to export the classes to a jar file, and copy it to the name server.
Run "hadoop jar helloworld.jar StateCount raw/zip.txt outputraw" on the name server (the JobTracker server).

From the JobTracker web interface on port 50030, we can see two task nodes running.


The same interface also shows the whole job statistics.

From the console where you submitted the job, you can see the job output.


After the job is done, go to the filesystem browser in the NameNode web portal on port 50070; we can see the final result.
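You can also read the result straight from the command line; each reduce task writes one part file, and with a single reducer the output is part-00000:

hadoop fs -cat outputraw/part-00000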

If you want to make sure the reduce tasks are distributed, you can put the hostname into the reduce output (add an import for java.net.InetAddress) and run again:

output.collect(key, new Text("#number of cities: " + i + " on host " + InetAddress.getLocalHost().getHostName()));

and assign the number of reduce tasks:

jobconf.setJobName("Hello world/City count by State");
jobconf.setMapperClass(SMapClass.class);
jobconf.setReducerClass(SMapClass.class);

jobconf.setNumReduceTasks(4);
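Each state key is routed to one of the four reducers by the default HashPartitioner (hash of the key modulo the number of reduce tasks), so all records for one state land in exactly one output file. Setting it explicitly is redundant, but shows where the knob lives:

jobconf.setPartitionerClass(HashPartitioner.class); // org.apache.hadoop.mapred.lib.HashPartitioner, the default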


In the output HDFS folder, you will see four files, one per reduce task.

Click one file: this reduce task was executed on host Home.


Another one was executed on host LA.


More Hadoop Blogs

  1. How to: create a hadoop map/reduce job tutorial
  2. How to: install and config hadoop to run in fully distributed Mode, Centos
  3. How to: setup SSH authentication over keys, hadoop installation
