《分布式与云计算》综合实验 ——Hadoop实现mapreduce

1. 实验环境调研

虚拟机:VMware® Workstation 15 Pro

虚拟机系统:ubuntu 14.04 LTS

Ubuntu是装在虚拟机上的,虚拟机寄存在不同的主机上,寄存主机在同一个内网里。我的分布式结构如下,一个master和4个slave节点:

192.168.43.131 master
192.168.43.246 slave1
192.168.43.151 slave2
192.168.43.160 slave3

IP地址是根据虚拟机分配的,也可以设置静态IP。

虚拟机上安装java步骤如下:

linux装java

cd /usr
mkdir java
cp jdk-8u211-linux-x64.tar.gz /usr/java
tar -zxvf jdk-8u211-linux-x64.tar.gz

配置环境变量

vim /etc/profile
# 按a编辑,在最后一行添加如下:
JAVA_HOME=/usr/java/jdk1.8.0_211
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
# 然后按esc
# 按大写的ZZ保存退出
# 重启虚拟机可以测试java

2. Hadoop环境搭建

2.1 节点配置

1.修改 hosts文件

sudo su root
gedit /etc/hosts

修改为:

192.168.43.131    master
192.168.43.246    slave1
192.168.43.151    slave2
192.168.43.160    slave3

这些 IP 地址是通过查看本机得来的,使用 ifconfig

2.修改hostname

namenode 上修改为master ,datanode上分别修改为slave1,slave2,slave3。

gedit /etc/hostname

3.用节点名称ping通,证明成功

2.2 安装JDK

在 oracle官网上下载 Linux 版本的JDK,我用的是 jdk1.8.0_211版本

usr目录下创建 java的文件夹,并解压

cd /usr
mkdir java
cp jdk-8u211-linux-x64.tar.gz /usr/java
tar -zxvf jdk-8u211-linux-x64.tar.gz

JDK安装后要添加环境变量

vim /etc/profile
#按a编辑,在最后一行添加如下:
JAVA_HOME=/usr/java/jdk1.8.0_211
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
#然后按esc
#按大写的ZZ保存退出

重启虚拟机,确定 java版本是否安装成功

Java -version

2.3 SSH无密码验证配置

Hadoop必须通过SSH 与本地计算机及其他主机链接,所以必须设置SSH。

1.首先安装 SSH

sudo apt-get install ssh

2.生成公钥和私钥(不管提示一直按enter)

ssh-keygen -t  rsa

3.将公钥添加到.ssh/authorized_keys,这时就能在本机免密码登陆了

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

master 和 salve 都要执行以上三步,slave还要执行下面

4.将master上的 id_rsa.pub拷贝到 slave上(我放在Desktop),然后添加到 salve的 .ssh/authorized_keys

cat /home/slave1/Desktop/id_rsa.pub >> ~/.ssh/authorized_keys

5.重启SSH服务

service ssh restart

此时,我们的namenode可以无密码访问datanode了,回到master上

ssh master

此外,要把防火墙关掉

ufw disable

2.4 下载安装Hadoop

我在网上找的hadoop-0.20.2 版本,有点旧,但是不影响做实验。

1.在/opt目录下建立hadoop目录,把hadoop-0.20.2.tar.gz拷贝到/opt/hadoop目录下,然后解压:

mkdir /opt/hadoop
cp hadoop-0.20.2.tar.gz /opt/hadoop
cd /opt/hadoop
tar –zxvf hadoop-0.20.2.tar.gz
cd hadoop-0.20.2

2.配置Hadoop

在Hadoop目录建立 tmphdfs文件夹

mkdir tmp
mkdir hdfs

编辑conf/hadoop-env.sh文件,把JAVA_HOME设置成Java安装根路径,如下:export JAVA_HOME=/usr/java/jdk1.8.0_211

记住把export前面的#去掉

gedit conf/hadoop-env.sh

配置core-site.xml

gedit /conf/core-site.xml
#修改内容如下:
<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop/tmp</value>
        <description>A base for other temporary directories.</description>
    </property>
    <!-- file system properties -->
    <property>
        <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
</configuration>

配置hdfs-site.xml

gedit conf/hdfs-site.xml
#修改内容如下:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.name.dir</name>
        <value>/opt/hadoop/hdfs/name</value>
    </property>
    <property>
        <name>dfs.data.dir</name>
        <value>/opt/hadoop/hdfs/data</value>
    </property>
</configuration>

配置mapred-site.xml

gedit conf/mapred-site.xml
#修改内容如下:
<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>master:9001</value>
    </property>
</configuration>

配置conf/mastersconf/slaves来设置主从结点,注意最好使用主机名,并且保证VM之间可以通过主机名可以互相访问,每个主机名一行

vi masters
master
vi slaves
slave1
slave2
slave3

3.运行Hadoop

格式化Hadoop(每次之前要删除 hdfs里的name和data文件夹 才能格式化)

./bin/hadoop namenode -format

启动Hadoop

./bin/start-all.sh

网页上查看

http://master:50070

1555809923169
1555809983078

3. MapReduce原理

3.1 MapReduce 定义

​ Hadoop 中的 MapReduce 是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集。

​ Hadoop MapReduce 源于 Google 在2004年12月份发表的 MapReduce 论文。 Hadoop MapReduce 其实就是 Google MapReduce 的一个克隆版本。

3.2 MapReduce 特点

1.MapReduce 易于编程 。它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。

2.良好的 扩展性 。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3.高容错性 。MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是Hadoop 内部完成的。

4.适合 PB 级以上海量数据的 离线处理 。这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。

3.3 原理解析

基于我们做的词频统计实验。

MapReduce处理流程:

第一步:假设一个文件有三行英文单词作为 MapReduce的Input(输入),这里经过 Splitting 过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理。

第二步:每个 map 线程中,以每个单词为key,以1作为词频数value,然后输出。

第三步:每个 map 的输出要经过 shuffling(混洗),将相同的单词key放在一个桶里面,然后交给 reduce 处理。

第四步:reduce 接受到 shuffling 后的数据,会将相同的单词进行合并,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果。

前两步可以看做 map 阶段,后两步可以看做 reduce 阶段。

1.map阶段
Map 阶段是由一定数量的 Map Task 组成。这些 Map Task 可以同时运行,每个 Map Task又是由以下三个部分组成。

1)对输入数据格式进行解析的一个组件:InputFormat
因为不同的数据可能存储的数据格式不一样,这就需要有一个InputFormat 组件来解析这些数据的存放格式。默认情况下,它提供了一个 TextInputFormat 来解释数据格式.TextInputFormat 就是我们前面提到的文本文件输入格式,它会将文件的每一行解释成(key,value),key代表每行偏移量,value代表每行数据内容。通常情况我们不需要自定义 InputFormat,因为 MapReduce 提供了很多种InputFormat的实现,我们根据不同的数据格式,选择不同的 InputFormat 来解释就可以了.

2)输入数据处理: Mapper

3)数据分组: Partitioner
Mapper 数据处理之后输出之前,输出key会经过 Partitioner分组或者分桶选择不同的reduce。默认的情况下,Partitioner 会对 map 输出的key进行hash取模,比如有6个Reduce Task,它就是模(mod)6,如果key的hash值为0,就选择第0个 Reduce Task,如果key的hash值为1,就选择第一个 Reduce Task。这样不同的 map 对相同单词key,它的 hash 值取模是一样的,所以会交给同一个 reduce 来处理。

2.reduce阶段

Reduce 阶段由一定数量的 Reduce Task 组成。这些 Reduce Task 可以同时运行,每个Reduce Task又是由以下四个部分组成。

1)数据运程拷贝。Reduce Task 要运程拷贝每个 map 处理的结果,从每个 map 中读取一部分结果。每个 Reduce Task 拷贝哪些数据,是由上面 Partitioner 决定的。

2)数据按照key排序。Reduce Task 读取完数据后,要按照key进行排序。按照key排序后,相同的key被分到一组,交给同一个 Reduce Task处理。

3)数据处理: Reducer 。以WordCount为例,相同的单词key分到一组,交个同一个Reducer处理,这样就实现了对每个单词的词频统计。

4)数据输出格式: OutputFormat 。Reducer 统计的结果,将按照 OutputFormat 格式输出。默认情况下的输出格式为 TextOutputFormat,以WordCount为例,这里的key为单词,value为词频数。

InputFormat、Mapper、Partitioner、Reducer和OutputFormat 都是用户可以实现的。通常情况下,用户只需要实现 Mapper和Reducer,其他的使用默认实现就可以了。

3.4 MapReduce内部逻辑

1.首先将 HDFS 中的数据以 Split 方式作为 MapReduce 的输入。前面我们提到,HDFS中的数据是以 block存储,这里怎么又变成了以Split 作为输入呢?其实 block 是 HDFS 中的术语,Split 是 MapReduce 中的术语。默认的情况下,一个 Split 可以对应一个 block,当然也可以对应多个block,它们之间的对应关系是由 InputFormat 决定的。默认情况下,使用的TextInputFormat,这时一个Split对应一个block。假设这里有4个block,也就是4个Split,分别为Split0、Split1、Split2和Split3。这时通过 InputFormat 来读每个Split里面的数据,它会把数据解析成一个个的(key,value),然后交给已经编写好的Mapper 函数来处理。

2.每个Mapper 将输入(key,value)数据解析成一个个的单词和词频,比如(a,1)、(b,1)和(c,1)等等。

3.Mapper解析出的数据,比如(a,1),经过 Partitioner之后,会知道该选择哪个Reducer来处理。每个 map 阶段后,数据会输出到本地磁盘上。

4.在reduce阶段,每个reduce要进行 shuffle 读取它所对应的数据。当所有数据读取完之后,要经过Sort全排序,排序之后再交给 Reducer 做统计处理。比如,第一个Reducer读取了两个的(a,1)键值对数据,然后进行统计得出结果(a,2)。

5.将 Reducer 的处理结果,以OutputFormat数据格式输出到 HDFS 的各个文件路径下。这里的OutputFormat默认为TextOutputFormat,key为单词,value为词频数,key和value之间的分割符为"tab"。(a 2)输出到Part-0,(b 3)输出到Part-1,(c 3) 输出到Part-2。

4. 词频统计实验

4.1 WordCount.java 分析

1555789397331

4.2 编辑WordCount.java

创建 wordcount 测试目录

mkdir —p ~/wordcount/input

切换到 wordcount 测试目录

cd ~/wordcount

编辑 WordCount 程序代码,网上有。

sudo gedit WordCount.java

4.3 WordCount.java 代码分析

1.首先要Import 相关的Lib链接库,并且创建WordCount类class

package wordcount;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class wordcount

2.创建main function。

main function是程序的起点,在main function中设置map 类和reduce类。

public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();   //实例化Configuration
/***********
GenericOptionsParser是hadoop框架中解析命令行参数的基本类。 getRemainingArgs();返回数组【一组路径】
*********/
/**********
函数实现
public String[] getRemainingArgs() {
    return (commandLine == null) ? new String[]{} : commandLine.getArgs();
  }

/********
//总结上面:返回数组【一组路径】
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

//如果只有一个路径,则输出需要有输入路径和输出路径
if (otherArgs.length < 2) {
   System.err.println("Usage: wordcount <in> [<in>...] <out>");
   System.exit(2);
}

Job job = Job.getInstance(conf, "word count");   //实例化job
job.setJarByClass(wordcount.class);   //为了能够找到wordcount这个类
job.setMapperClass(TokenizerMapper.class);   //指定map类型
/********
指定CombinerClass类
这里很多人对CombinerClass不理解
************/
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);  //指定reduce类
job.setOutputKeyClass(Text.class); //rduce输出Key的类型,是Text
job.setOutputValueClass(IntWritable.class);  // rduce输出Value的类型

for (int i = 0; i < otherArgs.length - 1; ++i) 
   FileInputFormat.addInputPath(job, new Path(otherArgs));  //添加输入路径

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));   //添加输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);  //提交job
}

3.创建 TokenizerMapper 类

 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{   //继承泛型类Mapper
                private final static IntWritable one = new IntWritable(1);  //定义hadoop数据类型IntWritable实例one,并且赋值为1
                private Text word = new Text();                                    //定义hadoop数据类型Text实例word
  
                public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //实现map函数 
                        StringTokenizer itr = new StringTokenizer(value.toString());//Java的字符串分解类,默认分隔符“空格”、“制表符(‘\t’)”、“换行符(‘\n’)”、“回车符(‘\r’)”

                        while (itr.hasMoreTokens()) {  //循环条件表示返回是否还有分隔符。
                                word.set(itr.nextToken());   // nextToken():返回从当前位置到下一个分隔符的字符串,word.set():Java数据类型与hadoop数据类型转换
                                context.write(word, one);   //hadoop全局类context输出函数write;
                        }
         }

}

4.创建 IntSumReducer 类

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {    //继承泛型类Reducer
         private IntWritable result = new IntWritable();   //实例化IntWritable
         public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {  //实现reduce
                    int sum = 0;
                    for (IntWritable val : values)    //循环values,并记录单词个数
                               sum += val.get();
                    result.set(sum);   //Java数据类型sum,转换为hadoop数据类型result
                    context.write(key, result);   //输出结果到hdfs
          }
}

4.4 编译WordCount.java

编译WordCount程序

hadoop com.sun.tools.javac.Main WordCount.java

WordCount 类打包成 wc.jar

jar cf wc.jar WordCount*.class 打包成jar

4.5 创建测试文本文件

切换到输入目录

cp /opt/hadoop/hadoop-0.20.2/LICENSE.txt ~/wordcount/input

启动Hadoop

Start-all.sh

在HDFS创建目录

hadoop fs -mkdir -p /user/hduser/wordcount/input

切换到本地数据文件目录

cd ~/wordcount/input

上传文件到HDFS

hadoop fs -copyFromLocal LICENSE.txt /user/hduser/wordcount/input

4.6 运行WordCount.java

cd ~/wordcount
hadoop jar wc.jar WordCount /user/hduser/wordcount/input/LICENSE.txt /user/hduser/wordcount/output

1555809815162

4.7 查看运行结果

查看HDFS的目录

hadoop fs -ls /user/hduser/wordcount/output

查看输出文件

hadoop fs -cat /user/hduser/wordcount/output/part-r-00000|more

1555809761348

本文作者:Author:     文章标题:《分布式与云计算》综合实验 ——Hadoop实现mapreduce
本文地址:https://www.ningcaichen.top/archives/4.html     
版权说明:若无注明,本文皆为“宁采晨's Blog”原创,转载请保留文章出处。
Last modification:2019 年 11 月 12 日 23 : 38 PM
如果觉得我的文章对你有用,请随意赞赏

Leave a Comment

召唤看板娘