作者 by aigle / 2023-12-13 / 暂无评论
HDFS:海量数据的存储MapReduce:海量数据的分析YARN:资源管理调度
举例说明:YARN将HDFS存储的数据分配到不同的机器节点,运用MapReduce进行运算分析

优点:
执行文件打开、关闭、重命名等操作,同时保存分文件块(block)在Datanode上的具体存储位置的映射(存在namenode的元数据中),响应客户端读写文件的请求。
问题:namenode如何保证目录的快读读写的呢?

namenodenamenode将记录上传文件的block位置信息记录在edits.log中namenode的block位置信息,将文件block写入对应的datanodenamenodenamenode将该文件的block位置写入内存元数据(方便读取)如此,namenode便将上传文件的元数据写入了内存。但此刻还未写入磁盘中。当edits.log记录的数据条数达到一定数量(小文件,为了保持快速读写)或到达指定时间时,会触发Secondary NameNode合并元数据再写入磁盘
(注意,之所以触发Secondary NameNode进行操作是因为该合并可能是十几G文件的合并,十分耗时和资源,为了不影响namenode的功能,所以设计出了Secondary NameNode)
具体步骤:
edits log条数达到触发条件,运行Secondary NameNode的checkpoint操作。Secondary NameNode通知namenode切换edits log文件。即namenode此刻采用一个全新的edits.new文件进行读写Secondary NameNode通过http下载namenode中旧的edits log文件和硬盘元数据(Fsimage)Secondary NameNode通过edits log的日志信息,生成新的元数据文件(Fsimage.chkpoint),并传给namenodenamenode用Fsimage.chkpoint和edits.new替换旧的元数据文件和edits log文件提供真实文件数据的存储服务,定期向Namenode发送心跳信息,汇报本身及其所有的block信息,健康状况
block有三个复本。副本存放策略:
1、先在客户端所连接的datanode上存放一个副本
2、再在另一个机架上选择一个datanode存放第二个副本
3、最后在本机架上根据负载情况随机挑选一个datanode存放第三个副本
副本数量的配置优先级:
1、服务端hdfs-site.xml中可以配置
2、在客户端指定dfs.replication的值
客户端所指定的值优先级更高!!!
汇报机制:
为了防止Datanode上文件的改变(如删除,损坏),Datanode还会定期向NameNode汇报自身所存储的block信息
基于zookeeper,对namenode的集群管理。
即为防止单个namenode挂了后,HDFS无法运行,配置一个namenode集群,当active的namenode挂的时候,备用的namenode从standby状态转换为active状态,继续提供服务
HA机制有2种方式,一种是NFS(Network File System)方式,另外一种是QJM(Quorum Journal Manager)方式
QJM的原理如下图:

备用的namenode和主namenode之间需要保持数据的一致(主要是edits.log和硬盘元数据Fsimage),而在初始化时,所有namenode的Fsimage都是一致的,所以主要是edits.log的一致。
为保持edits.log的一致,HA机制中直接将其拿了出来变成单独的一个服务:EDITS管理服务。所有的namenode读取edits.log信息时都在这个服务中获取。
(EDITS管理服务:可以看成把多个edits.log放在多个机器上,基于Zookeeper形成主写从读的集群。每个节点的服务名:journalNode)
数据保存一致后,当active状态的namenode挂了的时候,仍需要进行切换,负责这个切换的机制叫做ZKFC,它也是基于Zookeeper实现的,服务名:DFSZKFailoverController。
ZKFC切换namenode时,可能出现原来active状态的namenode并不是挂了,只是一段时间失去通讯,等它恢复时,ZKFC已经切换出了一个新的active的namenode,造成此时HDFS出现双active状态的namenode。
为防止这种情况的出现,ZKFC切换namenode时会有一个fencing 机制来保证原active状态的namenode一定不会再次激活
fencing 机制:
ssh kill namenode进程号类似的框架还有Storm(擅长流式计算),Spark(擅长内存迭代计算)。区别在于MapReduce可做离线数据的批量计算,而后两者可做实时处理。
map阶段,reduce阶段jar包分发,任务的启动,任务的容错、调度,中间结果的分组传递……)Map过程会将数据计算映射到各个服务器上进行处理,而Reduce会将各个服务器上的计算结果取出来进行二次处理。
当执行hadoop jar jar包路径 jar包执行主类 执行结果输出目录命令时,job提交过程如下:

(Yarn集群中有两个角色节点:ResourceManager和NodeManager)
RunJarRunJar向Yarn中的ResourceManager申请运行一个job , ResourceManager返回执行job的 "jobId" 和 "存放job资源的hdfs路径"Runjar将job需要的文件资源(jar包,jobid,jar存放的位置,配置信息等等)写入HDFS,通知ResourceManager提交完成ResourceManager读取HDFS上的job信息,开始初始化一个Job任务,并将其放入任务队列NodeManager读取ResourceManager的任务队列,发现对应任务分配给自己时,领取任务信息(心跳机制)NodeManager根据任务信息,下载所需要的jar包,分配执行job的资源容器。分配完毕后向ResourceManager汇报分配结果ResourceManager根据各个机器的分配结果,选取一个机器上的NodeManager运行MapReduce的主管进程:MRAppMasterMRAppMaster主进程协调其余NodeManager一起运行MapReduce程序来完成job任务,并将结果写入HDFS对应目录整个过程也可以简化为以下三步:
RunJar客户端和ResourceManager协作一起完成Job的提交ResourceManager和NodeManager协作一起完成Job运行所需要的资源分配MapReduce框架的主管进程MRAppMaster负责整个Job运行过程的协调控制
文件被划分为多个切片,每个切片对应一个MapperTask进行运行。
划分规则需要考虑两点:1. 切片不能太大,太大Map过程会很耗时;2. 也不能太小,太小会导致MapperTask过多,影响效率
默认的划分机制是:1. 对每个文件单独规划切片 2. 对每个文件的每一个分块(block)设置为一个切片(split)
在MapperTask和Shuffle过程之间,做一个数据的局部汇总,使MapperTask传到Shuffle的数据大幅精简,减少Shuffle过程的IO。
如图,在MapReduce上运行 "单词个数统计" 的程序时,某个MapperTask最后传给Shuffle的可能是<hello,1,1,1,重复10万次……1>的长哈希map。而Shuffle接受多个这种长字符串后,再进行合并传给Reduce的哈希map只会更长。
通过在combiner中对MapperTask的长哈希map进行累加处理得<hello,100000>,再传给Shuffle,将减少它大量的IO工作。
总结:
MapperTask的本地服务器上运行,能收到MapperTask输出的每一个key的valuelist,所以可以做局部汇总处理MapperTask的本地环境进行了局部汇总,就会让MapperTask端的输出数据量大幅精简,减小shuffle过程的网络IOcombiner其实就是一个reducer组件,跟真实的reducer的区别就在于,combiner运行MapperTask的本地环境combiner在使用时需要注意,输入输出KV数据类型要跟map和reduce的相应数据类型匹配combiner的加入而受影响Writable或者 WritableComparable<T>Writable接口后,要实现其中的两个方法//序列化,将数据写入字节流
public void write(DataOutput out) throws IOException
//反序列化,从字节流中读出数据
public void readFields(DataInput in) throws IOException 每一个reduce task输出一个结果文件。
场景:
对一个大文本的电话簿数据,想按省份的输出成不同的分文本
方法步骤:
AreaPartitioner 继承 Partitioner 抽象类,实现其中的方法 int getPartition(K,V)public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
private static HashMap<String, Integer> areaMap = new HashMap<>();
static {
areaMap.put("136", 0);
areaMap.put("137", 1);
areaMap.put("138", 2);
areaMap.put("139", 3);
}
@Override
public int getPartition(KEY key, VALUE value, int numPartitions) {
Integer provinceCode = areaMap.get(key.toString().substring(0,3));
return provinceCode==null?4:provinceCode;
}
}partitionerjob.setPartitionerClass(AreaPartitioner.class)//分区设定了5个省份,所以此处填5
job.setNumReduceTasks(5);要注意的是:
shuffle的主要工作是从Map结束到Reduce开始之间的过程。即它是map task的输出数据 到 reduce task之间的一种数据调度机制
shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle
Map端的shuffle:分组,排序,CombinerReduce端的shuffle:归并,排序
Yarn由1个ResourceManager和多个NodeManager组成
一个集群active状态的RM只有一个,负责整个集群的资源管理和调度
NodeManager(心跳机制)整个集群中有N个,负责单个节点的资源管理和使用以及task的运行情况
每个应用/作业对应一个,负责应用程序的管理,在Container中启动。
作业的运行容器,含有对任务运行情况的描述:cpu、memory、环境变量
Hive是一个数据仓库基础工具在Hadoop中用来处理结构化数据。它架构在Hadoop之上,使数据查询和分析更方便。
简单来说,Hive就是将sql语句转换为MapReduce任务或其余引擎任务运行。
Hive底层的执行引擎有:MapReduce、Tez、Spark
HBase(Hadoop Database):
一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具。
HBase介于NOSQL和RDBMS之间,同RDBMS一样,它也是用表结构存储数据,表结构主要由Row Key(行键),列族,Cell,时间戳 组成:


如图,HBase由两部分组成:HMaster,Region server
HMaster:为Region server分配region;负责region server的负载均衡;发现失效的region server并重新分配其上的regionRegion server:维护Master分配给它的region,处理对这些region的IO请求。即负责读写表数据Region:HBase中分布式存储和负载均衡的最小单元,分布在不同的HRegion server上。实质为在行的方向上分割为多个Hregion。客户端访问某个表的"xx行键-xx列族-xx字段名-xx版本号"数据时,通过Zookeeper中的信息访问对应的元数据表,知道数据存储在哪台Region server中,然后对应Region server读取存在HDFS中的数据返回
为分布式程序提供协调服务,作为第三方管理各个节点的共享数据,本身非常可靠,它其实就是一个分布式集群来提供服务
Zookeeper的最主要功能保管客户端提交的数据(极其少量的数据),每一份数据在zookeeper叫做一个znode
znode之间形成一种树状结构(类似于文件树)
比如:/lock(znode名) 001(znode中存的数据)/lock/last_acc(znode名) 008(znode中存的数据)
Zookeeper的应用通过Zookeeper的共享数据的管理,能实现分布式应用的许多功能:
Zookeeper封装的Dubbo)
评论已关闭