第1关:成绩统计:

为了完成本关任务,你需要掌握:1.什么是MapReduce,2.如何使用MapReduce进行运算。

什么是MapReduce

MapReduce是一种可用于数据处理的编程模型,我们现在设想一个场景,你接到一个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。

如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map),处理完毕之后发送到一台机器上进行合并(merge),再计算合并之后的数据,归纳(reduce)并输出。

这就是一个比较完整的MapReduce的过程了。

如何使用MapReduce进行运算

我们通过一个示例,来体验Map/Reduce的使用。

我们从一个问题入手:目前我们想统计两个文本文件中,每个单词出现的次数。

首先我们在当前目录下创建两个文件:

创建file01输入内容:

 
  1. Hello World Bye World

创建file02输入内容:

 
  1. Hello Hadoop Goodbye Hadoop

将文件上传到HDFS/usr/input/目录下:

不要忘了启动DFSstart-dfs.sh

然后创建文件夹并上传:

在右侧代码区域编写,文件WordCount.java,添加如下内容:

 
  1. public class WordCount {
  2. //Mapper类
  3. /*LongWritable表示每一行起始偏移量
  4. 第一个Text是用来接受文件中的内容,
  5. 第二个Text是用来输出给Reduce类的key,
  6. IntWritable是用来输出给Reduce类的value*/
  7. public static class TokenizerMapper
  8. extends Mapper<LongWritable, Text, Text, IntWritable>{
  9. private final static IntWritable one = new IntWritable(1);
  10. private Text word = new Text();
  11. public void map(LongWritable key, Text value, Context context
  12. ) throws IOException, InterruptedException {
  13. StringTokenizer itr = new StringTokenizer(value.toString());
  14. while (itr.hasMoreTokens()) {
  15. word.set(itr.nextToken());
  16. context.write(word, one);
  17. }
  18. }
  19. }
  20. public static class IntSumReducer
  21. extends Reducer<Text,IntWritable,Text,IntWritable> {
  22. private IntWritable result = new IntWritable();
  23. public void reduce(Text key, Iterable<IntWritable> values,
  24. Context context
  25. ) throws IOException, InterruptedException {
  26. int sum = 0;
  27. for (IntWritable val : values) {
  28. sum += val.get();
  29. }
  30. result.set(sum);
  31. context.write(key, result);
  32. }
  33. }
  34. public static void main(String[] args) throws Exception {
  35. //创建配置对象
  36. Configuration conf = new Configuration();
  37. //创建job对象
  38. Job job = new Job(conf, "word count");
  39. //设置运行job的类
  40. job.setJarByClass(WordCount.class);
  41. //设置Mapper的类
  42. job.setMapperClass(TokenizerMapper.class);
  43. //设置Reduce的类
  44. job.setReducerClass(IntSumReducer.class);
  45. //设置输出的key value格式
  46. job.setOutputKeyClass(Text.class);
  47. job.setOutputValueClass(IntWritable.class);
  48. //设置输入路径
  49. String inputfile = "/usr/input";
  50. //设置输出路径
  51. String outputFile = "/usr/output";
  52. //执行输入
  53. FileInputFormat.addInputPath(job, new Path(inputfile));
  54. //执行输出
  55. FileOutputFormat.setOutputPath(job, new Path(outputFile));
  56. //是否运行成功,true输出0,false输出1
  57. System.exit(job.waitForCompletion(true) ? 0 : 1);
  58. }
  59. }

点击评测,运行代码,可以看到/usr/output目录下已经生成了文件。

我们来查看part--r-00000文件的内容:

可以看到统计的数据已经生成在文件中了。

如果你还想要运行一次,那么你需要删除输出路径的文件夹和文件。

代码解释

示例中,Map/Reduce程序总共分为三块即:Map,Recude,JobMap负责处理输入文件的内容。

TokenizerMappermap方法,它通过StringTokenizer 以空格为分隔符将一行切分为若干tokens,之后,输出< <word>, 1> 形式的键值对。

对于示例中的第一个输入,map输出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二个输入,map输出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

WordCount还指定了一个combiner。因此,每次map运行之后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合

第一个map的输出是:
< Bye, 1>
< Hello, 1>
< World, 2>

第二个map的输出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

reduce的数据是这样的:

< Bye , [1]>
< GoodBye , [1]>
< Hadoop , [2]>
< Hello , [1,1]>
< World , [2]>

Reducer中的reduce方法 仅是将每个key(本例中就是单词)出现的次数求和。

因此这个作业的输出就是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

在之后的实训中我们还会学习到JobConf, JobClient,Tool这些对象。

编程要求

使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。

测试说明

输入文件在你每次点击评测的时候,平台会为你创建,无需你自己创建,只需要启动HDFS,编写java代码即可。

输入文件的数据格式如下:
张三 12
李四 13
张三 89
李四 92
...

依照如上格式你应该输出:

张三 89
李四 92

具体本关的预期输出请查看右侧测试集。

因为大数据实训消耗资源较大,且map/reduce运行比较耗时,所以评测时间较长,大概在60秒左右,请耐心等待。

代码:(注意是在代码文件进行操作)

import java.io.IOException;

import java.util.StringTokenizer;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    /********** Begin **********/

         public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        private int maxValue = 0;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            StringTokenizer itr = new StringTokenizer(value.toString(),"\n");

            while (itr.hasMoreTokens()) {

                String[] str = itr.nextToken().split(" ");

                String name = str[0];

                one.set(Integer.parseInt(str[1]));

                word.set(name);

                context.write(word,one);

            }

            //context.write(word,one);

        }

    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)

                throws IOException, InterruptedException {

            int maxAge = 0;

            int age = 0;

            for (IntWritable intWritable : values) {

                maxAge = Math.max(maxAge, intWritable.get());

            }

            result.set(maxAge);

            context.write(key, result);

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        String inputfile = "/user/test/input";

        String outputFile = "/user/test/output/";

        FileInputFormat.addInputPath(job, new Path(inputfile));

        FileOutputFormat.setOutputPath(job, new Path(outputFile));

        job.waitForCompletion(true);

        

                   }

        }

        

        

    /********** End **********/

第2关:文件内容合并去重:

通过上一小节的学习我们了解了MapReduce大致的使用方式,本关我们来了解一下Mapper类,Reducer类和Job类。

map类

首先我们来看看Mapper对象:

在编写MapReduce程序时,要编写一个类继承Mapper类,这个Mapper类是一个泛型类型,它有四个形参类型,分别指定了map()函数的输入键,输入值,和输出键,输出值的类型。就第一关的例子来说,输入键是一个长整型,输入值是一行文本,输出键是单词,输出值是单词出现的次数。

Hadoop提供了一套可优化网络序列化传输的基本类型,而不是直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中,这里使用LongWritable(相当于Java中的Long类型),Text类型(相当于Java中的String类型)和IntWritable(相当于Integer类型)。

map()函数的输入是一个键和一个值,我们一般首先将包含有一行输入的text值转换成JavaString类型,然后再使用对字符串操作的类或者其他方法进行操作即可。

Reducer类

同样Reducer也有四个参数类型用于指定输入和输出类型,reduce()函数的输入类型必须匹配map函数的输出类型,即Text类型和IntWritable类型,在这种情况下,reduce函数的输出类型也必须是TextIntWritable类型,即分别输出单词和次数。

Job类

一般我们用Job对象来运行MapReduce作业,Job对象用于指定作业执行规范,我们可以用它来控制整个作业的运行,我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布的这个文件),不用明确指定JAR文件的名称,在Job对象的setJarByClass()函数中传入一个类即可,Hadoop利用这个类来查找包含他的JAR文件。addInputPath()函数和setOutputPath()函数用来指定作业的输入路径和输出路径。值的注意的是,输出路径在执行程序之前不能存在,否则Hadoop会拒绝执行你的代码。

最后我们使用waitForCompletion()方法提交代码并等待执行,该方法唯一的参数是一个布尔类型的值,当该值为true时,作业会把执行过程打印到控制台,该方法也会返回一个布尔值,表示执行的成败。

编程要求

接下来我们通过一个练习来巩固学习到的MapReduce知识吧。

对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:

  • 第一列按学号排列;
  • 学号相同,按x,y,z排列;
  • 输入文件路径为:/user/tmp/input/
  • 输出路径为:/user/tmp/output/

注意:输入文件后台已经帮你创建好了,不需要你再重复创建。

测试说明

程序会对你编写的代码进行测试:
输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。
下面是输入文件和输出文件的一个样例供参考。

输入文件file1的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

输入文件file2的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

根据输入文件file1file2合并得到的输出文件file3的样例如下:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

代码:(注意是在代码文件进行操作)

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Merge {

    /**

     * @param args

     * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C

     */

    //在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException

    public static class Map  extends Mapper<Object, Text, Text, Text>{

    

    /********** Begin **********/

        public void map(Object key, Text value, Context content) 

            throws IOException, InterruptedException {  

            Text text1 = new Text();

            Text text2 = new Text();

            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {

                text1.set(itr.nextToken());

                text2.set(itr.nextToken());

                content.write(text1, text2);

            }

        }  

    /********** End **********/

    } 

        

    //在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedException

    public static class  Reduce extends Reducer<Text, Text, Text, Text> {

    /********** Begin **********/

        

        public void reduce(Text key, Iterable<Text> values, Context context) 

            throws IOException, InterruptedException {

            Set<String> set = new TreeSet<String>();

            for(Text tex : values){

                set.add(tex.toString());

            }

            for(String tex : set){

                context.write(key, new Text(tex));

            }

        }  

    

    /********** End **********/

    }

    

    public static void main(String[] args) throws Exception{

        // TODO Auto-generated method stub

        Configuration conf = new Configuration();

        conf.set("fs.default.name","hdfs://localhost:9000");

        

        Job job = Job.getInstance(conf,"Merge and duplicate removal");

        job.setJarByClass(Merge.class);

        job.setMapperClass(Map.class);

        job.setCombinerClass(Reduce.class);

        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        String inputPath = "/user/tmp/input/";  //在这里设置输入路径

        String outputPath = "/user/tmp/output/";  //在这里设置输出路径

        FileInputFormat.addInputPath(job, new Path(inputPath));

        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

第3关:信息挖掘 - 挖掘父子关系:

你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:

  • 孙子在前,祖父在后;
  • 输入文件路径:/user/reduce/input
  • 输出文件路径:/user/reduce/output

测试说明

程序会对你编写的代码进行测试:
下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

输入文件内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma

输出文件内容如下:

 
  1. grand_child grand_parent
  2. Mark Jesse
  3. Mark Alice
  4. Philip Jesse
  5. Philip Alice
  6. Jone Jesse
  7. Jone Alice
  8. Steven Jesse
  9. Steven Alice
  10. Steven Frank
  11. Steven Mary
  12. Jone Frank
  13. Jone Mary

代码:(注意是在代码文件里进行操作)

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class simple_data_mining {

    public static int time = 0;

    /**

     * @param args

     * 输入一个child-parent的表格

     * 输出一个体现grandchild-grandparent关系的表格

     */

    //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志

    public static class Map extends Mapper<Object, Text, Text, Text>{

        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{

            /********** Begin **********/

        String line = value.toString();

             String[] childAndParent = line.split(" ");

             List<String> list = new ArrayList<>(2);

              for (String childOrParent : childAndParent) {

                 if (!"".equals(childOrParent)) {

                     list.add(childOrParent);

                  } 

              } 

              if (!"child".equals(list.get(0))) {

                  String childName = list.get(0);

                  String parentName = list.get(1);

                  String relationType = "1";

                  context.write(new Text(parentName), new Text(relationType + "+"

                        + childName + "+" + parentName));

                  relationType = "2";

                  context.write(new Text(childName), new Text(relationType + "+"

                        + childName + "+" + parentName));

              }

            /********** End **********/

        }

    }

    public static class Reduce extends Reducer<Text, Text, Text, Text>{

        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{

                /********** Begin **********/

                //输出表头

          if (time == 0) {

                context.write(new Text("grand_child"), new Text("grand_parent"));

                time++;

            }

                //获取value-list中value的child

List<String> grandChild = new ArrayList<>();

                //获取value-list中value的parent

 List<String> grandParent = new ArrayList<>();

                //左表,取出child放入grand_child

 for (Text text : values) {

                String s = text.toString();

                String[] relation = s.split("\\+");

                String relationType = relation[0];

                String childName = relation[1];

                String parentName = relation[2];

                if ("1".equals(relationType)) {

                    grandChild.add(childName);

                } else {

                    grandParent.add(parentName);

                }

            }

                //右表,取出parent放入grand_parent

 int grandParentNum = grandParent.size();

               int grandChildNum = grandChild.size();

               if (grandParentNum != 0 && grandChildNum != 0) {

                for (int m = 0; m < grandChildNum; m++) {

                    for (int n = 0; n < grandParentNum; n++) {

                        //输出结果

                    context.write(new Text(grandChild.get(m)), new Text(

                                grandParent.get(n)));

                    }

                }

            }

                /********** End **********/

        }

    }

    public static void main(String[] args) throws Exception{

        // TODO Auto-generated method stub

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"Single table join");

        job.setJarByClass(simple_data_mining.class);

        job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        String inputPath = "/user/reduce/input";   //设置输入路径

        String outputPath = "/user/reduce/output";   //设置输出路径

        FileInputFormat.addInputPath(job, new Path(inputPath));

        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

感谢观看!觉得有帮助的小伙伴可以点个免费的赞和关注!有空会第一时间分享所学所想!

更多推荐

大数据从入门到实战 - 第3章 MapReduce基础实战,cz教你MapReduce实战操作