HDFS读文件过程分析:获取文件对应的Block列表

图片 9

我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示:

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示:

       在配置hbase集群将 hdfs
挂接到其它镜像盘时,有不少困惑的地方,结合以前的资料再次学习; 
大数据底层技术的三大基石起源于Google在2006年之前的三篇论文GFS、Map-Reduce、
Bigtable,其中GFS、Map-Reduce技术直接支持了Apache
Hadoop项目的诞生,Bigtable催生了NoSQL这个崭新的数据库领域,由于map-Reduce处理框架高延时的缺陷,
Google在2009年后推出的Dremel促使了实时计算系统的兴起,以此引发大数据第二波技术浪潮,一些大数据公司纷纷推出自己的大数据查询分析产品,如:Cloudera开源了大数据查询分析引擎Impala、Hortonworks开源了 Stinger、Fackbook开源了Presto、UC Berkeley
AMPLAB实验室开发了Spark计算框架,所有这些技术的数据来源均基于hdsf,
对于 hdsf 最基本的不外乎就是其读写操作

public abstract int read() throws IOException;

package org.shirdrn.hadoop.hdfs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsFileReader {

     public static void main(String[] args) {
          String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log";
          Path path = new Path(file);

          Configuration conf = new Configuration();
          FileSystem fs;
          FSDataInputStream in;
          BufferedReader reader = null;
          try {
               fs = FileSystem.get(conf);
               in = fs.open(path); // 打开文件path,返回一个FSDataInputStream流对象
               reader = new BufferedReader(new InputStreamReader(in));
               String line = null;
               while((line = reader.readLine()) != null) { // 读取文件行内容
                    System.out.println("Record: " + line);
               }
          } catch (IOException e) {
               e.printStackTrace();
          } finally {
               try {
                    if(reader != null) reader.close();
               } catch (IOException e) {
                    e.printStackTrace();
               }
          }
     }

}

目录:

Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。

基于上面代码,我们可以看到,通过一个FileSystem对象可以打开一个Path文件,返回一个FSDataInputStream文件输入流对象,然后从该FSDataInputStream对象就能够读取出文件的内容。所以,我们从FSDataInputStream入手,详细分析从HDFS读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的Block列表元数据信息,整体流程如下图所示:

  • hdfs 名词解释
  • hdsf 架构
  • NameNode(NN)
  • Secondary NN
  • hdfs 写文件
  • hdfs 读文件
  • block持续化结构

从HDFS读文件过程分析:获取文件对应的Block列表中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。

图片 1

HDFS名词解释:

Client从Datanode读取文件的一个字节

下面,详细说明整个流程:


下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始:

创建FSDataInputStream流对象

  • Block:
    在HDFS中,每个文件都是采用的分块的方式存储,每个block放在不同的datanode上,每个block的标识是一个三元组(block id, numBytes,generationStamp),其中block
    id是具有唯一性,具体分配是由namenode节点设置,然后再由datanode上建立block文件,同时建立对应block
    meta文件
  • Packet:在DFSclient与DataNode之间通信的过程中,发送和接受数据过程都是以一个packet为基础的方式进行
  • Chunk:中文名字也可以称为块,但是为了与block区分,还是称之为chunk。在DFSClient与DataNode之间通信的过程中,由于文件采用的是基于块的方式来进行的,但是在发送数据的过程中是以packet的方式来进行的,每个packet包含了多个chunk,同时对于每个chunk进行checksum计算,生成checksum
    bytes
  • 小结:
@Override
public synchronized int read() throws IOException {
  int ret = read( oneByteBuf, 0, 1 );
  return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}

从一个Path路径对象,能够获取到一个FileSystem对象,然后通过调用FileSystem的open方法打开一个文件流:

    1. 一个文件被拆成多个block持续化存储(block
      size 由配置文件参数决定)  
      思考: 修改 block size
      对以前持续化的数据有何影响?
    2. 数据通讯过程中一个 block 被拆成 多个
      packet
    3. 一个 packet 包含多个
      chunk

上面调用read(oneByteBuf, 0,
1)读取一个字节到单字节缓冲区oneByteBuf中,具体实现见如下方法:

public FSDataInputStream open(Path f) throws IOException {
  return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
  • Packet结构与定义: Packet分为两类,一类是实际数据包,另一类是heatbeat包。一个Packet数据包的组成结构,如图所示
  • 图片 2
  • 上图中,一个Packet是由Header和Data两部分组成,其中Header部分包含了一个Packet的概要属性信息,如下表所示:
  • 图片 3
  • Data部分是一个Packet的实际数据部分,主要包括一个4字节校验和(Checksum)与一个Chunk部分,Chunk部分最大为512字节
  • 在构建一个Packet的过程中,首先将字节流数据写入一个buffer缓冲区中,也就是从偏移量为25的位置(checksumStart)开始写Packet数据Chunk的Checksum部分,从偏移量为533的位置(dataStart)开始写Packet数据的Chunk
    Data部分,直到一个Packet创建完成为止。
  • 当写一个文件的最后一个Block的最后一个Packet时,如果一个Packet的大小未能达到最大长度,也就是上图对应的缓冲区中,Checksum与Chunk
    Data之间还保留了一段未被写过的缓冲区位置,在发送这个Packet之前,会检查Chunksum与Chunk
    Data之间的缓冲区是否为空白缓冲区(gap),如果有则将Chunk
    Data部分向前移动,使得Chunk Data 1与Chunk Checksum
    N相邻,然后才会被发送到DataNode节点
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
  checkOpen(); // 检查Client是否正在运行
  if (closed) {
    throw new IOException("Stream closed");
  }
  failures = 0;

  if (pos < getFileLength()) { // getFileLength()获取文件所包含的总字节数,pos表示读取当前文件的第(pos+1)个字节
    int retries = 2;
    while (retries > 0) {
      try {
        if (pos > blockEnd) { // blockEnd表示文件的长度(字节数)
          currentNode = blockSeekTo(pos); // 找到第pos个字节数据所在的Datanode(实际根据该字节数据所在的block元数据来定位)
        }
        int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
        int result = readBuffer(buf, off, realLen); // 读取一个字节到缓冲区中

        if (result >= 0) {
          pos += result; // 每成功读取result个字节,pos增加result
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (stats != null && result != -1) {
          stats.incrementBytesRead(result);
        }
        return result;
      } catch (ChecksumException ce) {
        throw ce;           
      } catch (IOException e) {
        if (retries == 1) {
          LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
        }
        blockEnd = -1;
        if (currentNode != null) { addToDeadNodes(currentNode); }
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
  return -1;
}

由于FileSystem是抽象类,将具体的打开操作留给具体子类实现,例如FTPFileSystem、HarFileSystem、WebHdfsFileSystem等,不同的文件系统具有不同打开文件的行为,我们以DistributedFileSystem为例,open方法实现,代码如下所示:

hdsf架构:

读取文件数据的一个字节,具体过程如下:

public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  statistics.incrementReadOps(1);
  return new DFSClient.DFSDataInputStream(
        dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}

  1. 检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个InputStream对象)
  2. 从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的Datanode,可以从缓存的block列表中查询到(如果查找不到,则会与Namenode进行一次RPC通信请求获取到)
  3. 打开一个到该读取的block所在Datanode节点的流,准备读取block数据
  4. 建立了到Datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)

statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个DFSClient.DFSDataInputStream对象,该对象的参数是通过DFSClient
dfs客户端对象打开一个这个文件从而返回一个DFSInputStream对象,下面,我们看DFSClient的open方法实现,代码如下所示:

  • hdfs的构架图网上一堆,抓了一张表述比较清楚的图如下,
    主要包含因类角色:Client、NameNode、SecondayNameNode、DataNode
  • 图片 4
  • HDFS Client:
    系统使用者,调用HDFS API操作文件;与NN交互获取文件元数据;与DN交互进行数据读写, 注意:写数据时文件切分由Client完成 
  • Namenode:Master节点(也称元数据节点),是系统唯一的管理者。负责元数据的管理(名称空间和数据块映射信息);配置副本策略;处理客户端请求
  • Datanode:数据存储节点(也称Slave节点),存储实际的数据;执行数据块的读写;汇报存储信息给NN
  • Secondary
    NameNode:小弟角色,分担大哥namenode的工作量;是NameNode的冷备份;合并fsimage和fsedits然后再发给namenode,
    注意:在hadoop 2.x 版本,当启用 hdfs ha
    时,将没有这一角色。(详见第二单)
  • 解释说明:

在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个Datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                    FileSystem.Statistics stats) throws IOException {
  checkOpen();
  //    Get block info from namenode
  return new DFSInputStream(src, buffersize, verifyChecksum);
}
    1. 热备份:b是a的热备份,如果a坏掉。那么b马上运行代替a的工作
    2. 冷备份:b是a的冷备份,如果a坏掉。那么b不能马上代替a工作。但是b上存储a的一些信息,减少a坏掉之后的损失

查找待读取的一个字节所在的Datanode节点

checkOpen方法就是检查一个标志位clientRunning,表示当前的dfs客户端对象是否已经创建并初始化,在dfs客户端创建的时候该标志就为true,表示客户端正在运行状态。我们知道,当客户端DFSClient连接到Namenode的时候,实际上是创建了一个到Namenode的RPC连接,Namenode作为Server角色,DFSClient作为Client角色,它们之间建立起Socket连接。只有显式调用DFSClient的close方法时,才会修改clientRunning的值为false,实际上真正地关闭了已经建立的RPC连接。

  • hdfs构架原则:

上面public synchronized int read(byte buf[], int off, int len) throws
IOException方法,调用了blockSeekTo方法来获取,文件某个字节索引位置的数据所在的Datanode节点。其实,很容易就能想到,想要获取到数据所在的Datanode节点,一定是从block元数据中计算得到,然后根据Client缓存的block映射列表,找到block对应的Datanode列表,我们看一下blockSeekTo方法的代码实现:

我们看一下创建DFSInputStream的构造方法实现:

    1. 元数据与数据分离:文件本身的属性(即元数据)与文件所持有的数据分离
    2. 主/从架构:一个HDFS集群是由一个NameNode和一定数目的DataNode组成
    3. 一次写入多次读取:HDFS中的文件在任何时间只能有一个Writer。当文件被创建,接着写入数据,最后,一旦文件被关闭,就不能再修改。
    4. 移动计算比移动数据更划算:数据运算,越靠近数据,执行运算的性能就越好,由于hdfs数据分布在不同机器上,要让网络的消耗最低,并提高系统的吞吐量,最佳方式是将运算的执行移到离它要处理的数据更近的地方,而不是移动数据
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  ... ...

  DatanodeInfo chosenNode = null;
  int refetchToken = 1; // only need to get a new access token once
  while (true) {
    LocatedBlock targetBlock = getBlockAt(target, true); // 获取字节偏移位置为target的字节数据所在的block元数据对象
    assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
    long offsetIntoBlock = target - targetBlock.getStartOffset();

    DNAddrPair retval = chooseDataNode(targetBlock); // 选择一个Datanode去读取数据
    chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;

    // 先尝试从本地读取数据,如果数据不在本地,则正常去读取远程的Datanode节点
    Block blk = targetBlock.getBlock();
    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
    if (shouldTryShortCircuitRead(targetAddr)) {
      try {
        blockReader = getLocalBlockReader(conf, src, blk, accessToken,
            chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // 创建一个用来读取本地数据的BlockReader对象
        return chosenNode;
      } catch (AccessControlException ex) {
        LOG.warn("Short circuit access failed ", ex);
        //Disable short circuit reads
        shortCircuitLocalReads = false;
      } catch (IOException ex) {
        if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
          /* Get a new access token and retry. */
          refetchToken--;
          fetchBlockAt(target);
          continue;
        } else {
          LOG.info("Failed to read " + targetBlock.getBlock()
              + " on local machine" + StringUtils.stringifyException(ex));
          LOG.info("Try reading via the datanode on " + targetAddr);
        }
      }
    }

    // 本地读取失败,按照更一般的方式去读取远程的Datanode节点来获取数据
    try {
      s = socketFactory.createSocket();
      LOG.debug("Connecting to " + targetAddr);
      NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
      s.setSoTimeout(socketTimeout);
      blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
          accessToken,
          blk.getGenerationStamp(),
          offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
          buffersize, verifyChecksum, clientName); // 创建一个远程的BlockReader对象
      return chosenNode;
    } catch (IOException ex) {
      if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
        refetchToken--;
        fetchBlockAt(target);
      } else {
        LOG.warn("Failed to connect to " + targetAddr
            + ", add to deadNodes and continue" + ex);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connection failure", ex);
        }
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode); // 读取失败,会将选择的Datanode加入到Client的dead node列表,为下次读取选择合适的Datanode读取文件数据提供参考元数据信息
      }
      if (s != null) {
        try {
          s.close();
        } catch (IOException iex) { }                       
      }
      s = null;
    }
  }
}
DFSInputStream(String src, int buffersize, boolean verifyChecksum) throws IOException {
  this.verifyChecksum = verifyChecksum;
  this.buffersize = buffersize;
  this.src = src;
  prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
  openInfo();
}

NameNode:

上面代码中,主要包括如下几个要点:

先设置了几个与读取文件相关的参数值,这里有一个预先读取文件的Block字节数的参数prefetchSize,它的值设置如下:


  • 选择合适的Datanode节点,提高读取效率
public static final long DEFAULT_BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
public static final long    DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;

  defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  private long prefetchSize = 10 * defaultBlockSize;
  • NameNode是整个文件系统的管理节点,也是HDFS中最复杂的一个实体,它维护着HDFS文件系统中最重要的两个关系:

在读取文件的时候,首先会从Namenode获取文件对应的block列表元数据,返回的block列表是按照Datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,Client还维护了一个dead
node列表,只要此时bock对应的Datanode列表中节点不出现在dead
node列表中就会被返回,用来作为读取数据的Datanode节点。

这个prefetchSize的值默认为10*64*1024*1024=671088640,也就是说,默认预读取一个文件的10个块,即671088640B=640M,如果想要修改这个值,设置dfs.block.size即可覆盖默认值。

    1. HDFS文件系统中的文件目录树,以及文件的数据块索引,即每个文件对应的数据块列表
    2. 数据块和数据节点的对应关系,即某一块数据块保存在哪些数据节点的信息
  • 如果Client为集群Datanode节点,尝试从本地读取block

然后调用了openInfo方法,从Namenode获取到该打开文件的信息,在openInfo方法中,具体实现如下所示:

  • 第一个关系即目录树、元数据和数据块的索引信息会持久化到物理存储中,实现是保存在命名空间的镜像fsimage和编辑日志edits中,注意:在fsimage中,并没有记录每一个block对应到哪几个Datanodes的对应表信息
  • 第二个关系是在NameNode启动后,每个Datanode对本地磁盘进行扫描,将本Datanode上保存的block信息汇报给Namenode,Namenode在接收到每个Datanode的块信息汇报后,将接收到的块信息,以及其所在的Datanode信息等保存在内存中。HDFS就是通过这种块信息汇报的方式来完成 block -> Datanodes
    list的对应表构建
  • fsimage记录了自最后一次检查点之前HDFS文件系统中所有目录和文件的序列化信息;
  • edits是元数据操作日志(记录每次保存fsimage之后到下次保存之间的所有hdfs操作)
  • 在NameNode启动时候,会先将fsimage中的文件系统元数据信息加载到内存,然后根据eidts中的记录将内存中的元数据同步至最新状态,将这个新版本的 FsImage
    从内存中保存到本地磁盘上,然后删除 旧的
    Editlog,这个过程称为一个检查
    点 (checkpoint), 多长时间做一次 checkpoint?(见第四章
    参数配置
    ) checkpoint 能手工触发吗?
    验证重启hdfs服务后editlog没删除呢?
  • 类似于数据库中的检查点,为了避免edits日志过大,在Hadoop1.X中,SecondaryNameNode会按照时间阈值(比如24小时)或者edits大小阈值(比如1G),周期性的将fsimage和edits的合并,然后将最新的fsimage推送给NameNode。而在Hadoop2.X中,这个动作是由Standby
    NameNode来完成.
  • 由此可看出,这两个文件一旦损坏或丢失,将导致整个HDFS文件系统不可用,在HDP2.4安装(五):集群及组件安装 
    集群安装过程中,hdfs 默认的只能选择一个NN,是否意味着NN存在单点呢?(***见第二单 hdfs
    HA)***
  • 在hadoop1.X为了保证这两种元数据文件的高可用性,一般的做法,将dfs.namenode.name.dir设置成以逗号分隔的多个目录,这多个目录至少不要在一块磁盘上,最好放在不同的机器上,比如:挂载一个共享文件系统
  • fsimageedits
    是序列化后的文件,想要查看或编辑里面的内容,可通过 hdfs 提供的 oivoev
    命令,如下:

    • 命令: hdfs oiv (offline image viewer) 用于将fsimage文件的内容转储到指定文件中以便于阅读,,如文本文件、XML文件,该命令需要以下参数:
      1. -i  (必填参数)  –inputFile
        <arg>  输入FSImage文件
      2. -o (必填参数)  –outputFile
        <arg> 输出转换后的文件,如果存在,则会覆盖
      3. -p (可选参数) –processor
        <arg>   将FSImage文件转换成哪种格式:
        (Ls|XML|FileDistribution).默认为Ls
      4. 示例:hdfs oiv -i
        /data1/hadoop/dfs/name/current/fsimage_0000000000019372521
        -o /home/hadoop/fsimage.txt
    • 命令:hdfs oev (offline edits viewer
      离线edits查看器)的缩写,
      该工具只操作文件因而并不需要hadoop集群处于运行状态。
      1. 示例: 
        hdfs oev -i
        edits_0000000000000042778-0000000000000042779 -o
        edits.xml
      2. 支持的输出格式有binary(hadoop使用的二进制格式)、xml(在不使用参数p时的默认输出格式)和stats(输出edits文件的统计信息)
  • 小结:

通过调用chooseDataNode方法返回一个Datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个LocalBlockReader对象,直接从本地读取。在创建LocalBlockReader对象的过程中,会先从缓存中查找一个本地Datanode相关的LocalDatanodeInfo对象,该对象定义了与从本地Datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从LocalDatanodeInfo类定义的属性来说明:

synchronized void openInfo() throws IOException {
  for (int retries = 3; retries > 0; retries--) {
    if (fetchLocatedBlocks()) { // fetch block success. 如果成功获取到待读取文件对应的Block列表,则直接返回
      return;
    } else {
      // Last block location unavailable. When a cluster restarts,
      // DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length.
      // Lets retry a few times to get the length.
      DFSClient.LOG.warn("Last block locations unavailable. "
          + "Datanodes might not have reported blocks completely."
          + " Will retry for " + retries + " times");
      waitFor(4000);
    }
  }
  throw new IOException("Could not obtain the last block locations.");
}
  1. NameNode管理着DataNode,接收DataNode的注册、心跳、数据块提交等信息的上报,并且在心跳中发送数据块复制、删除、恢复等指令;同时,NameNode还为客户端对文件系统目录树的操作和对文件数据读写、对HDFS系统进行管理提供支持
  2. Namenode 启动后会进入一个称为安全模式的特殊状态。处于安全模式 的
    Namenode 是不会进行数据块的复制的。 Namenode 从所有的 Datanode
    接收心跳信号和块状态报告。块状态报告包括了某个 Datanode
    所有的数据 块列表。每个数据块都有一个指定的最小副本数。当 Namenode
    检测确认某
    个数据块的副本数目达到这个最小值,那么该数据块就会被认为是副本安全
    (safely replicated) 的;在一定百分比(这个参数可配置)的数据块被
    Namenode 检测确认是安全之后(加上一个额外的 30 秒等待时间),
    Namenode 将退出安全模式状态。接下来它会确定还有哪些数据块的副本没
    有达到指定数目,并将这些数据块复制到其他 Datanode 上。
private ClientDatanodeProtocol proxy = null;
private final Map<Block, BlockLocalPathInfo> cache;

上述代码中,有一个for循环用来获取Block列表。如果成功获取到待读取文件的Block列表,则直接返回,否则,最多执行3次等待重试操作(最多花费时间大于12秒)。未能成功读取文件的Block列表信息,是因为Namenode无法获取到文件对应的块列表的信息,当整个集群启动的时候,Datanode会主动向NNamenode上报对应的Block信息,只有Block
Report完成之后,Namenode就能够知道组成文件的Block及其所在Datanode列表的信息。openInfo方法方法中调用了fetchLocatedBlocks方法,用来与Namenode进行RPC通信调用,实际获取对应的Block列表,实现代码如下所示:

Secondary NameNode:在HA cluster中又称为standby node

如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息BlockLocalPathInfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),则会执行如下逻辑:

private boolean fetchLocatedBlocks() throws IOException,
    FileNotFoundException {
  LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
  if (newInfo == null) {
    throw new FileNotFoundException("File does not exist: " + src);
  }

  if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
  this.locatedBlocks = newInfo;
  this.currentNode = null;
  return isBlkInfoUpdated;
}

// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);

调用callGetBlockLocations方法,实际上是根据创建RPC连接以后得到的Namenode的代理对象,调用Namenode来获取到指定文件的Block的位置信息(位于哪些Datanode节点上):namenode.getBlockLocations(src,
start,
length)。调用callGetBlockLocations方法返回一个LocatedBlocks对象,该对象包含了文件长度信息、List
blocks列表对象,其中LocatedBlock包含了一个Block的基本信息:

  • 定期合并 fsimage 和 edits 日志,将
    edits 日志文件大小控制在一个限度下
  • 图片 5
  • namenode 响应 Secondary namenode 请求,将 edit log 推送给 Secondary
    namenode , 开始重新写一个新的 edit log
  • Secondary namenode 收到来自 namenode 的 fsimage 文件和 edit log
  • Secondary namenode 将 fsimage 加载到内存,应用 edit log , 并生成一
    个新的 fsimage 文件
  • Secondary namenode 将新的 fsimage 推送给 Namenode
  • Namenode 用新的 fsimage 取代旧的 fsimage , 在 fstime 文件中记下检查
    点发生的时

上面proxy为ClientDatanodeProtocol类型,Client与Datanode进行RPC通信的协议,RPC调用getBlockLocalPathInfo获取block对应的本地路径信息,可以在Datanode类中查看具体实现,如下所示:

private Block b;
private long offset;  // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
private boolean corrupt;

 HDFS写文件:

BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);

有了这些文件的信息(文件长度、文件包含的Block的位置等信息),DFSClient就能够执行后续读取文件数据的操作了,详细过程我们在后面分析说明。


Datanode调用FSDataset(实现接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:

通过Namenode获取文件信息

  • 写文件部分参考blog 地址
    http://www.cnblogs.com/laov/p/3434917.html),2.X版本默认block的大小是
    128M (见第四章参数配置)
  • 图片 6
@Override //FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
    throws IOException {
  File datafile = getBlockFile(block); // 获取本地block在本地Datanode文件系统中的文件路径
  File metafile = getMetaFile(datafile, block);  // 获取本地block在本地Datanode文件系统中的元数据的文件路径
  BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
  return info;
}

上面,我们提到获取一个文件的基本信息,是通过Namenode来得到的,这里详细分析Namenode是如何获取到这些文件信息的,实现方法getBlockLocations的代码,如下所示:

Client将FileA按64M分块。分成两块,block1和Block2;

接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):

public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
  myMetrics.incrNumGetBlockLocations();
  return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
}

Client向nameNode发送写数据请求,如图蓝色虚线①——>

... // BlockReaderLocal类的newBlockReader静态方法
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);

if (!skipChecksum) { // 如果检查block的校验和
  // get the metadata file
  File metafile = new File(pathinfo.getMetaPath());
  checksumIn = new FileInputStream(metafile);

  // read and handle the common header here. For now just a version
  BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
  short version = header.getVersion();
  if (version != FSDataset.METADATA_VERSION) {
    LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
  }
  DataChecksum checksum = header.getChecksum();
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
} else {
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
}

可以看到,Namenode又委托管理HDFS
name元数据的FSNamesystem的getBlockLocations方法实现:

NameNode节点,记录block信息。并返回可用的DataNode (NameNode按什么规则返回DataNode? 参见第三单
hadoop机架感知),如粉色虚线②———>

在上面代码中,返回了BlockLocalPathInfo,但是很可能在这个过程中block被删除了,在删除block的时候,Namenode会调度指派该Datanode删除该block,恰好在这个时间间隔内block对应的BlockLocalPathInfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到IO异常,会从缓存中再清除掉失效的block到BlockLocalPathInfo的映射信息。

LocatedBlocks getBlockLocations(String clientMachine, String src, long offset, long length) throws IOException {
  LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true, true);
  if (blocks != null) {
    //sort the blocks
    // In some deployment cases, cluster is with separation of task tracker
    // and datanode which means client machines will not always be recognized
    // as known data nodes, so here we should try to get node (but not
    // datanode only) for locality based sort.
    Node client = host2DataNodeMap.getDatanodeByHost(clientMachine);
    if (client == null) {
      List<String> hosts = new ArrayList<String> (1);
      hosts.add(clientMachine);
      String rName = dnsToSwitchMapping.resolve(hosts).get(0);
      if (rName != null)
        client = new NodeBase(clientMachine, rName);
    }  

    DFSUtil.StaleComparator comparator = null;
    if (avoidStaleDataNodesForRead) {
      comparator = new DFSUtil.StaleComparator(staleInterval);
    }
    // Note: the last block is also included and sorted
    for (LocatedBlock b : blocks.getLocatedBlocks()) {
      clusterMap.pseudoSortByDistance(client, b.getLocations());
      if (avoidStaleDataNodesForRead) {
        Arrays.sort(b.getLocations(), comparator);
      }
    }
  }
  return blocks;
}
  • Block1: host2,host1,host3
  • Block2: host7,host8,host4
  • 如果Client非集群Datanode节点,远程读取block

跟踪代码,最终会在下面的方法中实现了,如何获取到待读取文件的Block的元数据列表,以及如何取出该文件的各个Block的数据,方法实现代码,这里我做了详细的注释,可以参考,如下所示:

client向DataNode发送block1;发送过程是以流式写入,流式写入过程如下:

如果Client不是Datanode本地节点,则只能跨网络节点远程读取,首先创建Socket连接:

private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                     long offset,
                                                     long length,
                                                     int nrBlocksToReturn,
                                                     boolean doAccessTime,
                                                     boolean needBlockToken)
                                                     throws IOException {
        INodeFile inode = dir.getFileINode(src);  // 获取到与待读取文件相关的inode数据
        if (inode == null) {
             return null;
        }
        if (doAccessTime && isAccessTimeSupported()) {
             dir.setTimes(src, inode, -1, now(), false);
        }
        Block[] blocks = inode.getBlocks(); // 获取到文件src所包含的Block的元数据列表信息
        if (blocks == null) {
             return null;
        }
        if (blocks.length == 0) { // 获取到文件src的Block数,这里=0,该文件的Block数据还没创建,可能正在创建
             return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
        }
        List<LocatedBlock> results;
        results = new ArrayList<LocatedBlock>(blocks.length);

        int curBlk = 0; // 当前Block在Block[] blocks数组中的索引位置
        long curPos = 0, blkSize = 0; // curPos表示某个block在文件中的字节偏移量,blkSize为Block的大小(字节数)
        int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; // 获取到文件src的Block数,实际上一定>0,但是第一个block大小可能为0,这种情况认为nrBlocks=0
        for (curBlk = 0; curBlk < nrBlocks; curBlk++) {  // 根据前面代码,我们知道offset=0,所以这个循环第一次进来肯定就break出去了(正常的话,blkSize>0,所以我觉得这段代码写的稍微有点晦涩)
             blkSize = blocks[curBlk].getNumBytes();
             assert blkSize > 0 : "Block of size 0";
             if (curPos + blkSize > offset) {
                  break;
             }
             curPos += blkSize;
        }

        if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file, 到这里curBlk=0,如果从文件src的第一个Block的字节数累加计算,知道所有的Block的字节数都累加上了,总字节数仍然<=请求的offset,说明即使到了文件尾部,仍然没有达到offset的值。从前面fetchLocatedBlocks()方法中调用我们知道,offset=0,所以执行该分支表示文件src没有可用的Block数据块可读
             return null;

        long endOff = offset + length; // 

        do {
             // 获取Block所在位置(Datanode节点)
             int numNodes = blocksMap.numNodes(blocks[curBlk]); // 计算文件src中第curBlk个Block存储在哪些Datanode节点上
             int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas(); // 计算存储文件src中第curBlk个Block但无法读取该Block的Datanode节点数
             int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); // 计算FSNamesystem在内存中维护的Block=>Datanode映射的列表中,无法读取该Block的Datanode节点数
             if (numCorruptNodes != numCorruptReplicas) {
                  LOG.warn("Inconsistent number of corrupt replicas for "
                            + blocks[curBlk] + "blockMap has " + numCorruptNodes
                            + " but corrupt replicas map has " + numCorruptReplicas);
             }
             DatanodeDescriptor[] machineSet = null;  // 下面的if...else用来获取一个Block所在的Datanode节点
             boolean blockCorrupt = false;
             if (inode.isUnderConstruction() && curBlk == blocks.length - 1
                       && blocksMap.numNodes(blocks[curBlk]) == 0) { // 如果文件正在创建,当前blocks[curBlk]还没有创建成功(即没有可用的Datanode可以提供该Block的服务),仍然返回待创建Block所在的Datanode节点列表。数据块是在Datanode上存储的,只要Datanode完成数据块的存储后,通过heartbeat将数据块的信息上报给Namenode后,这些信息才会存储到blocksMap中
                  // get unfinished block locations
                  INodeFileUnderConstruction cons = (INodeFileUnderConstruction) inode;
                  machineSet = cons.getTargets();
                  blockCorrupt = false;
             } else { // 文件已经创建完成
                  blockCorrupt = (numCorruptNodes == numNodes); // 是否当前的Block在所有Datanode节点上的副本都坏掉,无法提供服务
                  int numMachineSet = blockCorrupt ? numNodes : (numNodes - numCorruptNodes); // 如果是,则返回所有Datanode节点,否则,只返回可用的Block副本所在的Datanode节点
                  machineSet = new DatanodeDescriptor[numMachineSet];
                  if (numMachineSet > 0) { // 获取到当前Block所有副本所在的Datanode节点列表
                       numNodes = 0;
                       for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
                            DatanodeDescriptor dn = it.next();
                            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
                            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
                                 machineSet[numNodes++] = dn;
                       }
                  }
             }
             LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos, blockCorrupt); // 创建一个包含Block的元数据对象、所在Datanode节点列表、起始索引位置(字节数)、健康状况的LocatedBlock对象
             if (isAccessTokenEnabled && needBlockToken) { // 如果启用Block级的令牌(Token)访问,则为当前用户生成读模式的令牌信息,一同封装到返回的LocatedBlock对象中
                  b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
             }

             results.add(b); // 收集待返回给读取文件的客户端需要的LocatedBlock列表
             curPos += blocks[curBlk].getNumBytes();
             curBlk++;
        } while (curPos < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn);

        return inode.createLocatedBlocks(results); // 将收集的LocatedBlock列表数据封装到一个LocatedBlocks对象中返回
   }
  1. 将64M的block1按64k的packet划分
  2. 然后将第一个packet发送给host2
  3. host2接收完后,将第一个packet发送给host1,同时client想host2发送第二个packet
  4. host1接收完第一个packet后,发送给host3,同时接收host2发来的第二个packet
  5. 以此类推,如图红线实线所示,直到将block1发送完毕
  6. host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示
  7. client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图黄色粗实线
  8. 发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示
s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);

我们可以看一下,最后的调用inode.createLocatedBlocks(results)生成LocatedBlocks对象的实现,代码如下所示:

  •  说明:

建立Client到目标Datanode(targetAddr)的连接,然后同样也是创建一个远程BlockReader对象RemoteBlockReader来辅助读取block数据。创建RemoteBlockReader过程中,首先向目标Datanode发送RPC请求:

LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
  return new LocatedBlocks(computeContentSummary().getLength(), blocks, isUnderConstruction()); // 通过ContentSummary对象获取到文件的长度
}
    1. 当客户端向 HDFS
      文件写入数据的时候,一开始是写到本地临时文件中。假设该文件的副
      本系数设置为 3
      ,当本地临时文件累积到一个数据块的大小时,客户端会从 Namenode
      获取一个 Datanode 列表用于存放副本。然后客户端开始向第一个
      Datanode 传输数据,第一个 Datanode 一小部分一小部分 (4 KB)
      地接收数据,将每一部分写入本地仓库,并同时传输该部分到列表中
      第二个 Datanode 节点。第二个 Datanode
      也是这样,一小部分一小部分地接收数据,写入本地
      仓库,并同时传给第三个 Datanode 。最后,第三个 Datanode
      接收数据并存储在本地。因此, Datanode
      能流水线式地从前一个节点接收数据,并在同时转发给下一个节点,数据以流水线的
      方式从前一个 Datanode 复制到下一个
    2. 时序图如下:
// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 时间戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block长度
Text.writeString(out, clientName); // 客户端标识
accessToken.write(out); 
out.flush();

客户端通过RPC调用,获取到了文件对应的Block以及所在Datanode列表的信息,然后就可以根据LocatedBlocks来进一步获取到对应的Block对应的物理数据块。

图片 7

然后获取到DataInputStream对象来读取Datanode的响应信息:

对Block列表进行排序

  •  小结:
DataInputStream in = new DataInputStream(
    new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));

我们再回到FSNamesystem类,调用getBlockLocationsInternal方法的getBlockLocations方法中,在返回文件block列表LocatedBlocks之后,会对每一个Block所在的Datanode进行的一个排序,排序的基本规则有如下2点:

  1. 写入的过程,按hdsf默认设置,1T文件,我们需要3T的存储,3T的网络流量
  2. 在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去
  3. 挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份

最后,返回一个对象RemoteBlockReader:

  • Client到Block所在的Datanode的距离最近,这个是通过网络拓扑关系来进行计算,例如Client的网络路径为/dc1/r1/c1,那么路径为/dc1/r1/dn1的Datanode就比路径为/dc1/r2/dn2的距离小,/dc1/r1/dn1对应的Block就会排在前面
  • 从上面一点可以推出,如果Client就是某个Datanode,恰好某个Block的Datanode列表中包括该Datanode,则该Datanode对应的Block排在前面
  • Block所在的Datanode列表中,如果其中某个Datanode在指定的时间内没有向Namenode发送heartbeat(默认由常量DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT定义,默认值为30s),则该Datanode的状态即为STALE,具有该状态的Datanode对应的Block排在后面

hdfs读文件: 

return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);

基于上述规则排序后,Block列表返回到Client。


借助BlockReader来读取block字节

Client与Datanode交互更新文件Block列表

  •  读到文件示意图如下:
  • 图片 8
  • 客户端通过调用FileSystem对象的open()方法来打开希望读取的文件,对于HDFS来说,这个对象时分布文件系统的一个实例;
  • DistributedFileSystem通过使用RPC来调用NameNode以确定文件起始块的位置,同一Block按照重复数会返回多个位置,这些位置按照Hadoop集群拓扑结构排序,距离客户端近的排在前面
    (详见第三章
  • 前两步会返回一个FSDataInputStream对象,该对象会被封装成DFSInputStream对象,DFSInputStream可以方便的管理datanode和namenode数据流,客户端对这个输入流调用read()方法
  • 存储着文件起始块的DataNode地址的DFSInputStream随即连接距离最近的DataNode,通过对数据流反复调用read()方法,将数据从DataNode传输到客户端
  • 到达块的末端时,DFSInputStream会关闭与该DataNode的连接,然后寻找下一个块的最佳DataNode,这些操作对客户端来说是透明的,客户端的角度看来只是读一个持续不断的流
  • 一旦客户端完成读取,就对FSDataInputStream调用close()方法关闭文件读取

我们再回到blockSeekTo方法中,待读取block所在的Datanode信息、BlockReader信息都已经具备,接着就可以从包含输入流(InputStream)对象的BlockReader中读取数据块中一个字节数据:

我们要回到前面分析的DFSClient.DFSInputStream.fetchLocatedBlocks()方法中,查看在调用该方法之后,是如何执行实际处理逻辑的:

block持续化结构:

int result = readBuffer(buf, off, realLen);
private boolean fetchLocatedBlocks() throws IOException,
    FileNotFoundException {
  LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize); // RPC调用向Namenode获取待读取文件对应的Block及其位置信息LocatedBlocks对象
  if (newInfo == null) {
    throw new FileNotFoundException("File does not exist: " + src);
  }

  if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) { // 这里面locatedBlocks!=null是和后面调用updateBlockInfo方法返回的状态有关的
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) { // 检查2次获取到的LocatedBlock列表:第2次得到newInfo包含的Block列表,在第2次得到的locatedBlocks中是否发生变化,如果发生了变化,则不允许读取,抛出异常
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
  this.locatedBlocks = newInfo;
  this.currentNode = null;
  return isBlkInfoUpdated;
}

将block数据中一个字节读取到buf中,如下所示:

如果第一次读取该文件时,已经获取到了对应的block列表,缓存在客户端;如果客户端第二次又读取了该文件,仍然获取到一个block列表对象。在两次读取之间,可能存在原文件完全被重写的情况,所以新得到的block列表与原列表完全不同了,存在这种情况,客户端直接抛出IO异常,如果原文件对应的block列表没有变化,则更新客户端缓存的对应block列表信息。

  •  DataNode节点上一个Block持久化到磁盘上的物理存储结构,如下图所示:
  • 图片 9
  • 每个Block文件(如上图中blk_1084013198文件)都对应一个meta文件(如上图中blk_1084013198_10273532.meta文件),Block文件是一个一个Chunk的二进制数据(每个Chunk的大小是512字节),而meta文件是与每一个Chunk对应的Checksum数据,是序列化形式存储
private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
  IOException ioe;
  boolean retryCurrentNode = true;

  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return blockReader.read(buf, off, len); // 调用blockReader的read方法读取字节数据到buf中
    } catch ( ChecksumException ce ) {
      LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode.getName() + " at " + ce.getPos());         
      reportChecksumFailure(src, currentBlock, currentNode);
      ioe = ce;
      retryCurrentNode = false; // 只尝试读取当前选择的Datanode一次,失败的话就会被加入到Client的dead node列表中
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        LOG.warn("Exception while reading from " + currentBlock + " of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e));
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode); // 加入到Client的dead node列表中
      sourceFound = seekToNewSource(pos); // 从当前选择的Datanode上读取数据失败,会再次选择一个Datanode,这里seekToNewSource方法内部调用了blockSeekTo方法去选择一个Datanode
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}

当集群重启的时候(如果允许安全模式下读文件),或者当一个文件正在创建的时候,Datanode向Namenode进行Block
Report,这个过程中可能Namenode还没有完全重建好Block到Datanode的映射关系信息,所以即使在这种情况下,仍然会返回对应的正在创建的Block所在的Datanode列表信息,可以从前面getBlockLocationsInternal方法中看到,INode的对应UnderConstruction状态为true。这时,一个Block对应的所有副本中的某些可能还在创建过程中。

通过BlockReaderLocal或者RemoteBlockReader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。

上面方法中,调用updateBlockInfo来更新文件的Block元数据列表信息,对于文件的某些Block可能没有创建完成,所以Namenode所保存的关于文件的Block的的元数据信息可能没有及时更新(Datanode可能还没有完成Block的报告),代码实现如下所示:

DataNode节点处理读文件Block请求

private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
  if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction() || !(newInfo.locatedBlockCount() > 0)) { // 如果获取到的newInfo可以读取文件对应的Block信息,则返回true
    return true;
  }

  LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1); // 从Namenode获取文件的最后一个Block的元数据对象LocatedBlock
  boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo.getFileLength()); 
  if (!lastBlockInFile) { // 如果“文件长度 != 最后一个块起始偏移量 + 最后一个块长度”,说明文件对应Block的元数据信息还没有更新,但是仍然返回给读取文件的该客户端
    return true;
  }
  // 这时,已经确定last是该文件的最后一个bolck,检查最后个block的存储位置信息
  if (last.getLocations().length == 0) {
    return false;
  }

  ClientDatanodeProtocol primary = null;
  Block newBlock = null;
  for (int i = 0; i < last.getLocations().length && newBlock == null; i++) { // 根据从Namenode获取到的LocatedBlock last中对应的Datanode列表信息,Client与Datanode建立RPC连接,获取最后一个Block的元数据
    DatanodeInfo datanode = last.getLocations()[i];
    try {
      primary = createClientDatanodeProtocolProxy(datanode, conf, last .getBlock(), last.getBlockToken(), socketTimeout, connectToDnViaHostname);
      newBlock = primary.getBlockInfo(last.getBlock());
    } catch (IOException e) {
      if (e.getMessage().startsWith(
          "java.io.IOException: java.lang.NoSuchMethodException: "
              + "org.apache.hadoop.hdfs.protocol"
              + ".ClientDatanodeProtocol.getBlockInfo")) {
        // We're talking to a server that doesn't implement HDFS-200.
        serverSupportsHdfs200 = false;
      } else {
        LOG.info("Failed to get block info from "
            + datanode.getHostName() + " probably does not have "
            + last.getBlock(), e);
      }
    } finally {
      if (primary != null) {
        RPC.stopProxy(primary);
      }
    }
  }

  if (newBlock == null) { // Datanode上不存在最后一个Block对应的元数据信息,直接返回
    if (!serverSupportsHdfs200) {
      return true;
    }
    throw new IOException("Failed to get block info from any of the DN in pipeline: " + Arrays.toString(last.getLocations()));
  }

  long newBlockSize = newBlock.getNumBytes();
  long delta = newBlockSize - last.getBlockSize();
  // 对于文件的最后一个Block,如果从Namenode获取到的元数据,与从Datanode实际获取到的元数据不同,则以Datanode获取的为准,因为可能Datanode还没有及时将Block的变化信息向Namenode汇报
  last.getBlock().setNumBytes(newBlockSize);
  long newlength = newInfo.getFileLength() + delta;
  newInfo.setFileLength(newlength); // 修改文件Block和位置元数据列表信息
  LOG.debug("DFSClient setting last block " + last + " to length " + newBlockSize + " filesize is now " + newInfo.getFileLength());
  return true;
}

我们可以在DataNode端看一下,如何处理一个读取Block的请求。如果Client与DataNode不是同一个节点,则为远程读取文件Block,首先Client需要发送一个请求头信息,代码如下所示:

我们看一下,在updateBlockInfo方法中,返回false的情况:Client向Namenode发起的RPC请求,已经获取到了组成该文件的数据块的元数据信息列表,但是,文件的最后一个数据块的存储位置信息无法获取到,说明Datanode还没有及时通过block
report将数据块的存储位置信息报告给Namenode。通过在openInfo()方法中可以看到,获取文件的block列表信息有3次重试机会,也就是调用updateBlockInfo方法返回false,可以有12秒的时间,等待Datanode向Namenode汇报文件的最后一个块的位置信息,以及Namenode更新内存中保存的文件对应的数据块列表元数据信息。

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 时间戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block长度
Text.writeString(out, clientName); // 客户端标识
accessToken.write(out); 
out.flush();

我们再看一下,在updateBlockInfo方法中,返回true的情况:

DataNode节点端通过验证数据传输版本号(DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,会判断传输操作类型,如果是读操作DataTransferProtocol.OP_READ_BLOCK,则会通过Client建立的Socket来创建一个OutputStream对象,然后通过BlockSender向Client发送Block数据,代码如下所示:

  • 文件已经创建完成,文件对应的block列表元数据信息可用
  • 文件正在创建中,但是当前能够读取到的已经完成的最后一个块(非组成文件的最后一个block)的元数据信息可用
  • 文件正在创建中,文件的最后一个block的元数据部分可读:从Namenode无法获取到该block对应的位置信息,这时Client会与Datanode直接进行RPC通信,获取到该文件最后一个block的位置信息
try {
  blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); // 创建BlockSender对象
} catch(IOException e) {
  out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
  throw e;
}

out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 回复一个响应Header信息:成功状态
long read = blockSender.sendBlock(out, baseStream, null); // 发送请求的Block数据

上面Client会与Datanode直接进行RPC通信,获取文件最后一个block的元数据,这时可能由于网络问题等等,无法得到文件最后一个block的元数据,所以也会返回true,也就是说,Client仍然可以读取该文件,只是无法读取到最后一个block的数据。

这样,在Client从Namenode/Datanode获取到的文件的Block列表元数据已经是可用的信息,可以根据这些信息读取到各个Block的物理数据块内容了,准确地说,应该是文件处于打开状态了,已经准备好后续进行的读操作了。

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图