Mapreduce入门程序wordcount

前面文章我们学习了一堆Mapreduce的理论知识,本文就以实例wordcout程序来讲解hadoop Mapreduce详细运行步骤和map()、reduce()函数的编写。

hadoop的WordCount程序就好比学习java时候编写的hello world,是hadoop的一个入门程序。因为wordCount程序可以很好的体现出hadoop“分而治之”的思想。如下简单看下wordCount程序运行过程图解:

Mapreduce入门程序wordcount


需求分析:

简单说,wordcount程序要做的是输出所有文本文件当中单词出现的频次。

1、Mapreduce对文本进行切割分区处理,同时把行号作为map输入的键值对的键(key),文本内容作为键值对的值(value)。

2、经过Map方法处理(代码编写),输出中间结果<word,1>的键值对数据。

3、reduce根据定义好的分区,把map的结果数据进行合并然后作为入参传入reduce函数(代码编写)。

4、完成reduce函数计算,输出结果<word,个数(count)>。

WordCount程序编写

1、idea创建hadoop项目

本文常用的是idea+maven搭建的hadoop项目,前文在学习hdfs时候已经教大家在idea上创建了一个myHadoop的项目,如下:

Mapreduce入门程序wordcount

2、编写Map()函数

在文件夹Mapreduce下新建一个wordcount的map实现类WordCountMapper,代码如下:

package Mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    //该方法循环调用,读取split(分片)文件中的每行数据,把该行所在的下标为key,该行的内容为value
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //根据空格分隔单词
        String[] words = StringUtils.split(value.toString(), ' ');
        for(String w :words){
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
  • Mapper类是用于读取分片数据,并通过实现map方法来实现具体业务的map操作。

  • Mapper类的入参是mapper类的输入键值对,输出参数两个是输出键值对。

  • map方法是Mapreduce框架会自动把键值对作为参数传入,longwritable key代表文本的行号,Text value代表是该行的文本内容。Context context代表上下文,可以在Mapper类进行值传递,用于Mapper类输出结果变量。

3、编写reduce()函数

在文件夹Mapreduce下新建一个wordcount的reduce实现类WordCountReducer,代码如下:

package Mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    /**
     * Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,
     * /所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
     */
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
            throws IOException, InterruptedException {
        int sum =0;
        for(IntWritable i: values){
            sum=sum+i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
  • Reducer类用于接收Mapper输出的中间结果作为Reducer类的输入,并执行reduce方法。

  • Reducer类输入参数是reduce方法输入的键值对类型(对应map输出类型),输出参数是两个键值对类型。

  • reduce方法参数:key是单个单词,values是map函数出来的对应单词的计数值所组成的列表,Context类型是Reducer的上下文,用于输出reduce结果变量。

4、编写运行类

在文件夹Mapreduce下新建一个wordcount的运行类runTask,代码如下:

package Mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class runTask {
    public static void main(String[] args) {
        //设置环境变量HADOOP_USER_NAME,其值是root
        System.setProperty("HADOOP_USER_NAME", "root");
        //Configuration类包含了Hadoop的配置
        Configuration config =new Configuration();
        //设置fs.defaultFS
        config.set("fs.defaultFS", "hdfs://192.168.100.100:9000");
        //设置yarn.resourcemanager节点
        config.set("yarn.resourcemanager.hostname", "hdp-slave-01");
        try {
            FileSystem fs =FileSystem.get(config);
            Job job =Job.getInstance(config);
            job.setJarByClass(runTask.class);
            job.setJobName("wcount");
            //设置Mapper类
            job.setMapperClass(WordCountMapper.class);
            //设置Reduce类
            job.setReducerClass(WordCountReducer.class);
            //设置reduce方法输出key的类型
            job.setOutputKeyClass(Text.class);
            //设置reduce方法输出value的类型
            job.setOutputValueClass(IntWritable.class);
            //指定输入路径
            FileInputFormat.addInputPath(job, new Path("/myHadoop/input/"));
            //指定输出路径(会自动创建)
            Path outpath =new Path("/myHadoop/output/");
            //输出路径是MapReduce自动创建的,如果存在则需要先删除
            FileOutputFormat.setOutputPath(job, outpath);
            //提交任务,等待执行完成
            boolean f= job.waitForCompletion(true);
            if(f){
                System.out.println("job任务执行成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5、运行结果:

至此一个简单hadoop的单词计数程序已经写好,对于写好的hadoop程序可以有两种运行方式:

1、idea编译导出jar包,放到hadoop服务器上进行运行。

导出jar上传到hadoop服务器:

Mapreduce入门程序wordcount

运行hadoop程序:

hadoop jar ./myHadooop.jar Mapreduce.runTask input output

结果如下:

Mapreduce入门程序wordcount

查看一下hdfs文件系统中运行的统计结果,如下:

Mapreduce入门程序wordcount


2、本地开发环境上运行。(由于本地开发环境没有搭建hadoop环境,会导致job指定的输入路径与输出路径会指向本地文件系统上,所以需要本地开发环境安装hadoop的环境变量,可以引导开发环境可以使用hadoop服务端资源)。

具体操作如下:

2.1:windows下确实hadoop运行的动态库文件:hadoop.dll、winutils.exe,可以在github上下载和自己对应的hadoop版本文件,我采用的是hadoop2.10:

Mapreduce入门程序wordcount

2.2:添加本地hadoop运行的环境变量:HADOOP_HOME,然后再path环境变量中添加%HADOOP_HOME%\bin:

Mapreduce入门程序wordcount

Mapreduce入门程序wordcount

2.3:重启idea,运行hadoop程序,如下,发现已经运行成功了。

Mapreduce入门程序wordcount