Hadoop 的 DataFlow(数据流)与一致性模型

Hadoop读文件的数据流

Hadoop文件的读取数据流如下图所示:

详细步骤如下:

  • (1)通过FileSystem的open()方法读取文件,对于HDFS来说是DistributedFileSystem的一个实例
  • (2)DistributedFileSystem使用RPC调用namenode,得到文件block的位置,namenode返回每个block在datanode的地址,并且datanode会根据与client的距离进行排序,如果client本身就是一个datanode,且该datanode上有block的副本时,就会直接从本地datanode中读取数据
  • (3)DistributedFileSystem返回一个FSDataInputStream对象给client以读取数据,FSDataInputStream封装了DFSInputStream对象,其管理datanode和namenode的IO
  • (4)client调用read()方法, 对于block,DFSInputStream连接最近的datanode进行读取
  • (5)读取完则关闭与该datanode的连接,并对下一个block寻找最近的datanode进行读取
  • (6)当文件读取完毕时,FSDataInputStream调用close()方法

另外,当DFSInputStream与datanode通信出问题时,会从这个block的其它最近的datanode中读取数据,并记住该错误的datanode,保证后续不读取该节点的后续的block。
namenode将block的地址映射信息存放在内存中,这样就能够快速响应client的请求,非常高效。

Hadoop写文件的数据流

Hadoop文件的写数据流如下图所示:

详细步骤如下:
-(1)client通过DistributedFileSystem调用create()方法
-(2)DistributedFileSystem向namenode发送PRC请求,在namespace中创建一个新文件,之后DistributeFileSystem向cleint返回一个FSDataOutputStream对象,FSDataOutputStream封装了DFSOutputStream, 其负责与namenode和datanode的通信
-(3)client开始写文件
-(4)DFSOutputStream将数据分成一个个的packet(数据包),并写入内部的数据队列(data queue),该data queue被DataStreamer消费,它的职责是请求namenode分配一个新的block,并选取一系列的datanode来存储该block的副本,这一系列的datanode构成一个pipeline,DataStreamer以流的方式先将pacet发送到pipeline中的第一个datanode,该datanode存储数据,并将其发送到pipeline的第2个datanode,依次类推,直到pipeline的最后一个datanode为止
-(5)DFSOutputStream 维护着一个内部的”ack queue”来等待datanode的确认回复,收到pipeline中所有datanode的确认回复后,数据才会从ack queue中删除
-(6)完成写入后,调用close()方法关闭数据流,其将剩余的所有packet写入datanode pipeline
-(7)在联系到namenode且发送文件写入完成的信号之前,一直等待确认

Hadoop的数据一致性模型

在文件系统中,写入文件的内容并不能保证立即可见,即使数据流已经flush,例如:

1
2
3
4
5
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));

文件的长度返回0。

当写入的数据超过一个block时,第一个block对新的reader是可见的,之后也一样,而正在写入的block对其它reader是不可见的。

HDFS提供hflush()方法将buffer中的内容强制flush到datanode中,当hflush()方法返回success时,对新的reader而言,HDFS能够保证目前为止,所有写入的数据均到达所有datanode的pipeline,并对所有新的reader可见:

1
2
3
4
5
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

要注意的是,hflush并不能保证datanode将数据写入到磁盘,它只能保证数据已经传到了datanode的内存中(此时如果断电了,数据就会丢失),为了保证数据存储,可以使用hsync()方法替代。

另外,OutputStream的close()方法本身会执行hflush()方法:

1
2
3
4
5
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Hadoop中Block副本(replication)的存放策略

Hadoop默认的block副本存放策略如图所示:

Hadoop在client运行的datanode上存放第一个副本(如果client不是运行在集群中,则随机选择一个datanode存放,系统会避免挑选太忙或太慢的节点),第2个副本存放在与第1个不同机架的节点(随机选择),第3个副本存放在与第2个副本的同一个机架的不同节点中。

这种方式提供了很好的稳健性和负载均衡(数据在两个机架中,写操作只需遍历一个交换机,读取可从两个机架中读取)

坚持原创技术分享,您的支持将鼓励我继续创作!