Hadoop 1. Hadoop概述 2. 单机安装 3. 集群部署 4. HDFS文件系统(分布式存储) 5. 实战案例-网盘的实现 6. MapReduce(分布式计算) 7. 日志清洗-案例 8. HA集群部署Hadoop 9. 附录A 配置文件参数
Hadoop快速入门
2018/09/04
目录
1. Hadoop概述 4
1.1. Hadoop是什么 4
1.2. 分布式必要性 4
1.2.1. 分布式存储的必要性 4
1.2.2. 分布式计算必要性 5
1.3. Hadoop包括哪些组件 6
1.3.1. 说明 6
1.3.2. 结构图 6
2. 单机安装 6
2.1. 安装准备 6
2.2. Hadoop文件加载流程图 7
2.3. 安装步骤 7
2.3.1. 第一步:修改主机名 7
2.3.2. 第二步:创建一个hadoop系统用户 8
2.3.3. 第三步:使用Hadoop登录,上传相关安装软件 10
2.3.4. 第四步:安装JDK 11
2.3.5. 第五步:准备Hadoop文件 12
2.3.6. 第六步:修改配置文件 13
2.3.7. 第七步:将Hadoop的命令加到环境变量 16
2.3.8. 第八步:格式化HDFS文件系统 17
2.3.9. 第九步,启动HDFS文件系统 17
2.3.10. 第十步:启动YARN调度程序 18
2.4. HDFS工作原理 21
3. 集群部署 22
3.1. 集群部署是什么 22
3.2. 集群部署结构图 22
3.3. 部署准备 24
3.4. Linux虚拟机的准备 24
3.5. 免密登录的实现 24
3.5.1. 免密登录是什么 24
3.5.2. Linux免密登录原理图 25
3.5.3. 操作步骤 25
3.6. 集群部署步骤 29
3.6.1. 第一步:配置slaves配置文件 29
3.6.2. 第二步:修改所有服务器的hdfs-site.xml配置 29
3.6.3. 第三步:启动测试 29
3.7. 工作原理 30
3.7.1. 上传工作原理 30
3.7.2. 下载工作原理 31
4. HDFS文件系统(分布式存储) 31
4.1. HDFS是什么 31
4.2. HDFS常用操作命令 31
4.3. Java API操作HDFS 32
4.3.1. 说明 32
4.3.2. 配置步骤 32
4.3.3. 实现上传、下载、删除、查看 35
5. 实战案例-网盘的实现 37
5.1. 需求 37
5.2. 思路 37
5.3. 准备 37
5.4. 实现步骤 38
5.4.1. 第一部分:编写公共代码 38
5.4.2. 第二部分:用户功能实现 44
5.4.3. 第三部分:网盘功能 56
6. MapReduce(分布式计算) 57
6.1. MapReduce是什么 57
6.2. MapReduce运行流程图 57
6.3. 入门示例 57
6.3.1. 说明 57
6.3.2. 配置步骤 58
6.4. MapRedurce 运行原理图 66
6.5. 本地模式 67
6.5.1. 第一步:配置Hadoop的环境变量 67
6.5.2. 第二步:需要winutils.exe支持 69
6.5.3. 第三步:将demo.txt放在本地目录 70
6.5.4. 第四步:修改WcRunner 70
6.5.5. 第五步:直接运行WcRunner类 72
6.5.6. 第六步:查看测试结果 72
7. Zookeeper 72
8. HA集群部署Hadoop 72
8.1. HA集群是什么 72
8.2. 高可用是什么 73
8.3. 为什么需要使用HA集群 73
8.4. 部署步骤 73
8.4.1. 需求说明 73
8.4.2. 部署思路 74
8.4.3. 第一部分:HA机制的集群配置 74
8.4.4. 第二部分:配置Zookeeper集群 75
8.4.5. 第三步部分:配置NameNode的HA集群 75
8.4.6. 第四部分:配置ResourceManger的HA集群 78
9. 附录A 配置文件参数 78
9.1. core-default.xml 78
9.2. hdfs-default.xml 81
9.3. mapred-default.xml 83
9.4. yarn-default.xml 89
9.5. 新旧属性对应表 90
分布式系统概述
1.1 分布式系统的必要性
我们学习了java.我会OA系统开发,我会crm系统开发.
业务系统,我们的开发时满足了业务需求!!!
老板问 : 你能不能写一个,1000万并发的业务系统呢?
1.2 分布式系统结构图
- Java语言基础
- JavaWeb基础
- 数据库技术(MySQL、Oracle)
- 主框架,(SSH-Struts2+Spring+Hibernate),(SSM=SpringMVC+Spring+Mybatis)(2个月)
- XML语言(DTD、Schema)
- Resultful 、ajax、JQuery、RPC、JSON、(WebServices,已过时)
- 应用集群(Zookeeper、Dobble、SpringCloud)
- 全文搜索框架、Lucene、Solr、elasticSearch、IK分词器
- 使用Redis缓存数据库
- 消息推送服务器ActiveMQ
- Linux技术
--大数据需要的技术(2.5月)
Hadoop开发生态
HDFS
MapReduce
Hive
Spark
flume
sqoop
hbase
pig
....
1.1. Hadoop是什么
The Apache™ Hadoop® project develops open-source software for reliable, scalable, distributed computing. |
根据官方的定义Hadoop是一个开源的分布式计算开发软件。
所谓的分布式计算,简单而言就是可以实现将数据逻辑的处理交予一个服务器集群处理。
- 分布式计算
- 分布式存储
1.2. 分布式必要性
在每天T级别的的数据系统,没有分布式的数据计算以及存储系统。想一下都恐怖。
Hadoop的出现,让本来需要投资过亿才可以实现的并发性系统,使用极低的成本就可以实现。
1.2.1. 分布式存储的必要性
--传统模式下载逻辑的实现
|
--分布式模式下载逻辑的实现
--分布式文件系统,让一个文件系统分布式放在不同的服务器里面。下载的时候可以在不同的服务器下载对应的数据块合并成一个文件。
这样可以提高下载上传的效率。其副本机制硬件损坏导致的数据丢失。
|
1.2.2. 分布式计算必要性
需求:现在有1000个文本文件,每个大小为2G,使用程序读取,并且计算输出计算的结果。
传统模式下,只能在一台电脑上面计算,效率极低。
|
在分布式系统下,系统会将计算分散在一个服务器集群里面执行需要的逻辑处理。极大地提交了程序的执行效率。
|
1.3. Hadoop包括哪些组件
1.3.1. 说明
Hadoop核心主要包括分布式存储以及分布式计算。分布式存储使用默认使用HDFS分布式文件系统,分布式计算使用MapReduce与YARN模块。
- HDFS,一个内置的分布式的文件系统
- MapReduce,分布式计算数据分析计算模块
- YARN,资源管理调度模块。MapReduce依赖该模块。
1.3.2. 结构图
|
2. 单机安装
所谓的单机安装,就是在单台服务器中安装Hadoop软件系统。
2.1. 安装准备
- Jdk1.8安装包
- linux虚拟机
- hadoop安装包,下载路径http://hadoop.apache.org/releases.html
2.2. Hadoop文件加载流程图
|
根据运行文件加载流程图,得出Hadoop需要配置的文件包括:
- hadoop-env.sh 文件,修改JDK环境变量
- core-site.xml 配置文件,修改hadoop的数据目录,以及指定使用的文件系统
- hdfs-site.xml配置文件,修改hdfs文件系统的副本个数
- mapred-site.xml配置文件,配置分析框架的参数,必须指定MapReduce的调度程序的类型
- yarn-site.xml配置文件,指定yarn调度框架Master(ResourceManager),指定调度策略
- slaves文件,数据节点(DataNode)加载文件
2.3. 安装步骤
2.3.1. 第一步:修改主机名
为了方便管理Hadoop主机,我们需要给不同的Hadoop主机一个唯一的主机名。所以在安装Hadoop之前,需要给主机修改一个主机名。
--修改主机名
使用命令:sudo vi /etc/sysconfig/network
|
--修改后,让主机名生效
使用命令:sudo hostname u4
退出,重新登录
|
--让主机名替代IP,作为访问的地址
编辑/etc/hosts文件的映射,使用命令:sudo vi /etc/hosts
|
测试是否生效,如果修改后使用 ping u4,返回ping通信息,说明成功
|
2.3.2. 第二步:创建一个hadoop系统用户
--这样方便使用hadoop用户管理hadoop
--先创建一个hadoop组
使用命令:groupadd hadoop
|
--在创建一个 hadoop用户,将用户的所属组设置为hadoop组
|
--授予hadoop用户有sudoer权限
通过输入visudo 命令,打开sudoers文件
增加 :hadoop ALL=(ALL) ALL命令,保存
|
cat /etc/sudoers文件,查看是否成功
|
问题:为什么需要给hadoop给予sudoers的权限呢?
答:是因为,如果不给hadoop用户sudoers的权限,即使使用sudo命令,也有一些系统功能是无法使用的。给了sudoers权限,hadoop可以通过sudo获得所有的系统访问权限!!
2.3.3. 第三步:使用Hadoop登录,上传相关安装软件
--修改WinSCP的root用户修改为新创建的hadoop用户
|
--上传安装包
|
2.3.4. 第四步:安装JDK
--创建一个app文件夹,将相关的软件统一安装到里面
|
--解压jdk安装包到app目录
使用命令:
tar -zxvf jdk-8u181-linux-x64.tar.gz
|
--配置全局JAVA_HOME以及PATH环境变量
编辑/etc/profile,使用命令sudo vi /etc/profile
注意:hadoop是普通用户,所以修改非属于它文件或者目录需要使用sudo开头获得管理权限修改
在profile文件最后追加
|
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_181 export PATH=$PATH:$JAVA_HOME/bin |
--通过source /etc/profile让配置生效,并测试是否配置成功
|
2.3.5. 第五步:准备Hadoop文件
--解压Hadoop安装包到app文件夹,并且修改目录名hadoop
使用命令:tar -zxvf hadoop-2.9.1.tar.gz -C app/
|
进入app将hadoop-2.9.1 目录名修改为hadoop
|
--hadoop目录简单说明
|
bin:应用程序,主要用于hadoop操作,常用的就是hadoop命令
etc:hadoop配置文件
include:本地实现库的头文件,说明是使用C语言或者C++编写的
lib:本地实现库
sbin:也是应用程序,主要用于启动,关闭hadoop
share:Java实现类库
2.3.6. 第六步:修改配置文件
--配置文件说明
修改/home/hadoop/app/hadoop/etc/hadoop的
- hadoop-env.sh 文件,修改JDK环境变量
- core-site.xml 配置文件,修改hadoop的数据目录,以及指定使用的文件系统
- hdfs-site.xml配置文件,修改hdfs文件系统的副本个数
- mapred-site.xml配置文件,配置分析框架的参数,必须指定MapReduce的调度程序的类型
- yarn-site.xml配置文件,指定yarn调度框架Master(ResourceManager),指定调度策略
- slaves文件,将localhost修改为主机名u4
问题:配置的属性可以在哪里找到呢?
学习任何框架,必须要知道一个知识点,配置的属性都可以在代码里面找到。经分析,
配置文件的解释类一般都是,Configuration以及缩写,Builder的单词以及缩写
可以搜索hadoop源码包 “*-default.xml”,可以找到所有配置文件分别可以配置的属性有哪些
|
--hadoop-env.sh文件
指定Hadoop的JAVA_HOME环境变量
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_181
|
--修改core-site.xml配置文件
core-site.xml是hadoop核心配置文件
<configuration> <!-- 必须指定使用的文件系统 --> <property> <name>fs.defaultFS</name> <!--注意:最后的斜杠必须加上--> <value>hdfs://u4:9000/</value> </property> <!--必须要指定hadoop数据文件的位置--> <property> <name>hadoop.tmp.dir</name> <!--注意:data文件必须事先创建好--> <value>/home/hadoop/app/hadoop/data</value> </property> </configuration> |
--修改hdfs-site.xml配置文件
hdfs-site.xml是HDFS文件系统的配置文件,用于配置HDFS配置参数
因为每个datanode上只能存放一个副本,现在是单机配置,数据节点只有一个,所以副本的个数只需要1就可以了
<configuration> <!--单机配置,数据副本只配置一个就可以--> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration> |
只要配置以上两个配置文件,hadoop就可以启动了。但是无法使用MapReduce程序进行数据分析。所以建议还是将mapred-site.xml配置文件和yarn-site.xml。
--修改mapred-site.xml配置
mapred-site.xml配置文件用于MapReduce数据分析框架的参数,必须要指定使用哪个调度框架。
复制mapred-site.xml.template修改文件名为mapred-site.xml
使用命令: cp -v mapred-site.xml.template mapred-site.xml
|
编辑mapred-site.xml配置文件
<configuration> <!--指定调度使用的是hadoop自带的yarn--> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration> |
--修改yarn-site.xml配置文件
yarn-site.xml配置文件用于设置YARN调度框架的参数,必须要要指定ResourceManager的节点。
必须要指定节点的选举规则
<configuration> <!-- Site specific YARN configuration properties --> <property> <!--指定ResourceManger--> <name>yarn.resourcemanager.hostname</name> <value>u4</value> </property> <!--指定节点调度选举规则--> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
|
--修改slaves文件
将localhost修改为主机名u4
|
2.3.7. 第七步:将Hadoop的命令加到环境变量
为了可以在任何目录的当前路径可以直接访问hadoop的bin以及sbin的命令,需要将这两个目录路径加入到PATH环境变量
--编辑/etc/profile文件
使用命令 sudo vi /etc/profile,修改为
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_181 export HADOOP_HOME=/home/hadoop/app/hadoop export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin |
--让环境变量生效
|
2.3.8. 第八步:格式化HDFS文件系统
--使用命令: hadoop namenode -format
--如果不报异常,表示成功。
|
成功后,会在hadoop配置为data数据目录产生数据文件
|
2.3.9. 第九步,启动HDFS文件系统
--首先启动hdfs文件系统
使用start-dfs.sh
|
--测试HDFS文件系统是否成功
上传文件到HDFS文件系统
使用命令:hadoop fs -put jdk-8u181-linux-x64.tar.gz hdfs://u4:9000/
注意:上传端口默认为9000,路径的最后的斜杠不要写漏
|
--使用web客户端查看
注意:web端口默认为50070
|
2.3.10. 第十步:启动YARN调度程序
--启动yarn调度程序
启动yarn调度程序,才可以执行MapReduce分析程序
使用start-yarn.sh命令启动
工作原理
|
--测试是否成功
需求:通过hadoop自带的示例程序,测试分析文本中,单词的出现次数。
|
先创建一个文本,起名为demo.txt。随便写入文字
|
在hdfs文件系统一个wordcount文件夹(文件夹随便你写)
hadoop fs -mkdir hdfs://u4:9000/wordcount/ |
|
再在wordcount文件夹里面创建一个 input文件夹(文件夹名也是随便你写)
hadoop fs -mkdir hdfs://u4:9000/wordcount/input |
|
执行测试MapReduce测试示例程序
hadoop jar hadoop-mapreduce-examples-2.9.1.jar wordcount /wordcount/input /wordcount/output
使用格式:hadoop jar <mapreduce程序> <入口类名> [参数列表]
mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.9.1.jar wordcount /wordcount/input /wordcount/output |
|
下载分析数据文件,查看
|
2.4. HDFS工作原理
HDFS主要分为:NameNode、SecondaryNameNode以及DataNode三个部分。
1.NameNode:名字子节点。用于统一接收客户端的请求,让客户端获得操作对应的数据节点。
2.SecondaryNameNode:日志备份节点,是HDFS的数据操作的备份机制。通过它来确保数据的有效性
3.DataNode:数据节点,用于存储数据
操作流程,如图所示:
|
3. 集群部署
3.1. 集群部署是什么
所谓的集群部署就是让Hadoop系统软件在多台机器同时运行,并且协同分布式处理数据。
3.2. 集群部署结构图
- 集群后,一台NameNode可以调动多台DataNode。
|
2.集群后,一台ResourceManager可以调动多台NodeManager
|
- 上传的文件可以分布式地分块放在不同的Linux系统的HDFS文件系统上
- 下载时可以在不同的服务器获得文件块。合成下载的文件。
3.3. 部署准备
- 准备三台Linux虚拟机
- 配置linux系统免密登录
3.4. Linux虚拟机的准备
3.5. 免密登录的实现
上面的安装,发现每次启动Hadoop或者关闭都要手工输入密码登录。但是如果集群部署,同时要启动所有的NameNode以及DataNode。那么这样操作非常麻烦。
如果要实现同时一起启动所有的hadoop的NameNode以及所有DataNode。需要配置免密登录。
3.5.1. 免密登录是什么
所谓远程登录Linux系统不需要任何密码。
首先第一步,我们要知道远程登录Linux使用的ssh协议。
SSH 是 Secure Shell的简写。就是一种安全的通信的协议。
Linux的远程登录就是使用该协议的。Linux实现的SSH的远程登录软件支持用户密码登录,也支持密钥登录。
免密登录,就是通过密钥登录。
3.5.2. Linux免密登录原理图
准备知识:Linux的远程登录使用SSH协议基于非对称加密算法(RSA)。
必须要指定,所谓的非对称算法,就是加密的密钥和解密的密钥分别是一个密钥对。
- 公钥加密,必须使用私钥解密
- 私钥加密,必须使用公钥解密
|
3.5.3. 操作步骤
3.5.3.1. 第一步:使用ssh_keysgen创建一个密钥对
使用hadoop用户登录,然后使用ssk-keygen命令创建一个密钥对。
|
在hadoop的个人文件夹下。使用ll -a查看是否已经创建了.ssh文件夹
问题:为什么需要使用-a参数呢?
答:因为.开头的文件夹是隐藏文件夹,必须使用-a才可以显示。 -a:表示显示所有类型的文件已经文件夹包括隐藏的
注意:文件默认的权限是 700的,千万不要修改。免密登录要求,.ssh文件必须为700权限。
|
3.5.3.2. 第二步:进入.ssh文件夹,创建authorized_keys
进入.ssh文件夹,创建authorized_keys文件,将公钥复制到authorized_keys
通过命令:cat id_rsa.pub >> authorized_keys
|
--查看是否成功
通过命令:cat authorized_keys命名查看是否成功
|
3.5.3.3. 第三步:修改authorized_keys文件权限
Linux系统规定authorized_keys必须是600权限,免密登录才生效。
通过命令: chmod 600 authorized_keys
|
3.5.3.4. 第四步:压缩.ssh文件夹
回到上一层,使用命令 tar -czvf ssh.tar.gz .ssh/压缩。
|
3.5.3.5. 第五步:通过scp命名上传到其他Linux系统
上传到远程系统的指定路径,通过 scp ssh.tar.gz 远程主机用户@主机名: 远程路径
如:上传到u2服务器,使用命令:scp ssh.tar.gz hadoop@u2:/home/hadoop/ssh.tar.gz
|
3.5.3.6. 第六步:登录远程主机
通过ssh命令登录远程主机。
执行命令为:ssh u2,根据提示输入密码
|
--登录成功,当前主机名会修改为u2
|
3.5.3.7. 第七步:登录后,解压ssh文件到当前文件夹
|
--查看
|
3.5.3.8. 第八步:回到u1主机测试
通过exit命名,退回到u1服务器
|
再使用ssh u2命令登录,确认是否不用输入密码了。如果是,表示配置成功
|
注意:重复这个步骤,所有需要免密登录的机器就实现所有机器的免密登录了
3.6. 集群部署步骤
3.6.1. 第一步:配置slaves配置文件
配置完成后,将用于NameNode和ResourceManger的服务器的slaves增加数据节点的配置就可以
编辑u1服务器的hadoop的slaves配置文件
--进入到slaves目录
|
--编辑 vi slaves,内容如下:
u1 u2 u3 |
3.6.2. 第二步:修改所有服务器的hdfs-site.xml配置
--文件的副本数据,必须大于1,必须小于等于数据节点的个数。
|
3.6.3. 第三步:启动测试
回到u1服务器启动hadoop测试。
--启动hdfs
|
--启动yarn
|
3.7. 工作原理
3.7.1. 上传工作原理
|
3.7.2. 下载工作原理
|
4. HDFS文件系统(分布式存储)
4.1. HDFS是什么
HDFS全称:Hadoop Distributed File System,就是Hadoop分布式文件系统。
实现了将数据分布在多台的Linux系统上。从而提高了文件的存取效率。通过副本机制,降低数据丢失的概率。
4.2. HDFS常用操作命令
hadoop fs -put :上传
hadoop fs -get :下载
hadoop fs -mkdir : 创建文件夹
hadoop fs -rm : 删除文件或者文件夹
hadoop fs -ls :查看文件目录
hadoop fs -chown :修改文件或者文件夹的所属者
hadoop fs -cp :复制
hadoop fs -chmod 授权
hadoop fs -du : 查看文件夹大小
hadoop fs -mv :移动文件夹或者目录
hadoop fs -cat :查看文件内容
4.3. Java API操作HDFS
4.3.1. 说明
HDFS分布式文件系统提供了Java API操作的支持。
需求:使用Java API实现对HDFS的文件的上传。
4.3.2. 配置步骤
4.3.2.1. 第一步:搭建环境
--创建一个普通的Java的项目,导入必须的Jar包
将标红的的文件夹下的所有jar包复制项目
|
--项目结构
|
4.3.2.2. 第二步:复制Hadoop的配置文件到项目
连接HDFS分布式文件系统,需要使用到core-site.xml和hdfs-site.xml配置文件。
所以复制这两个配置文件到项目的classpath根本目录。
|
4.3.2.3. 第三步:创建HdfsUtils类
创建HdfsUtils获得HDFS操作对象FileSystem对象。
package org.ranger.utils; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; public class HdfsUtils { //获得配置建立连接,获得HDFS操作对象 public static FileSystem getFileSystem() { FileSystem fs=null; //1.创建配置类,读取配置文件 Configuration conf=new Configuration(); //2.创建HDFS操作对象FileSystem对象 try { //参数1:表示HDFS的操作路径 //参数2:设置的参数 //参数3:HDFS的操作用户,必须与HDFS的用户一一对应 fs=FileSystem.get(new URI("hdfs://u1:9000/"), conf, "hadoop"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } return fs; } public static void main(String[] args) { //如果不报错,输出信息,表示成功 System.out.println(HdfsUtils.getFileSystem()); } } |
4.3.2.4. 第四步:编写操作代码
package org.ranger.test; import java.io.FileInputStream; import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; import org.ranger.utils.HdfsUtils; public class HdfsUploadTest { /** * 需求:将本地文件上传到HDSF */ @Test public void fileupload() { //1.获得操作对象 FileSystem fs = HdfsUtils.getFileSystem(); try { //2.设置上传到指定的HDFS的目录,获得一个输出流 FSDataOutputStream output = fs.create(new Path("/wc/input/demo.txt")); //3.获得本地的文件流 FileInputStream input=new FileInputStream("C:/wc/input/demo.txt"); //4.将文件输入流复制到输出流中 IOUtils.copy(input, output); //5.关闭流 input.close(); output.close(); fs.close(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } |
4.3.2.5. 注意事项
上传之前,HDFS上面的目录结构是否正确,设置的用户是否和代码的设置的使用兼容
4.3.3. 实现上传、下载、删除、查看
实现上传、下载、删除、查看目录、查看文件,创建文件夹。
package org.ranger.test; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.URISyntaxException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Test; import org.ranger.utils.HdfsUtils; //注意:hdfs是不支持修改的,修改需要先下载修改再上传。 public class HdfsCurdTest1 { // 1.下载 @Test public void download() throws IOException { FileSystem fs = HdfsUtils.getFileSystem(); Path path = new Path("hdfs://u1:9000/hadoop-2.9.1.tar.gz"); // 将Hadoop的文件下载到本地 FSDataInputStream in = fs.open(path); FileOutputStream out = new FileOutputStream(new File("c://hadoop-2.9.1.tar.gz")); IOUtils.copy(in, out); } // 2.上传,使用copyFromLocalFile @Test public void upload() throws IOException, InterruptedException, URISyntaxException { FileSystem fs = HdfsUtils.getFileSystem(); fs.copyFromLocalFile(new Path("c://demo.txt"), new Path("/demo2.txt")); } //3.创建文件夹 @Test public void mkdir() throws IllegalArgumentException, IOException { FileSystem fs = HdfsUtils.getFileSystem(); fs.mkdirs(new Path("/test/aa")); } //4 .删除文件夹或者文件 @Test public void rm() throws IllegalArgumentException, IOException { FileSystem fs = HdfsUtils.getFileSystem(); fs.delete(new Path("/test"), true); } //5.查看文件目录 @Test public void dirList() throws IllegalArgumentException, IOException { FileSystem fs = HdfsUtils.getFileSystem(); FileStatus[] status = fs.listStatus(new Path("/")); for (FileStatus fileStatus : status) { System.out.println(fileStatus.getPath().getName()); } } //6.查看所有文件 @Test public void fileList() throws IllegalArgumentException, IOException { FileSystem fs = HdfsUtils.getFileSystem(); RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while (listFiles.hasNext()) { LocatedFileStatus status = listFiles.next(); System.out.println(status.getPath().getName()); } } } |
5. 实战案例-网盘的实现
5.1. 需求
实现一个类似百度网盘的的分布式网盘。能够实现对网盘上的文件增删查,可以对文件夹创建,上传,改名
5.2. 思路
- 使用Servlet+JavaBean+JDBC+MySQL技术实现用户登录、注册。
- 用户登录后,如果该用户在HDFS没有对应的目录,创建一个该用户的目录
- 使用HDFS的操作接口实现HDFS的文件操作
5.3. 准备
数据库SQL脚本
CREATE TABLE `tb_user` ( `user_id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '用户编号', `user_name` VARCHAR(50) NULL DEFAULT NULL COMMENT '用户名', `user_account` VARCHAR(50) NULL DEFAULT NULL COMMENT '用户登录账户', `user_password` VARCHAR(50) NULL DEFAULT NULL COMMENT '密码', `user_status` VARCHAR(50) NULL DEFAULT NULL COMMENT '状态,0可用,1禁用', PRIMARY KEY (`user_id`), UNIQUE INDEX `user_account` (`user_account`) ) ENGINE=InnoDB ; |
5.4. 实现步骤
5.4.1. 第一部分:编写公共代码
5.4.1.1. 第一步:创建一个动态网站项目
--创建一个动态网站项目,并且将三层架构的分包创建好,以及将需要的基础类的类名也写好
|
5.4.1.2. 第二步:编写pojo代码
--注意:属性和数据库对应表的字段一一对应
package org.ranger.pojo; public class User { private String userId;// BIGINT(20) '用户编号', private String userName;// VARCHAR(50) '用户名', private String userAccount;// VARCHAR(50) '用户登录账户', private String userPassword;// VARCHAR(50) '密码', private Integer userStatus;// VARCHAR(50) '状态,0可用,1禁用', public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getUserAccount() { return userAccount; } public void setUserAccount(String userAccount) { this.userAccount = userAccount; } public String getUserPassword() { return userPassword; } public void setUserPassword(String userPassword) { this.userPassword = userPassword; } public Integer getUserStatus() { return userStatus; } public void setUserStatus(Integer userStatus) { this.userStatus = userStatus; } } |
5.4.1.3. 第三步:编写DBUtils帮助类
编写DBUtils类,获得数据库连接。
注意:要导入mysql的驱动包。
--DBUtils类代码
package org.ranger.utils; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class DbUtils { //连接数据库四要素 private static final String DRIVER="org.gjt.mm.mysql.Driver"; private static final String URL="jdbc:mysql://localhost:3306/network_disk"; private static final String USER="root"; private static final String PASSWORD="123456"; public static Connection getConnection() { try { //1.加载驱动 Class.forName(DRIVER); //2.返回连接对象 return DriverManager.getConnection(URL, USER, PASSWORD); } catch (SQLException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } //3.如果报异常,返回空对象 return null; } public static void main(String[] args) { System.out.println(DbUtils.getConnection()); } } |
5.4.1.4. 第四步:编写DAO的实现
--DAO接口
package org.ranger.dao; import java.sql.SQLException; public interface DAO<T> { /** * * 插入记录 * * @param entity * @return * @throws SQLException */ int insert(T entity) throws SQLException; /** * 通过编号删除记录 * * @param id * @return * @throws SQLException */ int deleteById(Long id) throws SQLException; /** * 通过编号查询记录 * @param id * @return * @throws SQLException */ T findById(Long id) throws SQLException; } |
--实现部分,UserDAO
package org.ranger.dao; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.ranger.pojo.User; import org.ranger.utils.DbUtils; public class UserDAO implements DAO<User>{ @Override public int insert(User entity) throws SQLException { //1.获得数据库连接 Connection conn = DbUtils.getConnection(); String sql ="INSERT INTO tb_user(user_name, user_account, user_password, user_status) VALUES (?, ?, ?, ?)"; //2.获得操作对象 PreparedStatement ps = conn.prepareStatement(sql); //3.设置参数 ps.setString(1, entity.getUserName()); ps.setString(2, entity.getUserAccount()); ps.setString(3, entity.getUserPassword()); //如果为空,设置为0 ps.setInt(4,entity.getUserStatus()==null?0:entity.getUserStatus()); //4.操作 int count = ps.executeUpdate(); ps.close(); conn.close(); return count; } @Override public int deleteById(Long id) throws SQLException { //1.获得数据库连接 Connection conn = DbUtils.getConnection(); String sql ="DELETE FROM tb_user WHERE user_id=?"; //2.获得操作对象 PreparedStatement ps = conn.prepareStatement(sql); //3.设置参数 ps.setLong(1, id); //4.操作 int count = ps.executeUpdate(); ps.close(); conn.close(); return count; } @Override public User findById(Long id) throws SQLException { //1.获得数据库连接 Connection conn = DbUtils.getConnection(); String sql ="SELECT * FROM tb_user WHERE user_id = ?"; //2.获得操作对象 PreparedStatement ps = conn.prepareStatement(sql); //3.设置参数 ps.setLong(1, id); //4.操作 ResultSet rs = ps.executeQuery(); User user =null; if(rs.next()) { user =new User(); user.setUserId(rs.getString("user_id")); user.setUserName(rs.getString("user_name")); user.setUserAccount(rs.getString("user_account")); user.setUserPassword(rs.getString("user_password")); user.setUserStatus(rs.getInt("user_status")); } ps.close(); conn.close(); return user; } public User findByAccount(String account) throws SQLException { //1.获得数据库连接 Connection conn = DbUtils.getConnection(); String sql ="SELECT * FROM tb_user WHERE user_account = ?"; //2.获得操作对象 PreparedStatement ps = conn.prepareStatement(sql); //3.设置参数 ps.setString(1, account); //4.操作 ResultSet rs = ps.executeQuery(); User user =null; if(rs.next()) { user =new User(); user.setUserId(rs.getString("user_id")); user.setUserName(rs.getString("user_name")); user.setUserAccount(rs.getString("user_account")); user.setUserPassword(rs.getString("user_password")); user.setUserStatus(rs.getInt("user_status")); } ps.close(); conn.close(); return user; } } |
5.4.1.5. 第五步:测试插入数据
package org.ranger.dao.test; import java.sql.SQLException; import org.junit.Test; import org.ranger.dao.UserDAO; import org.ranger.pojo.User; public class UserDAOTest { @Test public void insert() { UserDAO userDAO=new UserDAO(); User user =new User(); try { user.setUserName("张三"); user.setUserAccount("zhangsan"); userDAO.insert(user ); } catch (SQLException e) { e.printStackTrace(); } } } |
5.4.2. 第二部分:用户功能实现
5.4.2.1. 用户注册
--编写一个注册页面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>用户注册</title> <style type="text/css"> body { margin: 5px; vertical-align: middle; text-align: center; } input { margin: 5px; } </style> </head> <body> <!--提示信息--> <span style="color: red;"> ${register_msg }</span>
<form action="${pageContext.request.contextPath }/user" method="post"> <input name="action" type="hidden" value="register"> 用户账号:<input name="userAccount" type="text"><br/> 用户密码:<input name="userPassword" type="password"><br/> 确认密码:<input name="confirmPassword" type="password"><br/> <input value="用户注册" type="submit">
</form> </body> </html> |
--修改UserDAO增加一个通过账号查询记录的方法
public User findByAccount(String account) throws SQLException { //1.获得数据库连接 Connection conn = DbUtils.getConnection(); String sql ="SELECT * FROM tb_user WHERE user_account = ?"; //2.获得操作对象 PreparedStatement ps = conn.prepareStatement(sql); //3.设置参数 ps.setString(1, account); //4.操作 ResultSet rs = ps.executeQuery(); User user =null; if(rs.next()) { user =new User(); user.setUserId(user.getUserId()); user.setUserName(user.getUserName()); user.setUserAccount(user.getUserAccount()); user.setUserPassword(user.getUserPassword()); user.setUserStatus(user.getUserStatus()); } ps.close(); conn.close(); return user; } |
--编写UserService代码
接口部分
package org.ranger.service; import java.sql.SQLException; import org.ranger.pojo.User; public interface UserService { /** * 用户注册 * @param user * @return * @throws SQLException */ User register(User user) throws SQLException; } |
实现部分
package org.ranger.service.impl; import java.sql.SQLException; import org.ranger.dao.UserDAO; import org.ranger.pojo.User; import org.ranger.service.UserService; public class UserServiceImpl implements UserService { @Override public User register(User user) throws SQLException { UserDAO userDAO=new UserDAO(); int count = userDAO.insert(user); //如果插入成功,返回该用户 if(count>0) { return userDAO.findByAccount(user.getUserAccount()); } return null; } } |
--编写UserServlet控制器
package org.ranger.web.servlet; import java.io.IOException; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.ranger.pojo.User; import org.ranger.service.UserService; import org.ranger.service.impl.UserServiceImpl; public class UserServlet extends HttpServlet { private static final long serialVersionUID = -6221395694585506253L; @Override public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { System.out.println(req.getParameter("userAccount")); String action = req.getParameter("action"); switch (action) { case "register": this.register(req, resp); break; default: System.out.println("-找不到调用的方法-"); break; } } /** * 用户注册 * * @param req * @param resp * @throws IOException * @throws ServletException */ private void register(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { try { //1.获得表单参数 System.out.println("-用户注册-"); String userAccount = req.getParameter("userAccount"); String userPassword = req.getParameter("userPassword"); String confirmPassword = req.getParameter("confirmPassword"); //2.验证两次输入的密码是正确 if (userPassword.equals(confirmPassword)) { UserService userService = new UserServiceImpl(); User user = new User(); user.setUserAccount(userAccount); user.setUserPassword(userPassword); user.setUserStatus(0); User resultUser = userService.register(user); HttpSession session = req.getSession(); //注册成功,将用户数据放在会话 session.setAttribute("user", resultUser); req.setAttribute("register_msg", "注册成功!"); //注册成功自动登录 req.getRequestDispatcher("/WEB-INF/views/main.jsp").forward(req, resp); return ; } else { req.setAttribute("register_msg", "两次输入的密码不一致"); } } catch (Exception e) { e.printStackTrace(); req.setAttribute("register_msg", "出现系统错误,请联系管理员"); } req.getRequestDispatcher("/user_register.jsp").forward(req, resp); } } |
--编写一个编码过滤器,统一处理表单中文乱码问题
package org.ranger.web.filter; import java.io.IOException; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; /** * 统一设置编码集 * @author ranger * */ public class CharsetFilter implements Filter { private static String encoding="UTF-8"; @Override public void destroy() { } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { request.setCharacterEncoding(encoding); response.setCharacterEncoding(encoding); chain.doFilter(request, response); } @Override public void init(FilterConfig filterConfig) throws ServletException { String param = filterConfig.getInitParameter("encoding"); if(param!=null&&!"".equals(param)) { encoding=param; } } } |
--配置web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0"> <display-name>network_disk</display-name> <!-- 编码集过滤器 --> <filter> <filter-name>charsetFilter</filter-name> <filter-class>org.ranger.web.filter.CharsetFilter</filter-class> </filter> <filter-mapping> <filter-name>charsetFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <!-- 配置servlet --> <servlet> <servlet-name>userServlet</servlet-name> <servlet-class>org.ranger.web.servlet.UserServlet</servlet-class> </servlet> <servlet-mapping> <servlet-name>userServlet</servlet-name> <url-pattern>/user</url-pattern> </servlet-mapping> <welcome-file-list> <welcome-file>index.jsp</welcome-file> </welcome-file-list> </web-app> |
5.4.2.2. 用户登录
--创建一个登陆页面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>用户注册</title> <style type="text/css"> body { margin: 5px; vertical-align: middle; text-align: center; } input { margin: 5px; } </style> </head> <body> <span style="color: red;"> ${login_msg }</span>
<form action="${pageContext.request.contextPath }/user" method="post"> <input name="action" type="hidden" value="login"> 用户账号:<input name="userAccount" type="text"><br/> 用户密码:<input name="userPassword" type="password"><br/>
<input value="用户登录" type="submit">
</form> </body> </html> |
---修改UserService接口,标红代码
public interface UserService { /** * 用户注册 * @param user * @return * @throws SQLException */ User register(User user) throws SQLException; /** * 用户登录 * @param user * @return * @throws SQLException */ User login(User user) throws SQLException; } |
--修改UserServiceImpl实现
public class UserServiceImpl implements UserService { @Override public User register(User user) throws SQLException { UserDAO userDAO = new UserDAO(); int count = userDAO.insert(user); // 如果插入成功,返回该用户 if (count > 0) { return userDAO.findByAccount(user.getUserAccount()); } return null; } @Override public User login(User user) throws SQLException { try { UserDAO userDAO = new UserDAO(); User resultUser = userDAO.findByAccount(user.getUserAccount()); if (user.getUserPassword().equals(resultUser.getUserPassword())) { return resultUser; } } catch (Exception e) { e.printStackTrace(); } return null; } } |
--修改UserServlet代码
public class UserServlet extends HttpServlet { private static final long serialVersionUID = -6221395694585506253L; @Override public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { System.out.println(req.getParameter("userAccount")); String action = req.getParameter("action"); switch (action) { case "register": this.register(req, resp); break; case "login": this.login(req, resp); break; default: System.out.println("-找不到调用的方法-"); break; } } private void register(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { ..........忽略代码 } /** * 用户登录 * @param req * @param resp * @throws IOException * @throws ServletException */ private void login(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { System.out.println("-用户登录-"); String userAccount = req.getParameter("userAccount"); String userPassword = req.getParameter("userPassword"); UserService userService = new UserServiceImpl(); User user = new User(); user.setUserAccount(userAccount); user.setUserPassword(userPassword); try { User resultUser = userService.login(user); if (resultUser != null) { HttpSession session = req.getSession(); session.setAttribute("user", resultUser); req.getRequestDispatcher("/WEB-INF/views/main.jsp").forward(req, resp); return; } else { req.setAttribute("login_msg", "用户名或者密码不正确"); } } catch (SQLException e) { System.out.println("-登录失败-"); req.setAttribute("login_msg", "出现系统异常,请联系管理员"); e.printStackTrace(); } req.getRequestDispatcher("/user_login.jsp").forward(req, resp); } } |
5.4.2.3. 用户注销
--编写主界面代码
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <style type="text/css"> body { margin: 0; } a{ color: blue; } a:hover { color: blue; } </style> </head> <body> <div style="text-align: right; padding: 8px"> <span style="padding-right: 10px;"> [${sessionScope.user.userAccount}]</span> <span style="padding-right: 10px;"> <a href="${pageContext.request.contextPath }/user?action=undo">[注销]</a> </span> </div> </body> </html> |
--修改UserServlet代码
package org.ranger.web.servlet; import java.io.IOException; import java.sql.SQLException; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.ranger.pojo.User; import org.ranger.service.UserService; import org.ranger.service.impl.UserServiceImpl; public class UserServlet extends HttpServlet { private static final long serialVersionUID = -6221395694585506253L; @Override public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String action = req.getParameter("action"); switch (action) { case "register": this.register(req, resp); break; case "login": this.login(req, resp); break; case "undo": this.undo(req, resp); break; default: System.out.println("-找不到调用的方法-"); req.setAttribute("login_msg", "找不到调用的方法"); req.getRequestDispatcher("/user_login.jsp").forward(req, resp); break; } } private void register(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { try { // 1.获得表单参数 System.out.println("-用户注册-"); .......忽略代码 } private void login(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { System.out.println("-用户登录-"); .......忽略代码 } /** * 用户注销 * @param req * @param resp * @throws IOException * @throws ServletException */ private void undo(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { System.out.println("-用户注销-"); HttpSession session = req.getSession(); session.removeAttribute("user"); req.getRequestDispatcher("/user_login.jsp").forward(req, resp); } } |
5.4.3. 第三部分:网盘功能
5.4.3.1. 查看网盘
思路:
--连接获得HDFS连接操作对象
--在注册的时候,就给用户创建一块专属文件夹,用其用户命名
--将该该文件夹的所属者修改为该用户名
6. MapReduce(分布式计算)
6.1. MapReduce是什么
MapReduce是Hadoop里面的分布式数据计算模块(框架)。通过MapReduce可以让数据处理在分布式的Linux计算后再聚合返回。
6.2. MapReduce运行流程图
|
如果所示:MapReduce分为Map和Reduce两部分。
Map部分就是分布式计算,demo.txt每一条数据,都会执行一次Map计算。
Reduce部分是分布汇总分组计算,必须在Map计算结束后,再分组汇总计算。
6.3. 入门示例
6.3.1. 说明
通过MapReduce程序实现,统计demo.txt 文本的单词统计。
demo.txt文本内容
hello world hello tom hello boy hello girl hello jim hello jack |
输出结果为:
boy 1 girl 1 hello 6 jack 1 jim 1 tom 1 world 1 |
6.3.2. 配置步骤
6.3.2.1. 第一步:搭建环境
--创建一个Java普通项目,导入Hadoop依赖的jar包
jar放在hadoop安装包的share/hadoop目录下,如图标红的文件夹的所有jar包复制到项目。
|
创建Java项目,并且创建一个存放MapReduce程序的包
|
--创建一个demo.txt文件,并写好内容。再将文件上传对HDFS的目录中,如:/wc/deme.txt
创建一个demo.txt在当前linux用户个人目录
|
编辑demo.txt文件
|
上传文件到hdfs文件系统
|
6.3.2.2. 第二步:编写一个WcMapper类
Mapper类是一个规范,所以必须需要继承Mapper类
package org.ranger.mapredure.wc; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 注意事项:Mapper类必须需要指定四个数据类型(KEYIN, VALUEIN, KEYOUT, VALUEOUT) * * KEYIN:传入Mapper数据key的类型,Hadoop传入一个数据偏移量作为key * VALUEIN:传入Mapper数据value的类型 * * KEYOUT:Mapper传出的key的类型 * VALUEOUT:Mapper传出的value的类型 * * 问题:为什么数据类型使用LongWritable, Text,而不是Long,String呢? * 答:因为Java实行的数据类型,序列化会包含大量信息,Hadoop基于效率的原因重写了这些类型 * @author ranger * * 注意事项:不管传入传出,Mapper都是使用key-value的方式传递 * */ public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ /** * 每条数据计算,都执行一次!! */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //1.获得Hadoop传递过来的数据 String line = value.toString(); //2.分割数据 String[] fields = line.split(" "); //3.传出数据 for(String field:fields) { //<字段名,1> context.write(new Text(field), new LongWritable(1)); } } } |
6.3.2.3. 第三步:编写一个WcReducer类
package org.ranger.mapredure.wc; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 分组聚合计算 * 注意事项:传入的key-value和Mapper传出的保持一致 * * @author ranger * */ public class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//1.声明一个变量,统计单词的个数 long count=0; for (LongWritable value : values) { count+=value.get(); } //2.输出结果 context.write(key, new LongWritable(count)); } } |
6.3.2.4. 第四步:编写一个WcRunner类
package org.ranger.mapredure.wc; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 执行MapReduce程序的入口 * * 1.设置 * * @author ranger * */ public class WcRunner { public static void main(String[] args) { try { //1.创建一个job对象 Configuration conf=new Configuration(); Job job = Job.getInstance(conf); //设置入口的主类 job.setJarByClass(WcRunner.class); //设置job对应的Mapper和Reducer job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); //设置Mapper的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //map设置数据的获得 FileInputFormat.setInputPaths(job, new Path("/wc/")); FileOutputFormat.setOutputPath(job, new Path("/output/")); //提交job job.waitForCompletion(true); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
6.3.2.5. 第五步:导出jar包
注意:因为放在Hadoop里面运行,所以可以导jar包时,不用勾上lib。
|
6.3.2.6. 上传jar文件
将jar文件上传到任何一个Hadoop服务器上。放在Hadoop的MapReduce目录下
/home/hadoop/app/hadoop/share/hadoop/mapreduce
6.3.2.7. 执行mapredurce程序
执行格式:hadoop jar wc.jar 入口类全名
如:hadoop jar wc.jar org.ranger.mapredure.wc.WcRunner
|
6.3.2.8. 查看输出结果
使用:hadoop fs -ls /output目录查看输出文件
在使用:hadoop fs -cat /output/part-r-00000查看数据结果
|
6.4. MapRedurce 运行原理图
|
- 在运行hadoop jar wc.jar org.rangermareduce.WcRunner时,会产生一个RunJar的进程
- RunJar进程会向YARN的ResourceManager请求一个job。
- ResourceManager会将RunJar的Job放在队列中,并且返回相关资源的提交路径。
- RunJar根据返回的路径提交资源到HFDS文件系统。
- ResourceManger的队列执行到job任务,会首先让NodeManager领取任务。
- NodeManager领取资源后,会在创建一个资源容器。
- ResourceManager在其中一个NodeManager的资源容器创建一个MRAppMaster进程
- 通过MRAPPMaster协调NodeManager执行mapper任务
- 在mapper任务结束后,MRAppMaster再协调NodeManager执行Reduce任务
- 在所有任务结束后,MRAppMaster向ResourceManager申请注销自己
6.5. 本地模式
如果,程序调试都是放在Hadoop上面执行,会大大降低开发的效率。MapReduce支持本地模式。
所谓的本地模式,就是在本机测试,不需要放在Hadoop上面,MapReduce也可以运行。
注意事项:必须使用64位的JDK。否则会报错。
|
6.5.1. 第一步:配置Hadoop的环境变量
--将hadoop-2.9.1.tar.gz解压到D:javahadoop-2.9.1
|
--配置环境变量
HADOOP_HOME=D:javahadoop-2.9.1
|
PATH=%HADOOP_HOME%in
|
--测试环境变量是否成功
在cmd下使用set命令查看环境变量
|
6.5.2. 第二步:需要winutils.exe支持
--下载winutils.exe路径
https://github.com/steveloughran/winutils/releases
--解压将所有文件复制到hadoop的bin目录
|
6.5.3. 第三步:将demo.txt放在本地目录
|
6.5.4. 第四步:修改WcRunner
修改WcRunner的输入输出路径为本地路径
package org.ranger.mapredure.wc; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * WcRunner 执行MapReduce程序的入口 * @author ranger * */ public class WcRunner { public static void main(String[] args) { try { //1.创建一个job对象 Configuration conf=new Configuration(); Job job = Job.getInstance(conf) //设置入口的主类 job.setJarByClass(WcRunner.class); //设置job对应的Mapper和Reducer job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); //设置Mapper的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //map设置数据的获得 FileInputFormat.setInputPaths(job, new Path("c:/wc/input/")); FileOutputFormat.setOutputPath(job, new Path("c:/wc/output/")); //提交job job.waitForCompletion(true); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
6.5.5. 第五步:直接运行WcRunner类
|
6.5.6. 第六步:查看测试结果
|
7. 日志清洗-案例
所谓的日志清洗,就是在杂乱日志里面,提取拥有的信息,在进行数据分析。
需求:分析网站时间段的访问量。
package cn.gzsxt.mapreduces; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Locale; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; /** * 问题:为什么使用内部类将Map计算和Reduce计算放在一起 答:这样可以快速找到Map对应的Reduce 问题:为什么使用静态内部类,而不内部类。 * 答:原因是希望可以直接使用LogMapReducer类就可以访问。不需要创建LogMapReducer对象 * * 需求:分析网站访问时间段的访问量。 * * @author ranger * */ public class LogMapReducer { /** * map 计算,获得每一条是数据的时间 * * @author ranger * */ static class LogMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); // 127.0.0.1 - - [03/Nov/2017:08:58:42 +0800] "GET / HTTP/1.1" 200 11418 // 需要的值在第四个里面 String[] values = line.split(" "); String datestr = values[3]; // 时间处理,[03/Nov/2017:08:58:42 // 第一步:去掉[ StringBuilder sb = new StringBuilder(datestr); String resultDate = sb.substring(1); // 日期的转换,注意这个是美国时间,需要设置为US SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US); Date date; try { date = format.parse(resultDate); // 如何获得日期里面的一个单位, Calendar calendar = Calendar.getInstance(); calendar.setTime(date); //获得24小时制的时间 int hour = calendar.get(Calendar.HOUR_OF_DAY); System.out.println(datestr + "====时间字符串"); context.write(new Text(hour + ""), new LongWritable(1)); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } // 01 123123 static class LogReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable val : values) { // 每次迭代加1 count = count + val.get(); } context.write(key, new LongWritable(count)); } } } |
8. HA集群部署Hadoop
8.1. HA集群是什么
HA是High Available缩写,是双机集群系统简称,是一个高可用性集群,是保证业务连续性的有效解决方案,一般有两个或两个以上的节点,且分为活动节点及备用节点。
工作后会经常听到“双机热备”这个词。表达的是同一意思。
--来自百度百科
8.2. 高可用是什么
“高可用性”(High Availability)通常来描述一个系统经过专门的设计,从而减少停工时间,而保持其服务的高度可用性。
--来自百度百科
8.3. 为什么需要使用HA集群
上面的集群部署是我们发现只有NameNode节点,万一NameNode节点宕机,集群也崩溃了。
我们发现ResourceManager节点也只有一个,万一ResourceManager节点宕机,集群也崩溃。
所以我们需要通过HA的机制来部署NameNode节点以及ResourceManger来确保集群的容错性、可用行。
--如图:通过HA集群,可以实现NameNode和ResourceManager的双机热备,在一台NameNode或者ResourceManager崩溃后,另外一台可以直接切换。
|
8.4. 部署步骤
8.4.1. 需求说明
需求:部署一个HA的Hadoop集群。
- 使用服务u1~u7。共七台服务器
- u1、u2用于作为NameNode的HA集群、以及部署zkfc
- u3、u4用于作为ResourceManager的HA集群
- u5、u6、u7分别用于DadeNode、NodeManager、Zookeeper(HA集群)
如图所示:
|
8.4.2. 部署思路
- 首先部署好无HA机制的集群
- 再配置NameNode的HA集群
- 最后ResourceManager的HA集群
8.4.3. 第一部分:HA机制的集群配置
u1~u7的准备,统一配置。<参考单机部署与集群部署章节>
注意:只要在u1和u3配置slaves就可以。
- u1的slaves配置文件,用于指定u1、u2为NamNode,其中u1为NameNode的主机。
- u2为NameNode备机,u3的slaves配置文件用于指定u3、u4为ResourceManager,其中u3为ResourceManager主机,u4为ResourceManager备机。
- u5~u7配置为DataNode和NodeManager。
8.4.4. 第二部分:配置Zookeeper集群
<参考Zookeeper章节>
8.4.5. 第三步部分:配置NameNode的HA集群
8.4.5.1. 说明
|
配置NameNode的HA集群,就是配置NameNode的双机热备。
- 通过zkfc2实时监控Zookeeper集群NN1的状态。发现主机出现异常,备机立刻切换
- 在切换备机之前,会先发送一次shell杀NN1的NameNode进程
- 通过JuornalNode集群来实时保持两台服务器的操作日志的一致性
- 配置HA集群后,备机直接替代SecondaryNameNode的功能,所以没有SecondaryNameNode了
- 因为HA集群的两台机器,所以使用RPC来同步fsimage元数据文件的
--基于以上描述的流程,NameNode必须配置一下参数
(1)必须指定Zookeeper服务器
(2)必须指定两台NameNode的服务器名称
(3)必须指定使用的是HA策略
(4)必须指定Journal节点位置
(5)必须配置zkfc发送shell的杀机进程的ssh登录信息
(6)必须配置rpc入口,实现元数据的同步
--因为NameNode是HDFS文件系统的配置,所以可以配置的配置文件使用两个
(1)core-site.xml
(2)hdfs-site.xml
8.4.5.2. 配置步骤
8.4.5.2.1. 第一步:配置core-site.xml配置文件
--在core-site.xml配置zookeeper服务器访问路径
<configuration> <!-1.指定默认的HDFS操作路径--> <property> <name>fs.defaultFS</name> <value>hdfs://ns</value> </property> <!--2.设置数据的存放位置--> <property> <name>hadoop.tmp.dir</name> <value>/home/hadoop/app/hadoop-2.9.1/data/</value> </property> <!--3.设置Zookeeper集群的访问路径--> <property> <name>ha.zookeeper.quorum</name> <value>u5:2181,u6:2181,u7:2181</value> </property> </configuration>
|
问题:为什么Zookeeper配置core-site.xml里面呢?
答:因为Zookeeper不仅仅在NameNode里面用,可以在Hadoop其他很多地方用到,所以配置在hadoop核心文件里面
8.4.5.2.2. 第二步:配置dhfs-site.xml配置文件
<configuration> <!--1.设置数据副本的个数--> <property> <name>dfs.replication</name> <value>3</value> </property> <!--设置默认使用ha集群---> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> <!--设置HA的HDFS的访问服务名--> <property> <name>dfs.nameservices</name> <value>ns</value> </property> <!--设置访问服务名,对应的NameNode--> <property> <name>dfs.ha.namenodes.ns</name> <value>nn1,nn2</value> <!--配置rpc通讯访问路径--> <property> <name>dfs.namenode.rpc-address.ns.nn1</name> <value>u1:8020</value> </property> <property> <name>dfs.namenode.rpc-address.ns.nn2</name> <value>u2:8020</value> </property> <!--配置https通讯访问路径--> <property> <name>dfs.namenode.http-address.sn.nn1</name> <value>u1:50070</value> </property> <property> <name>dfs.namenode.http-address.sn.nn2</name> <value>u1:50070</value> </property> <!--配置qjouranl的节点路径--> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://u5:8485;u6:8485;u7:8485/sn</value> </property> <property> <!--配置qjouranl的节点数据存放路径--> <name>dfs.journalnode.edits.dir</name> <value>/home/hadoop/app/hadoop-2.9.1/data/</value> </property> <!--配置客户端 访问NameNode的HA的入口类--> <property> <name>dfs.client.failover.proxy.provider.mycluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> <!--设置zkfc远程杀死进程配置,使用ssh登录--> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <!--设置zkfc远程杀死进程配置,指定登录的公钥--> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/home/hadoop/.ssh/id_rsa</value> </property> </property> </configuration> |
8.4.6. 第四部分:配置ResourceManger的HA集群
8.4.6.1. 运行流程图
9. 附录A 配置文件参数
9.1. core-default.xml
序号 |
参数名 |
参数值 |
参数说明 |
1 |
hadoop.tmp.dir |
/tmp/hadoop-${user.name} |
数据目录设置 |
2 |
hadoop.native.lib |
true |
使用本地hadoop库标识。 |
3 |
hadoop.http.filter.initializers |
|
http服务器过滤链设置 |
4 |
hadoop.security.group.mapping |
org.apache.hadoop.security.ShellBasedUnixGroupsMapping |
组内用户的列表的类设定 |
5 |
hadoop.security.authorization |
false |
服务端认证开启 |
6 |
hadoop.security.authentication |
simple |
无认证或认证设置 |
7 |
hadoop.security.token.service.use_ip |
true |
是否开启使用IP地址作为连接的开关 |
8 |
hadoop.logfile.size |
10000000 |
日志文件最大为10M |
9 |
hadoop.logfile.count |
10 |
日志文件数量为10个 |
10 |
io.file.buffer.size |
4096 |
流文件的缓冲区为4K |
11 |
io.bytes.per.checksum |
512 |
校验位数为512字节 |
12 |
io.skip.checksum.errors |
FALSE |
校验出错后是抛出异常还是略过标识。True则略过。 |
13 |
io.compression.codecs |
org.apache.hadoop.io.compress.DefaultCodec, |
压缩和解压的方式设置 |
org.apache.hadoop.io.compress.GzipCodec, |
|||
org.apache.hadoop.io.compress.BZip2Codec, |
|||
org.apache.hadoop.io.compress.SnappyCodec |
|||
14 |
io.serializations |
org.apache.hadoop.io.serializer.WritableSerialization |
序例化和反序列化的类设定 |
15 |
fs.default.name |
file:/// |
缺省的文件系统URI标识设定。 |
16 |
fs.trash.interval |
0 |
文件废弃标识设定,0为禁止此功能 |
17 |
fs.file.impl |
org.apache.hadoop.fs.LocalFileSystem |
本地文件操作类设置 |
18 |
fs.hdfs.impl |
org.apache.hadoop.hdfs.DistributedFileSystem |
HDFS文件操作类设置 |
19 |
fs.s3.impl |
org.apache.hadoop.fs.s3.S3FileSystem |
S3文件操作类设置 |
20 |
fs.s3n.impl |
org.apache.hadoop.fs.s3native.NativeS3FileSystem |
S3文件本地操作类设置 |
21 |
fs.kfs.impl |
org.apache.hadoop.fs.kfs.KosmosFileSystem |
KFS文件操作类设置. |
22 |
fs.hftp.impl |
org.apache.hadoop.hdfs.HftpFileSystem |
HTTP方式操作文件设置 |
23 |
fs.hsftp.impl |
org.apache.hadoop.hdfs.HsftpFileSystem |
HTTPS方式操作文件设置 |
24 |
fs.webhdfs.impl |
org.apache.hadoop.hdfs.web.WebHdfsFileSystem |
WEB方式操作文件类设置 |
25 |
fs.ftp.impl |
org.apache.hadoop.fs.ftp.FTPFileSystem |
FTP文件操作类设置 |
26 |
fs.ramfs.impl |
org.apache.hadoop.fs.InMemoryFileSystem |
内存文件操作类设置 |
27 |
fs.har.impl |
org.apache.hadoop.fs.HarFileSystem |
压缩文件操作类设置. |
28 |
fs.har.impl.disable.cache |
TRUE |
是否缓存har文件的标识设定 |
29 |
fs.checkpoint.dir |
${hadoop.tmp.dir}/dfs/namesecondary |
备份名称节点的存放目前录设置 |
30 |
fs.checkpoint.edits.dir |
${fs.checkpoint.dir} |
备份名称节点日志文件的存放目前录设置 |
31 |
fs.checkpoint.period |
3600 |
动态检查的间隔时间设置 |
32 |
fs.checkpoint.size |
67108864 |
日志文件大小为64M |
33 |
fs.s3.block.size |
67108864 |
写S3文件系统的块的大小为64M |
34 |
fs.s3.buffer.dir |
${hadoop.tmp.dir}/s3 |
S3文件数据的本地存放目录 |
35 |
fs.s3.maxRetries |
4 |
S3文件数据的偿试读写次数 |
36 |
fs.s3.sleepTimeSeconds |
10 |
S3文件偿试的间隔 |
37 |
local.cache.size |
10737418240 |
缓存大小设置为10GB |
38 |
io.seqfile.compress.blocksize |
1000000 |
压缩流式文件中的最小块数为100万 |
39 |
io.seqfile.lazydecompress |
TRUE |
块是否需要压缩标识设定 |
40 |
io.seqfile.sorter.recordlimit |
1000000 |
内存中排序记录块类最小为100万 |
41 |
io.mapfile.bloom.size |
1048576 |
BloomMapFiler过滤量为1M |
42 |
io.mapfile.bloom.error.rate |
0.005 |
|
43 |
hadoop.util.hash.type |
murmur |
缺少hash方法为murmur |
44 |
ipc.client.idlethreshold |
4000 |
连接数据最小阀值为4000 |
45 |
ipc.client.kill.max |
10 |
一个客户端连接数最大值为10 |
46 |
ipc.client.connection.maxidletime |
10000 |
断开与服务器连接的时间最大为10秒 |
47 |
ipc.client.connect.max.retries |
10 |
建立与服务器连接的重试次数为10次 |
48 |
ipc.server.listen.queue.size |
128 |
接收客户连接的监听队例的长度为128 |
49 |
ipc.server.tcpnodelay |
FALSE |
开启或关闭服务器端TCP连接算法 |
50 |
ipc.client.tcpnodelay |
FALSE |
开启或关闭客户端TCP连接算法 |
51 |
webinterface.private.actions |
FALSE |
Web交互的行为设定 |
52 |
hadoop.rpc.socket.factory.class.default |
org.apache.hadoop.net.StandardSocketFactory |
缺省的socket工厂类设置 |
53 |
hadoop.rpc.socket.factory.class.ClientProtocol |
与dfs连接时的缺省socket工厂类 |
|
54 |
hadoop.socks.server |
服务端的工厂类缺省设置为SocksSocketFactory. |
|
55 |
topology.node.switch.mapping.impl |
org.apache.hadoop.net.ScriptBasedMapping |
|
56 |
topology.script.file.name |
||
57 |
topology.script.number.args |
100 |
参数数量最多为100 |
58 |
hadoop.security.uid.cache.secs |
14400 |
9.2. hdfs-default.xml
序号 |
参数名 |
参数值 |
参数说明 |
1 |
dfs.namenode.logging.level |
info |
输出日志类型 |
2 |
dfs.secondary.http.address |
0.0.0.0:50090 |
备份名称节点的http协议访问地址与端口 |
3 |
dfs.datanode.address |
0.0.0.0:50010 |
数据节点的TCP管理服务地址和端口 |
4 |
dfs.datanode.http.address |
0.0.0.0:50075 |
数据节点的HTTP协议访问地址和端口 |
5 |
dfs.datanode.ipc.address |
0.0.0.0:50020 |
数据节点的IPC服务访问地址和端口 |
6 |
dfs.datanode.handler.count |
3 |
数据节点的服务连接处理线程数 |
7 |
dfs.http.address |
0.0.0.0:50070 |
名称节点的http协议访问地址与端口 |
8 |
dfs.https.enable |
false |
支持https访问方式标识 |
9 |
dfs.https.need.client.auth |
false |
客户端指定https访问标识 |
10 |
dfs.https.server.keystore.resource |
ssl-server.xml |
Ssl密钥服务端的配置文件 |
11 |
dfs.https.client.keystore.resource |
ssl-client.xml |
Ssl密钥客户端的配置文件 |
12 |
dfs.datanode.https.address |
0.0.0.0:50475 |
数据节点的HTTPS协议访问地址和端口 |
13 |
dfs.https.address |
0.0.0.0:50470 |
名称节点的HTTPS协议访问地址和端口 |
14 |
dfs.datanode.dns.interface |
default |
数据节点采用IP地址标识 |
15 |
dfs.datanode.dns.nameserver |
default |
指定DNS的IP地址 |
16 |
dfs.replication.considerLoad |
true |
加载目标或不加载的标识 |
17 |
dfs.default.chunk.view.size |
32768 |
浏览时的文件块大小设置为32K |
18 |
dfs.datanode.du.reserved |
0 |
每个卷预留的空闲空间数量 |
19 |
dfs.name.dir |
${hadoop.tmp.dir}/dfs/name |
存贮在本地的名字节点数据镜象的目录,作为名字节点的冗余备份 |
20 |
dfs.name.edits.dir |
${dfs.name.dir} |
存贮文件操作过程信息的存贮目录 |
21 |
dfs.web.ugi |
webuser,webgroup |
Web接口访问的用户名和组的帐户设定 |
22 |
dfs.permissions |
true |
文件操作时的权限检查标识。 |
23 |
dfs.permissions.supergroup |
supergroup |
超级用户的组名定义 |
24 |
dfs.block.access.token.enable |
false |
数据节点访问令牌标识 |
25 |
dfs.block.access.key.update.interval |
600 |
升级访问钥时的间隔时间 |
26 |
dfs.block.access.token.lifetime |
600 |
访问令牌的有效时间 |
27 |
dfs.data.dir |
${hadoop.tmp.dir}/dfs/data |
数据节点的块本地存放目录 |
28 |
dfs.datanode.data.dir.perm |
755 |
数据节点的存贮块的目录访问权限设置 |
29 |
dfs.replication |
3 |
缺省的数据块副本的数量 |
30 |
dfs.replication.max |
512 |
块复制的最大数量 |
31 |
dfs.replication.min |
1 |
块复制的最小数量 |
32 |
dfs.block.size |
67108864 |
缺省的文件块大小为64M |
33 |
dfs.df.interval |
60000 |
磁盘空间统计间隔为6秒 |
34 |
dfs.client.block.write.retries |
3 |
块写入出错时的重试次数 |
35 |
dfs.blockreport.intervalMsec |
3600000 |
块的报告间隔时为1小时 |
36 |
dfs.blockreport.initialDelay |
0 |
块顺序报告的间隔时间 |
37 |
dfs.heartbeat.interval |
3 |
数据节点的心跳检测间隔时间 |
38 |
dfs.namenode.handler.count |
10 |
名称节点的连接处理的线程数量 |
39 |
dfs.safemode.threshold.pct |
0.999f |
启动安全模式的阀值设定 |
40 |
dfs.safemode.extension |
30000 |
当阀值达到量值后扩展的时限 |
41 |
dfs.balance.bandwidthPerSec |
1048576 |
启动负载均衡的数据节点可利用带宽最大值为1M |
42 |
dfs.hosts |
可与名称节点连接的主机地址文件指定。 |
|
43 |
dfs.hosts.exclude |
不充计与名称节点连接的主机地址文件设定 |
|
44 |
dfs.max.objects |
0 |
文件数、目录数、块数的最大数量 |
45 |
dfs.namenode.decommission.interval |
30 |
名称节点解除命令执行时的监测时间周期 |
46 |
dfs.namenode.decommission.nodes.per.interval |
5 |
名称节点解除命令执行是否完检测次数 |
47 |
dfs.replication.interval |
3 |
名称节点计算数据节点的复制工作的周期数. |
48 |
dfs.access.time.precision |
3600000 |
充许访问文件的时间精确到1小时 |
49 |
dfs.support.append |
false |
是否充许链接文件指定 |
50 |
dfs.namenode.delegation.key.update-interval |
86400000 |
名称节点上的代理令牌的主key的更新间隔时间为24小时 |
51 |
dfs.namenode.delegation.token.max-lifetime |
604800000 |
代理令牌的有效时间最大值为7天 |
52 |
dfs.namenode.delegation.token.renew-interval |
86400000 |
代理令牌的更新时间为24小时 |
53 |
dfs.datanode.failed.volumes.tolerated |
0 |
决定停止数据节点提供服务充许卷的出错次数。0次则任何卷出错都要停止数据节点 |
9.3. mapred-default.xml
序号 |
参数名 |
参数值 |
参数说明 |
1 |
hadoop.job.history.location |
作业跟踪管理器的静态历史文件的存放目录。 |
|
2 |
hadoop.job.history.user.location |
可以指定具体某个作业的跟踪管理器的历史文件存放目录 |
|
3 |
mapred.job.tracker.history.completed.location |
已完成作业的历史文件的存放目录 |
|
4 |
io.sort.factor |
10 |
排完序的文件的合并时的打开文件句柄数 |
5 |
io.sort.mb |
100 |
排序文件的内存缓存大小为100M |
6 |
io.sort.record.percent |
0.05 |
排序线程阻塞的内存缓存剩余比率 |
7 |
io.sort.spill.percent |
0.80 |
当缓冲占用量为该值时,线程需要将内容先备份到磁盘中。 |
8 |
io.map.index.skip |
0 |
索引条目的间隔设定 |
9 |
mapred.job.tracker |
local |
作业跟踪管理器是否和MR任务在一个进程中 |
10 |
mapred.job.tracker.http.address |
0.0.0.0:50030 |
作业跟踪管理器的HTTP服务器访问端口和地址 |
11 |
mapred.job.tracker.handler.count |
10 |
作业跟踪管理器的管理线程数,线程数比例是任务管理跟踪器数量的0.04 |
12 |
mapred.task.tracker.report.address |
127.0.0.1:0 |
任务管理跟踪器的主机地址和端口地址 |
13 |
mapred.local.dir |
${hadoop.tmp.dir}/mapred/local |
MR的中介数据文件存放目录 |
14 |
mapred.system.dir |
${hadoop.tmp.dir}/mapred/system |
MR的控制文件存放目录 |
15 |
mapreduce.jobtracker.staging.root.dir |
${hadoop.tmp.dir}/mapred/staging |
每个正在运行作业文件的存放区 |
16 |
mapred.temp.dir |
${hadoop.tmp.dir}/mapred/temp |
MR临时共享文件存放区 |
17 |
mapred.local.dir.minspacestart |
0 |
MR本地中介文件删除时,不充许有任务执行的数量值。 |
18 |
mapred.local.dir.minspacekill |
0 |
MR本地中介文件删除时,除非所有任务都已完成的数量值。 |
19 |
mapred.tasktracker.expiry.interval |
600000 |
任务管理跟踪器不发送心跳的累计时间间隔超过600秒,则任务管理跟踪器失效 |
20 |
mapred.tasktracker.resourcecalculatorplugin |
指定的一个用户访问资源信息的类实例 |
|
21 |
mapred.tasktracker.taskmemorymanager.monitoring-interval |
5000 |
监控任务管理跟踪器任务内存使用率的时间间隔 |
22 |
mapred.tasktracker.tasks.sleeptime-before-sigkill |
5000 |
发出进程终止后,间隔5秒后发出进程消亡信号 |
23 |
mapred.map.tasks |
2 |
每个作业缺省的map任务数为2 |
24 |
mapred.reduce.tasks |
1 |
每个作业缺省的reduce任务数为1 |
25 |
mapreduce.tasktracker.outofband.heartbeat |
false |
让在任务结束后发出一个额外的心跳信号 |
26 |
mapreduce.tasktracker.outofband.heartbeat.damper |
1000000 |
当额外心跳信号发出量太多时,则适当阻止 |
27 |
mapred.jobtracker.restart.recover |
false |
充许任务管理器恢复时采用的方式 |
28 |
mapred.jobtracker.job.history.block.size |
3145728 |
作业历史文件块的大小为3M |
29 |
mapreduce.job.split.metainfo.maxsize |
10000000 |
分隔元信息文件的最大值是10M以下 |
30 |
mapred.jobtracker.taskScheduler |
org.apache.hadoop.mapred.JobQueueTaskScheduler |
设定任务的执行计划实现类 |
31 |
mapred.jobtracker.taskScheduler.maxRunningTasksPerJob |
作业同时运行的任务数的最大值 |
|
32 |
mapred.map.max.attempts |
4 |
Map任务的重试次数 |
33 |
mapred.reduce.max.attempts |
4 |
Reduce任务的重试次数 |
34 |
mapred.reduce.parallel.copies |
5 |
在复制阶段时reduce并行传送的值。 |
35 |
mapreduce.reduce.shuffle.maxfetchfailures |
10 |
取map输出的最大重试次数 |
36 |
mapreduce.reduce.shuffle.connect.timeout |
180000 |
REDUCE任务连接任务管理器获得map输出时的总耗时是3分钟 |
37 |
mapreduce.reduce.shuffle.read.timeout |
180000 |
REDUCE任务等待map输出数据的总耗时是3分钟 |
38 |
mapred.task.timeout |
600000 |
如果任务无读无写时的时间耗时为10分钟,将被终止 |
39 |
mapred.tasktracker.map.tasks.maximum |
2 |
任管管理器可同时运行map任务数为2 |
40 |
mapred.tasktracker.reduce.tasks.maximum |
2 |
任管管理器可同时运行reduce任务数为2 |
41 |
mapred.jobtracker.completeuserjobs.maximum |
100 |
当用户的完成作业数达100个后,将其放入作业历史文件中 |
42 |
mapreduce.reduce.input.limit |
-1 |
Reduce输入量的限制。 |
43 |
mapred.job.tracker.retiredjobs.cache.size |
1000 |
作业状态为已不在执行的保留在内存中的量为1000 |
44 |
mapred.job.tracker.jobhistory.lru.cache.size |
5 |
作业历史文件装载到内存的数量 |
45 |
mapred.child.java.opts |
-Xmx200m |
启动task管理的子进程时的内存设置 |
46 |
mapred.child.env |
|
子进程的参数设置 |
47 |
mapred.child.ulimit |
|
虚拟机所需内存的设定。 |
48 |
mapred.cluster.map.memory.mb |
-1 |
|
49 |
mapred.cluster.reduce.memory.mb |
-1 |
|
50 |
mapred.cluster.max.map.memory.mb |
-1 |
|
51 |
mapred.cluster.max.reduce.memory.mb |
-1 |
|
52 |
mapred.job.map.memory.mb |
-1 |
|
53 |
mapred.job.reduce.memory.mb |
-1 |
|
54 |
mapred.child.tmp |
/tmp |
Mr任务信息的存放目录 |
55 |
mapred.inmem.merge.threshold |
1000 |
内存中的合并文件数设置 |
56 |
mapred.job.shuffle.merge.percent |
0.66 |
|
57 |
mapred.job.shuffle.input.buffer.percent |
0.70 |
|
58 |
mapred.job.reduce.input.buffer.percent |
0.0 |
|
59 |
mapred.map.tasks.speculative.execution |
true |
Map任务的多实例并行运行标识 |
60 |
mapred.reduce.tasks.speculative.execution |
true |
Reduce任务的多实例并行运行标识 |
61 |
mapred.job.reuse.jvm.num.tasks |
1 |
每虚拟机运行的任务数 |
62 |
mapred.min.split.size |
0 |
Map的输入数据被分解的块数设置 |
63 |
mapred.jobtracker.maxtasks.per.job |
-1 |
一个单独作业的任务数设置 |
64 |
mapred.submit.replication |
10 |
提交作业文件的复制级别 |
65 |
mapred.tasktracker.dns.interface |
default |
任务管理跟踪器是否报告IP地址名的开关 |
66 |
mapred.tasktracker.dns.nameserver |
default |
作业和任务管理跟踪器之间通讯方式采用的DNS服务的主机名或IP地址 |
67 |
tasktracker.http.threads |
40 |
http服务器的工作线程数量 |
68 |
mapred.task.tracker.http.address |
0.0.0.0:50060 |
任务管理跟踪器的http服务器的地址和端口 |
69 |
keep.failed.task.files |
false |
失败任务是否保存到文件中 |
70 |
mapred.output.compress |
false |
作业的输出是否压缩 |
71 |
mapred.output.compression.type |
RECORD |
作业输出采用NONE, RECORD or BLOCK三种方式中一种压缩的写入到流式文件 |
72 |
mapred.output.compression.codec |
org.apache.hadoop.io.compress.DefaultCodec |
压缩类的设置 |
73 |
mapred.compress.map.output |
false |
Map的输出是否压缩 |
74 |
mapred.map.output.compression.codec |
org.apache.hadoop.io.compress.DefaultCodec |
Map的输出压缩的实现类指定 |
75 |
map.sort.class |
org.apache.hadoop.util.QuickSort |
排序键的排序类指定 |
76 |
mapred.userlog.limit.kb |
0 |
每个任务的用户日志文件大小 |
77 |
mapred.userlog.retain.hours |
24 |
作业完成后的用户日志留存时间为24小时 |
78 |
mapred.user.jobconf.limit |
5242880 |
Jobconf的大小为5M |
79 |
mapred.hosts |
|
可与作业管理跟踪器连接的主机名 |
80 |
mapred.hosts.exclude |
|
不可与作业管理跟踪器连接的主机名 |
81 |
mapred.heartbeats.in.second |
100 |
作业管理跟踪器的每秒中到达的心跳数量为100 |
82 |
mapred.max.tracker.blacklists |
4 |
任务管理跟踪器的黑名单列表的数量 |
83 |
mapred.jobtracker.blacklist.fault-timeout-window |
180 |
任务管理跟踪器超时180分钟则訪任务将被重启 |
84 |
mapred.jobtracker.blacklist.fault-bucket-width |
15 |
|
85 |
mapred.max.tracker.failures |
4 |
任务管理跟踪器的失败任务数设定 |
86 |
jobclient.output.filter |
FAILED |
控制任务的用户日志输出到作业端时的过滤方式 |
87 |
mapred.job.tracker.persist.jobstatus.active |
false |
是否持久化作业管理跟踪器的信息 |
88 |
mapred.job.tracker.persist.jobstatus.hours |
0 |
持久化作业管理跟踪器的信息的保存时间 |
89 |
mapred.job.tracker.persist.jobstatus.dir |
/jobtracker/jobsInfo |
作业管理跟踪器的信息存放目录 |
90 |
mapreduce.job.complete.cancel.delegation.tokens |
true |
恢复时是否变更领牌 |
91 |
mapred.task.profile |
false |
任务分析信息是否建设标志 |
92 |
mapred.task.profile.maps |
0-2 |
设置map任务的分析范围 |
93 |
mapred.task.profile.reduces |
0-2 |
设置reduce任务的分析范围 |
94 |
mapred.line.input.format.linespermap |
1 |
每次切分的行数设置 |
95 |
mapred.skip.attempts.to.start.skipping |
2 |
在跳转模式未被设定的情况下任务的重试次数 |
96 |
mapred.skip.map.auto.incr.proc.count |
true |
MapRunner在调用map功能后的增量处理方式设置 |
97 |
mapred.skip.reduce.auto.incr.proc.count |
true |
在调用reduce功能后的增量处理方式设置 |
98 |
mapred.skip.out.dir |
|
跳过记录的输出目录 |
99 |
mapred.skip.map.max.skip.records |
0 |
|
100 |
mapred.skip.reduce.max.skip.groups |
0 |
|
101 |
job.end.retry.attempts |
0 |
Hadoop偿试连接通知器的次数 |
102 |
job.end.retry.interval |
30000 |
通知偿试回应的间隔操作为30秒 |
103 |
hadoop.rpc.socket.factory.class.JobSubmissionProtocol |
指定与作业跟踪管理器的通讯方式,缺省是采用rpc方式 |
|
104 |
mapred.task.cache.levels |
2 |
任务缓存级别设置 |
105 |
mapred.queue.names |
default |
分隔作业队例的分隔符设定 |
106 |
mapred.acls.enabled |
false |
指定ACL访问控制列表 |
107 |
mapred.queue.default.state |
RUNNING |
定义队列的状态 |
108 |
mapred.job.queue.name |
default |
已提交作业的队列设定 |
109 |
mapreduce.job.acl-modify-job |
|
指定可修改作业的ACL列表 |
110 |
mapreduce.job.acl-view-job |
|
指定可浏临作业的ACL列表 |
111 |
mapred.tasktracker.indexcache.mb |
10 |
任务管理跟踪器的索引内存的最大容器 |
112 |
mapred.combine.recordsBeforeProgress |
10000 |
在聚合处理时的记录块数 |
113 |
mapred.merge.recordsBeforeProgress |
10000 |
在汇总处理时的记录块数 |
114 |
mapred.reduce.slowstart.completed.maps |
0.05 |
|
115 |
mapred.task.tracker.task-controller |
org.apache.hadoop.mapred.DefaultTaskController |
任务管理器的设定 |
116 |
mapreduce.tasktracker.group |
|
任务管理器的组成员设定 |
117 |
mapred.healthChecker.script.path |
|
脚本的绝对路径指定,这些脚本是心跳服务的 |
118 |
mapred.healthChecker.interval |
60000 |
节点心跳信息的间隔 |
119 |
mapred.healthChecker.script.timeout |
600000 |
|
120 |
mapred.healthChecker.script.args |
|
参数列表 |
121 |
mapreduce.job.counters.limit |
120 |
作业计数器的最小值 |
9.4. yarn-default.xml
序号 |
参数 |
默认值 |
参数说明 |
1 |
yarn.resourcemanager.hostname |
RM的hostname |
|
2 |
yarn.resourcemanager.address |
${yarn.resourcemanager.hostname}:8032 |
RM对客户端暴露的地址,客户端通过该地址向RM提交应用程序等 |
3 |
yarn.resourcemanager.scheduler.address |
${yarn.resourcemanager.hostname}:8030 |
RM对AM暴露的地址,AM通过地址想RM申请资源,释放资源等 |
4 |
yarn.resourcemanager.webapp.address |
${yarn.resourcemanager.hostname}:8088 |
RM对外暴露的web http地址,用户可通过该地址在浏览器中查看集群信息 |
5 |
yarn.resourcemanager.webapp.https.address |
${yarn.resourcemanager.hostname}:8090 |
web https 地址 |
6 |
yarn.resourcemanager.resource-tracker.address |
${yarn.resourcemanager.hostname}:8031 |
RM对NM暴露地址,NM通过该地址向RM汇报心跳,领取任务等 |
7 |
yarn.resourcemanager.resource-tracker.client.thread-count |
50 |
处理来自NM的RPC请求的handler数 |
8 |
yarn.resourcemanager.admin.address |
${yarn.resourcemanager.hostname}:8033 |
管理员可以通过该地址向RM发送管理命令等 |
9 |
yarn.resourcemanager.scheduler.class |
org.apache.hadoop.yarn.server.resourcemanager |
资源调度器主类 |
10 |
.scheduler.capacity.CapacityScheduler |
||
11 |
yarn.resourcemanager.scheduler.client.thread-count |
50 |
处理来自AM的RPC请求的handler数 |
12 |
yarn.scheduler.minimum-allocation-mb |
1024 |
可申请的最少内存资源,以MB为单位 |
13 |
yarn.scheduler.maximum-allocation-mb |
8192 |
可申请的最大内存资源,以MB为单位 |
14 |
yarn.scheduler.minimum-allocation-vcores |
1 |
可申请的最小虚拟CPU个数 |
15 |
yarn.scheduler.maximum-allocation-vcores |
32 |
可申请的最 大虚拟CPU个数 |
16 |
yarn.nodemanager.local-dirs |
${hadoop.tmp.dir}/nm-local-dir |
中间结果存放位置,可配置多目录 |
17 |
yarn.log-aggregation-enable |
FALSE |
是否启用日志聚合 |
18 |
yarn.nodemanager.remote-app-log-dir |
/tmp/logs |
日志聚合目录 |
19 |
yarn.nodemanager.resource.memory-mb |
8192 |
NM总的可用物理内存,以MB为单位。一旦设置,不可动态修改 |
20 |
yarn.nodemanager.resource.cpu-vcores |
8 |
可分配的CPU个数 |
21 |
yarn.nodemanager.aux-services |
NodeManager上运行的附属服务。需配置成mapreduce_shuffle,才可运行MapReduce程序 |
9.5. 新旧属性对应表
废弃属性 |
新属性 |
create.empty.dir.if.nonexist |
mapreduce.jobcontrol.createdir.ifnotexist |
dfs.access.time.precision |
dfs.namenode.accesstime.precision |
dfs.backup.address |
dfs.namenode.backup.address |
dfs.backup.http.address |
dfs.namenode.backup.http-address |
dfs.balance.bandwidthPerSec |
dfs.datanode.balance.bandwidthPerSec |
dfs.block.size |
dfs.blocksize |
dfs.data.dir |
dfs.datanode.data.dir |
dfs.datanode.max.xcievers |
dfs.datanode.max.transfer.threads |
dfs.df.interval |
fs.df.interval |
dfs.encryption.key.provider.uri |
hadoop.security.key.provider.path |
dfs.federation.nameservice.id |
dfs.nameservice.id |
dfs.federation.nameservices |
dfs.nameservices |
dfs.http.address |
dfs.namenode.http-address |
dfs.https.address |
dfs.namenode.https-address |
dfs.https.client.keystore.resource |
dfs.client.https.keystore.resource |
dfs.https.enable |
dfs.http.policy |
dfs.https.need.client.auth |
dfs.client.https.need-auth |
dfs.max.objects |
dfs.namenode.max.objects |
dfs.max-repl-streams |
dfs.namenode.replication.max-streams |
dfs.name.dir |
dfs.namenode.name.dir |
dfs.name.dir.restore |
dfs.namenode.name.dir.restore |
dfs.name.edits.dir |
dfs.namenode.edits.dir |
dfs.permissions |
dfs.permissions.enabled |
dfs.permissions.supergroup |
dfs.permissions.superusergroup |
dfs.read.prefetch.size |
dfs.client.read.prefetch.size |
dfs.replication.considerLoad |
dfs.namenode.replication.considerLoad |
dfs.replication.interval |
dfs.namenode.replication.interval |
dfs.replication.min |
dfs.namenode.replication.min |
dfs.replication.pending.timeout.sec |
dfs.namenode.replication.pending.timeout-sec |
dfs.safemode.extension |
dfs.namenode.safemode.extension |
dfs.safemode.threshold.pct |
dfs.namenode.safemode.threshold-pct |
dfs.secondary.http.address |
dfs.namenode.secondary.http-address |
dfs.socket.timeout |
dfs.client.socket-timeout |
dfs.umaskmode |
fs.permissions.umask-mode |
dfs.web.ugi |
hadoop.http.staticuser.user |
dfs.write.packet.size |
dfs.client-write-packet-size |
fs.checkpoint.dir |
dfs.namenode.checkpoint.dir |
fs.checkpoint.edits.dir |
dfs.namenode.checkpoint.edits.dir |
fs.checkpoint.period |
dfs.namenode.checkpoint.period |
fs.default.name |
fs.defaultFS |
fs.s3a.server-side-encryption-key |
fs.s3a.server-side-encryption.key |
hadoop.configured.node.mapping |
net.topology.configured.node.mapping |
hadoop.job.history.location |
mapreduce.jobtracker.jobhistory.location |
hadoop.native.lib |
io.native.lib.available |
hadoop.net.static.resolutions |
mapreduce.tasktracker.net.static.resolutions |
hadoop.pipes.command-file.keep |
mapreduce.pipes.commandfile.preserve |
hadoop.pipes.executable.interpretor |
mapreduce.pipes.executable.interpretor |
hadoop.pipes.executable |
mapreduce.pipes.executable |
hadoop.pipes.java.mapper |
mapreduce.pipes.isjavamapper |
hadoop.pipes.java.recordreader |
mapreduce.pipes.isjavarecordreader |
hadoop.pipes.java.recordwriter |
mapreduce.pipes.isjavarecordwriter |
hadoop.pipes.java.reducer |
mapreduce.pipes.isjavareducer |
hadoop.pipes.partitioner |
mapreduce.pipes.partitioner |
heartbeat.recheck.interval |
dfs.namenode.heartbeat.recheck-interval |
io.bytes.per.checksum |
dfs.bytes-per-checksum |
io.sort.factor |
mapreduce.task.io.sort.factor |
io.sort.mb |
mapreduce.task.io.sort.mb |
io.sort.spill.percent |
mapreduce.map.sort.spill.percent |
jobclient.completion.poll.interval |
mapreduce.client.completion.pollinterval |
jobclient.output.filter |
mapreduce.client.output.filter |
jobclient.progress.monitor.poll.interval |
mapreduce.client.progressmonitor.pollinterval |
job.end.notification.url |
mapreduce.job.end-notification.url |
job.end.retry.attempts |
mapreduce.job.end-notification.retry.attempts |
job.end.retry.interval |
mapreduce.job.end-notification.retry.interval |
job.local.dir |
mapreduce.job.local.dir |
keep.failed.task.files |
mapreduce.task.files.preserve.failedtasks |
keep.task.files.pattern |
mapreduce.task.files.preserve.filepattern |
key.value.separator.in.input.line |
mapreduce.input.keyvaluelinerecordreader.key.value.separator |
local.cache.size |
mapreduce.tasktracker.cache.local.size |
map.input.file |
mapreduce.map.input.file |
map.input.length |
mapreduce.map.input.length |
map.input.start |
mapreduce.map.input.start |
map.output.key.field.separator |
mapreduce.map.output.key.field.separator |
map.output.key.value.fields.spec |
mapreduce.fieldsel.map.output.key.value.fields.spec |
mapred.acls.enabled |
mapreduce.cluster.acls.enabled |
mapred.binary.partitioner.left.offset |
mapreduce.partition.binarypartitioner.left.offset |
mapred.binary.partitioner.right.offset |
mapreduce.partition.binarypartitioner.right.offset |
mapred.cache.archives |
mapreduce.job.cache.archives |
mapred.cache.archives.timestamps |
mapreduce.job.cache.archives.timestamps |
mapred.cache.files |
mapreduce.job.cache.files |
mapred.cache.files.timestamps |
mapreduce.job.cache.files.timestamps |
mapred.cache.localArchives |
mapreduce.job.cache.local.archives |
mapred.cache.localFiles |
mapreduce.job.cache.local.files |
mapred.child.tmp |
mapreduce.task.tmp.dir |
mapred.cluster.average.blacklist.threshold |
mapreduce.jobtracker.blacklist.average.threshold |
mapred.cluster.map.memory.mb |
mapreduce.cluster.mapmemory.mb |
mapred.cluster.max.map.memory.mb |
mapreduce.jobtracker.maxmapmemory.mb |
mapred.cluster.max.reduce.memory.mb |
mapreduce.jobtracker.maxreducememory.mb |
mapred.cluster.reduce.memory.mb |
mapreduce.cluster.reducememory.mb |
mapred.committer.job.setup.cleanup.needed |
mapreduce.job.committer.setup.cleanup.needed |
mapred.compress.map.output |
mapreduce.map.output.compress |
mapred.data.field.separator |
mapreduce.fieldsel.data.field.separator |
mapred.debug.out.lines |
mapreduce.task.debugout.lines |
mapred.healthChecker.interval |
mapreduce.tasktracker.healthchecker.interval |
mapred.healthChecker.script.args |
mapreduce.tasktracker.healthchecker.script.args |
mapred.healthChecker.script.path |
mapreduce.tasktracker.healthchecker.script.path |
mapred.healthChecker.script.timeout |
mapreduce.tasktracker.healthchecker.script.timeout |
mapred.heartbeats.in.second |
mapreduce.jobtracker.heartbeats.in.second |
mapred.hosts.exclude |
mapreduce.jobtracker.hosts.exclude.filename |
mapred.hosts |
mapreduce.jobtracker.hosts.filename |
mapred.inmem.merge.threshold |
mapreduce.reduce.merge.inmem.threshold |
mapred.input.dir.formats |
mapreduce.input.multipleinputs.dir.formats |
mapred.input.dir.mappers |
mapreduce.input.multipleinputs.dir.mappers |
mapred.input.dir |
mapreduce.input.fileinputformat.inputdir |
mapred.input.pathFilter.class |
mapreduce.input.pathFilter.class |
mapred.jar |
mapreduce.job.jar |
mapred.job.classpath.archives |
mapreduce.job.classpath.archives |
mapred.job.classpath.files |
mapreduce.job.classpath.files |
mapred.job.id |
mapreduce.job.id |
mapred.jobinit.threads |
mapreduce.jobtracker.jobinit.threads |
mapred.job.map.memory.mb |
mapreduce.map.memory.mb |
mapred.job.name |
mapreduce.job.name |
mapred.job.priority |
mapreduce.job.priority |
mapred.job.queue.name |
mapreduce.job.queuename |
mapred.job.reduce.input.buffer.percent |
mapreduce.reduce.input.buffer.percent |
mapred.job.reduce.markreset.buffer.percent |
mapreduce.reduce.markreset.buffer.percent |
mapred.job.reduce.memory.mb |
mapreduce.reduce.memory.mb |
mapred.job.reduce.total.mem.bytes |
mapreduce.reduce.memory.totalbytes |
mapred.job.reuse.jvm.num.tasks |
mapreduce.job.jvm.numtasks |
mapred.job.shuffle.input.buffer.percent |
mapreduce.reduce.shuffle.input.buffer.percent |
mapred.job.shuffle.merge.percent |
mapreduce.reduce.shuffle.merge.percent |
mapred.job.tracker.handler.count |
mapreduce.jobtracker.handler.count |
mapred.job.tracker.history.completed.location |
mapreduce.jobtracker.jobhistory.completed.location |
mapred.job.tracker.http.address |
mapreduce.jobtracker.http.address |
mapred.jobtracker.instrumentation |
mapreduce.jobtracker.instrumentation |
mapred.jobtracker.job.history.block.size |
mapreduce.jobtracker.jobhistory.block.size |
mapred.job.tracker.jobhistory.lru.cache.size |
mapreduce.jobtracker.jobhistory.lru.cache.size |
mapred.job.tracker |
mapreduce.jobtracker.address |
mapred.jobtracker.maxtasks.per.job |
mapreduce.jobtracker.maxtasks.perjob |
mapred.job.tracker.persist.jobstatus.active |
mapreduce.jobtracker.persist.jobstatus.active |
mapred.job.tracker.persist.jobstatus.dir |
mapreduce.jobtracker.persist.jobstatus.dir |
mapred.job.tracker.persist.jobstatus.hours |
mapreduce.jobtracker.persist.jobstatus.hours |
mapred.jobtracker.restart.recover |
mapreduce.jobtracker.restart.recover |
mapred.job.tracker.retiredjobs.cache.size |
mapreduce.jobtracker.retiredjobs.cache.size |
mapred.job.tracker.retire.jobs |
mapreduce.jobtracker.retirejobs |
mapred.jobtracker.taskalloc.capacitypad |
mapreduce.jobtracker.taskscheduler.taskalloc.capacitypad |
mapred.jobtracker.taskScheduler |
mapreduce.jobtracker.taskscheduler |
mapred.jobtracker.taskScheduler.maxRunningTasksPerJob |
mapreduce.jobtracker.taskscheduler.maxrunningtasks.perjob |
mapred.join.expr |
mapreduce.join.expr |
mapred.join.keycomparator |
mapreduce.join.keycomparator |
mapred.lazy.output.format |
mapreduce.output.lazyoutputformat.outputformat |
mapred.line.input.format.linespermap |
mapreduce.input.lineinputformat.linespermap |
mapred.linerecordreader.maxlength |
mapreduce.input.linerecordreader.line.maxlength |
mapred.local.dir |
mapreduce.cluster.local.dir |
mapred.local.dir.minspacekill |
mapreduce.tasktracker.local.dir.minspacekill |
mapred.local.dir.minspacestart |
mapreduce.tasktracker.local.dir.minspacestart |
mapred.map.child.env |
mapreduce.map.env |
mapred.map.child.java.opts |
mapreduce.map.java.opts |
mapred.map.child.log.level |
mapreduce.map.log.level |
mapred.map.max.attempts |
mapreduce.map.maxattempts |
mapred.map.output.compression.codec |
mapreduce.map.output.compress.codec |
mapred.mapoutput.key.class |
mapreduce.map.output.key.class |
mapred.mapoutput.value.class |
mapreduce.map.output.value.class |
mapred.mapper.regex.group |
mapreduce.mapper.regexmapper..group |
mapred.mapper.regex |
mapreduce.mapper.regex |
mapred.map.task.debug.script |
mapreduce.map.debug.script |
mapred.map.tasks |
mapreduce.job.maps |
mapred.map.tasks.speculative.execution |
mapreduce.map.speculative |
mapred.max.map.failures.percent |
mapreduce.map.failures.maxpercent |
mapred.max.reduce.failures.percent |
mapreduce.reduce.failures.maxpercent |
mapred.max.split.size |
mapreduce.input.fileinputformat.split.maxsize |
mapred.max.tracker.blacklists |
mapreduce.jobtracker.tasktracker.maxblacklists |
mapred.max.tracker.failures |
mapreduce.job.maxtaskfailures.per.tracker |
mapred.merge.recordsBeforeProgress |
mapreduce.task.merge.progress.records |
mapred.min.split.size |
mapreduce.input.fileinputformat.split.minsize |
mapred.min.split.size.per.node |
mapreduce.input.fileinputformat.split.minsize.per.node |
mapred.min.split.size.per.rack |
mapreduce.input.fileinputformat.split.minsize.per.rack |
mapred.output.compression.codec |
mapreduce.output.fileoutputformat.compress.codec |
mapred.output.compression.type |
mapreduce.output.fileoutputformat.compress.type |
mapred.output.compress |
mapreduce.output.fileoutputformat.compress |
mapred.output.dir |
mapreduce.output.fileoutputformat.outputdir |
mapred.output.key.class |
mapreduce.job.output.key.class |
mapred.output.key.comparator.class |
mapreduce.job.output.key.comparator.class |
mapred.output.value.class |
mapreduce.job.output.value.class |
mapred.output.value.groupfn.class |
mapreduce.job.output.group.comparator.class |
mapred.permissions.supergroup |
mapreduce.cluster.permissions.supergroup |
mapred.pipes.user.inputformat |
mapreduce.pipes.inputformat |
mapred.reduce.child.env |
mapreduce.reduce.env |
mapred.reduce.child.java.opts |
mapreduce.reduce.java.opts |
mapred.reduce.child.log.level |
mapreduce.reduce.log.level |
mapred.reduce.max.attempts |
mapreduce.reduce.maxattempts |
mapred.reduce.parallel.copies |
mapreduce.reduce.shuffle.parallelcopies |
mapred.reduce.slowstart.completed.maps |
mapreduce.job.reduce.slowstart.completedmaps |
mapred.reduce.task.debug.script |
mapreduce.reduce.debug.script |
mapred.reduce.tasks |
mapreduce.job.reduces |
mapred.reduce.tasks.speculative.execution |
mapreduce.reduce.speculative |
mapred.seqbinary.output.key.class |
mapreduce.output.seqbinaryoutputformat.key.class |
mapred.seqbinary.output.value.class |
mapreduce.output.seqbinaryoutputformat.value.class |
mapred.shuffle.connect.timeout |
mapreduce.reduce.shuffle.connect.timeout |
mapred.shuffle.read.timeout |
mapreduce.reduce.shuffle.read.timeout |
mapred.skip.attempts.to.start.skipping |
mapreduce.task.skip.start.attempts |
mapred.skip.map.auto.incr.proc.count |
mapreduce.map.skip.proc-count.auto-incr |
mapred.skip.map.max.skip.records |
mapreduce.map.skip.maxrecords |
mapred.skip.on |
mapreduce.job.skiprecords |
mapred.skip.out.dir |
mapreduce.job.skip.outdir |
mapred.skip.reduce.auto.incr.proc.count |
mapreduce.reduce.skip.proc-count.auto-incr |
mapred.skip.reduce.max.skip.groups |
mapreduce.reduce.skip.maxgroups |
mapred.speculative.execution.slowNodeThreshold |
mapreduce.job.speculative.slownodethreshold |
mapred.speculative.execution.slowTaskThreshold |
mapreduce.job.speculative.slowtaskthreshold |
mapred.speculative.execution.speculativeCap |
mapreduce.job.speculative.speculativecap |
mapred.submit.replication |
mapreduce.client.submit.file.replication |
mapred.system.dir |
mapreduce.jobtracker.system.dir |
mapred.task.cache.levels |
mapreduce.jobtracker.taskcache.levels |
mapred.task.id |
mapreduce.task.attempt.id |
mapred.task.is.map |
mapreduce.task.ismap |
mapred.task.partition |
mapreduce.task.partition |
mapred.task.profile |
mapreduce.task.profile |
mapred.task.profile.maps |
mapreduce.task.profile.maps |
mapred.task.profile.params |
mapreduce.task.profile.params |
mapred.task.profile.reduces |
mapreduce.task.profile.reduces |
mapred.task.timeout |
mapreduce.task.timeout |
mapred.tasktracker.dns.interface |
mapreduce.tasktracker.dns.interface |
mapred.tasktracker.dns.nameserver |
mapreduce.tasktracker.dns.nameserver |
mapred.tasktracker.events.batchsize |
mapreduce.tasktracker.events.batchsize |
mapred.tasktracker.expiry.interval |
mapreduce.jobtracker.expire.trackers.interval |
mapred.task.tracker.http.address |
mapreduce.tasktracker.http.address |
mapred.tasktracker.indexcache.mb |
mapreduce.tasktracker.indexcache.mb |
mapred.tasktracker.instrumentation |
mapreduce.tasktracker.instrumentation |
mapred.tasktracker.map.tasks.maximum |
mapreduce.tasktracker.map.tasks.maximum |
mapred.tasktracker.memory_calculator_plugin |
mapreduce.tasktracker.resourcecalculatorplugin |
mapred.tasktracker.memorycalculatorplugin |
mapreduce.tasktracker.resourcecalculatorplugin |
mapred.tasktracker.reduce.tasks.maximum |
mapreduce.tasktracker.reduce.tasks.maximum |
mapred.task.tracker.report.address |
mapreduce.tasktracker.report.address |
mapred.task.tracker.task-controller |
mapreduce.tasktracker.taskcontroller |
mapred.tasktracker.tasks.sleeptime-before-sigkill |
mapreduce.tasktracker.tasks.sleeptimebeforesigkill |
mapred.temp.dir |
mapreduce.cluster.temp.dir |
mapred.text.key.comparator.options |
mapreduce.partition.keycomparator.options |
mapred.text.key.partitioner.options |
mapreduce.partition.keypartitioner.options |
mapred.textoutputformat.separator |
mapreduce.output.textoutputformat.separator |
mapred.tip.id |
mapreduce.task.id |
mapreduce.combine.class |
mapreduce.job.combine.class |
mapreduce.inputformat.class |
mapreduce.job.inputformat.class |
mapreduce.job.counters.limit |
mapreduce.job.counters.max |
mapreduce.jobtracker.permissions.supergroup |
mapreduce.cluster.permissions.supergroup |
mapreduce.map.class |
mapreduce.job.map.class |
mapreduce.outputformat.class |
mapreduce.job.outputformat.class |
mapreduce.partitioner.class |
mapreduce.job.partitioner.class |
mapreduce.reduce.class |
mapreduce.job.reduce.class |
mapred.used.genericoptionsparser |
mapreduce.client.genericoptionsparser.used |
mapred.userlog.limit.kb |
mapreduce.task.userlog.limit.kb |
mapred.userlog.retain.hours |
mapreduce.job.userlog.retain.hours |
mapred.working.dir |
mapreduce.job.working.dir |
mapred.work.output.dir |
mapreduce.task.output.dir |
min.num.spills.for.combine |
mapreduce.map.combine.minspills |
reduce.output.key.value.fields.spec |
mapreduce.fieldsel.reduce.output.key.value.fields.spec |
security.job.submission.protocol.acl |
security.job.client.protocol.acl |
security.task.umbilical.protocol.acl |
security.job.task.protocol.acl |
sequencefile.filter.class |
mapreduce.input.sequencefileinputfilter.class |
sequencefile.filter.frequency |
mapreduce.input.sequencefileinputfilter.frequency |
sequencefile.filter.regex |
mapreduce.input.sequencefileinputfilter.regex |
session.id |
dfs.metrics.session-id |
slave.host.name |
dfs.datanode.hostname |
slave.host.name |
mapreduce.tasktracker.host.name |
tasktracker.contention.tracking |
mapreduce.tasktracker.contention.tracking |
tasktracker.http.threads |
mapreduce.tasktracker.http.threads |
topology.node.switch.mapping.impl |
net.topology.node.switch.mapping.impl |
topology.script.file.name |
net.topology.script.file.name |
topology.script.number.args |
net.topology.script.number.args |
user.name |
mapreduce.job.user.name |
webinterface.private.actions |
mapreduce.jobtracker.webinterface.trusted |
yarn.app.mapreduce.yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts |
yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts |
yarn.client.app-submission.poll-interval |
yarn.client.application-client-protocol.poll-timeout-ms |