How to: create a hadoop map/reduce job tutorial

After we install and config hadoop to run in fully distributed Mode, Centos, now it’s time to write a hello world program to run on top of the hadoop cluster in distributed mode.

I use the 1.8M text file downloaded from, here is a file layout.

"01","35010","AL","NEW SITE",85.951086,32.941445,19942,0.004935

we will write a map reduce job to do a basic summary to list the number of citys in each State.

some configuration change,
Since the file is only 1.8M and we have two datanodes(task nodes), we will override the default block size to 500K, so that two task nodes can run at the same time.

  <description>The default block size for new files.</description>

then we use the hadoop command line to push the file to the cluster.
hadoop fs –mkdir raw
hadoop fs –copyFromLocal zip.txt raw

then from the name web interface, we can tell the zip.txt has 4 Blocks

Now time to write a simple map/reduce java program.  use your favorite IDE eclipse , create a java project and add the lib of those hadoo*.jar to the project builder lib. and here is my basic program.  I put some comments, here

public class statecount extends Configured implements Tool {

public int run(String[] args) throws Exception {

    if (args != null && args.length != 2) {
        System.out.println("statecount   <input> <output>");
        return -1;

    Configuration conf = this.getConf();
    JobConf jobconf = new JobConf(conf, statecount.class);
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);

    jobconf.setJobName("Hello world/City count by State");

    FileInputFormat.setInputPaths(jobconf, in);
    FileOutputFormat.setOutputPath(jobconf, out);





    return 0;

public static void main(String[] args) throws Exception {
int res = Configuration(), new statecount(), args);

public static class SMapClass extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, LongWritable>,
    Reducer<Text, LongWritable, Text, Text> {

public void map(LongWritable key, Text value,
        OutputCollector<Text, LongWritable> output,
        Reporter paramReporter) throws IOException {
    // "01","35004","AL","ACMAR",86.51557,33.584132,6055,0.001499

    String s = value.toString();
    output.collect(new Text(s.split(",")[2]), new LongWritable(1));
    //Collect AL, 1, here
    //get the third value which will be the State

public void reduce(Text key, Iterator<LongWritable> values,
        OutputCollector<Text, Text> output, Reporter paramReporter)
        throws IOException {

    long i = 0;
    while (values.hasNext()) {
        i += 1;
        Object o =; //I forget this line, there will be deadloop, reduce never over:(
    output.collect(key, new Text("#number of citys" + i));


then click the export menu to export classes to a jar file , then copy to the name server.
run “hadoop jar helloworld.jar statecount raw/zip.txt  ouputraw” on name server(job track server.)

from 50030, we can see two task nodes are running


whole statistics,

From the console you sumit job, you can see the job output.


after done, go to filesystem browser from :50070 web portal. we can see the final result.

if you want to make sure that reduce task is distributed, you can put a hostname into the reduce job and run again.

output.collect(key, new Text("#number of citys" + i + "on host" + InetAddress.getLocalHost().getHostName() ));

and assign the reduce tasks

jobconf.setJobName("Hello world/City count by State");

in the output hdfs folder, you will see four files.

click one file, this reduce job is executed on host Home


Another one is one Host LA


