MapReduce编程实践

编程环境准备:

  1. 要在 Eclipse 上编译和运行 MapReduce 程序,需要安装 hadoop-eclipse-plugin,可下载 Github 上的 hadoop2x-eclipse-plugin

  2. 下载后,将 release 中的 hadoop-eclipse-kepler-plugin-2.6.0.jar 复制到 Eclipse 安装目录的 plugins 文件夹中,运行 eclipse -clean 重启 Eclipse 即可(添加插件后只需要运行一次该命令,以后按照正常方式启动就行了)。

  3. 打开eclipse,进行hadoop插件配置。

    1. 选择Window菜单下的Preference。

    2. 然后选择Hadoop Map/Reduce,选择hadoop的安装目录,并确认配置。

    3. 在输出窗口下又一个蓝色大象,点击可进行hadoop环境配置。

    4. 按如下进行设置:

      其中,Localtion name可以随意填写,端口号则为9000。还有很多配置参数,为了方便,直接先创建WordCount的MapReduce工程,然后将/usr/local/hadoop/etc/hadoop中的配置文件core-site.xml ,hdfs-site.xml以及 log4j.properties 复制到 WordCount 项目下的 src 文件夹(~/workspace/WordCount/src)中:复制完成后,需要对工程文件进行刷新。

      这样在运行MapReduce作业时,就会使用配置文件中的配置参数。

    5. 然后就可以进行开发了。

  4. 注:HDFS 中的内容变动后,Eclipse 不会同步刷新,需要右键点击 Project Explorer中的 MapReduce Location,选择 Refresh,才能看到变动后的文件。

(1)编程实例–WordCount:

功能:对指定输入的文件进行单词个数统计,然后输出到指定文件夹中。

程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;


public class WordCount{
public WordCount(){
}

public static void main(String[] args) throws Exception{

Configuration conf = new Configuration();
//指定输入文件路径input和输出文件路径output
String[] otherArgs = new String[]{"/input","/output"};
if(otherArgs.length < 2){
System.err.println("没有输入输出路径!");
System.exit(2);
}

/**
* Job:它允许用户配置作业、提交作业、控制其执行和查询状态。
* SET方法仅在提交作业之前工作,
* 之后它们将引发非法LealEtExeExchange。
*/
//创建没有特定集群和给定作业名的新作业。
//只有当需要时,才会从CONF参数创建一个集群。
//作业生成配置的副本,以便任何必要的内部修改不反映传入参数。
Job job = Job.getInstance(conf, "word count");
//通过找到给定类的来源来设置jar
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

for(int i = 0; i < otherArgs.length - 1; ++i) {
//FileInputFormat:基于文件的输入格式的基类
//添加输入文件路径
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}

//FileOutputFormat:基于文件的输出格式的基类
//添加输出文件路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}

/**
* Reduce:减少一组中间值,这些值共享一组较小的值。
*/
public static class IntSumReducer extends Reducer
<Text, IntWritable, Text, IntWritable>{

private IntWritable result = new IntWritable();

public IntSumReducer(){
}

public void reduce(Text key, Iterable<IntWritable>
values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException{

int sum = 0;

IntWritable val = null;
for(Iterator it = values.iterator(); it.hasNext(); sum += val.get()){
val = (IntWritable) it.next();
}
this.result.set(sum);

context.write(key, this.result);

}

}

/**
* Mapper:将输入的键/值对映射到一组中间键/值对
* 映射是将输入记录转换为中间记录的单个任务。转换后的
* 中间记录不需要与输入记录相同。给定的输入对可以映射到零或多个输出对。
*/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
//IntWritable:一个可写的用于int型的。
//设置一个变量one为1.
private static final IntWritable one = new IntWritable(1);

//Text:该类使用标准UTF8编码存储文本。
private Text word = new Text();

public TokenizerMapper(){

}

//map():为输入分割中的每个键/值对调用一次。
//大多数应用程序应该重写这个,但是默认是标识函数。
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException{
// StringTokenizer这个类主要是把一个字符串按某个标记分段,
//默认的情况下的分割符是空格
StringTokenizer itr = new StringTokenizer(value.toString());

while(itr.hasMoreTokens()){
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
}

(2)编程实例-求平均值:

功能:计算学生的平均成绩,每个文件包括所有的学生成绩,格式为 姓名 成绩,有多少个科目,就有多少个输入文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;

/**
* 计算学生的平均成绩
* 学生成绩以每科一个文件输入
* 文件内容:姓名 成绩
* 例如:  小明  80
*/
public class AverageScore {
public AverageScore(){
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
//设置文件输入输出路径
String[] otherArgs = new String[]{"/input1","/output1"};
//可以用来读取输入输出文件参数,这里采用上一行代码,手动设置路径
//String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length < 2){
System.err.println("请输入至少两个文件!");
System.exit(2);
}

//设置工作参数
Job job = Job.getInstance(conf,"Average Score");
job.setJarByClass(AverageScore.class);
job.setMapperClass(AverageScore.AverageMapper.class);
job.setCombinerClass(AverageScore.AverageReduce.class);
job.setReducerClass(AverageScore.AverageReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
//输入文件路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
//输出文件路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}


/*
* map():将每个输入文件,将姓名和成绩分割开。
*/
public static class AverageMapper extends Mapper<Object, Text, Text, FloatWritable>{

public AverageMapper(){

}

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
//按行进行划分
StringTokenizer tokens = new StringTokenizer(line,"\n");
while(tokens.hasMoreTokens()){
String tmp = tokens.nextToken();
//按空格进行划分
StringTokenizer sz = new StringTokenizer(tmp);
String name = sz.nextToken();
float score = Float.valueOf(sz.nextToken());
Text outName = new Text(name);
FloatWritable outScore = new FloatWritable(score);
context.write(outName, outScore);
}
}
}

/**
* reduce():将同一个学的各科成绩加起来,求平均数
*/
public static class AverageReduce extends Reducer<Text, FloatWritable, Text, FloatWritable>{

public AverageReduce(){

}

protected void reduce(Text key, Iterable<FloatWritable> value, Context context) throws IOException, InterruptedException {

float sum = 0;//刚开始总分为0
int count = 0;//记录有几科成绩

Iterator<FloatWritable> it = value.iterator();//遍历成绩
//获取各科成绩进行累加
while(it.hasNext()){
sum += it.next().get();
count++;
}
//求出平均值
FloatWritable averageScore = new FloatWritable(sum/count);
//写人文件
context.write(key,averageScore);

}
}
}

(3)编程实例-数据去重:

功能:数据重复,map中每一行做为一个key,value值任意,经过shuffle之后输入到reduce中利用key的唯一性直接输出key。

数据:

file1.txt

2016-6-1 b
2016-6-2 a
2016-6-3 b
2016-6-4 d
2016-6-5 a
2016-6-6 c
2016-6-7 d
2016-6-3 c

file2.txt

2016-6-1 a
2016-6-2 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-7 c
2016-6-3 c

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.io.IOException;  
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;

/**
* 数据去重
*/
public class Dedup {

public static class MyMapper extends Mapper<Object, Text, Text, Text>{

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//value:为每行数据
context.write(value, new Text(""));
}
}

public static class MyReducer extends Reducer<Text, Text, Text, Text>{

@Override
protected void reduce(Text key, Iterable<Text> value,
Context context)
throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}


public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
//String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
String[] otherArgs = new String[]{"/input2","/output2"};
if(otherArgs.length<2){
System.out.println("parameter errors!");
System.exit(2);
}

Job job = Job.getInstance(conf, "Dedup");
job.setJarByClass(Dedup.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?0:1);

}
}

程序运行后输入文件为:

2016-6-1 a
2016-6-1 b
2016-6-2 a
2016-6-2 b
2016-6-3 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-6 c
2016-6-7 c
2016-6-7 d


以上内容为听华为大数据培训课程和大学MOOC上厦门大学 林子雨的《大数据技术原理与应用》课程而整理的笔记。

大数据技术原理与应用https://www.icourse163.org/course/XMU-1002335004


坚持原创技术分享,您的支持将鼓励我继续创作!