TL;DR
1. A write (create) builds a DFSOutputStream and starts the DataStreamer thread's run() (the main worker thread of the write path)
2. nextBlockOutputStream -> locateFollowingBlock (the addBlock RPC) -> setPipeline
3. createBlockOutputStream (client -> dn1 -> dn2 -> dn3) sets up blockStream (the stream actually used to write the data)
4. new ResponseProcessor, and start its run() thread
5. Data is sent to the DataNodes at packet granularity (see the sketch after this list):
a. writeChunk -> waitAndQueueCurrentPacket -> dataQueue
b. the DataStreamer run() loop keeps take()-ing packets from dataQueue and sending them
c. sent packets are parked on ackQueue, and removed from it once the ack comes back
6. After a block is fully written, endBlock: stop the ResponseProcessor thread and close blockStream
7. Repeat steps 2-6 for the next block
8. complete (tell the NameNode the file is finished)
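The heart of the steps above is a two-queue producer-consumer hand-off between the client thread, the DataStreamer, and the ResponseProcessor. Below is a minimal, self-contained sketch of that pattern; Packet and everything not named in the list are simplified stand-ins, not the real DFSOutputStream internals.

import java.util.concurrent.LinkedBlockingDeque;

// Minimal sketch of the dataQueue/ackQueue hand-off inside DFSOutputStream.
// Packet and the details below are simplified stand-ins; the real class
// coordinates the two queues with wait/notify and handles error recovery.
class DataStreamerSketch extends Thread {
  static class Packet {
    final long seqno;
    final byte[] data;
    Packet(long seqno, byte[] data) { this.seqno = seqno; this.data = data; }
  }

  private final LinkedBlockingDeque<Packet> dataQueue = new LinkedBlockingDeque<>();
  private final LinkedBlockingDeque<Packet> ackQueue = new LinkedBlockingDeque<>();

  // Client thread (writeChunk path): enqueue a finished packet, blocking
  // if too many packets are already in flight.
  void waitAndQueuePacket(Packet p) throws InterruptedException {
    dataQueue.put(p);
  }

  // DataStreamer: take packets off dataQueue and push them down the pipeline,
  // parking each one on ackQueue until the DataNodes acknowledge it.
  @Override public void run() {
    try {
      while (!Thread.interrupted()) {
        Packet p = dataQueue.take();   // blocks until a packet is queued
        ackQueue.add(p);               // keep it until the pipeline acks it
        sendToPipeline(p);             // write to blockStream (first DataNode)
      }
    } catch (InterruptedException e) { /* stream closed */ }
  }

  // ResponseProcessor: once every DataNode in the pipeline has acked seqno,
  // the packet is durable and can be dropped from ackQueue.
  void onAck(long seqno) {
    Packet head = ackQueue.peek();
    if (head != null && head.seqno == seqno) {
      ackQueue.remove();
    }
  }

  private void sendToPipeline(Packet p) { /* network I/O omitted */ }
}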
I. Overall Flow
1. The client sends a create-file request to the NameNode.
2. The NameNode checks whether the file already exists and whether the client has permission. If the checks pass, it first records the operation in the EditLog, then returns an output stream object.
(Note: this is WAL, write-ahead logging: write the log first, then memory. The EditLog records every write operation the HDFS client performs, so even if the subsequent real write fails, the operation was already persisted to the EditLog beforehand, and a record of it survives.)
3. The client splits the file into blocks of 128 MB (the default block size).
4. The client sends the data, together with the DataNode list returned by the NameNode, to the nearest (first) DataNode; from then on the client and the DataNodes form a pipeline.
The client writes a packet to the first DataNode, and the packet is forwarded along the pipeline to the second, the third, and so on.
Acks flow back in the reverse direction of the pipeline, node by node, until the first DataNode delivers the final ack to the client.
5. Once all the data is written, the client closes the output stream.
6. The client sends a completion signal (complete) to the NameNode. (A minimal end-to-end code example follows this list.)
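To make the overall flow concrete, here is a minimal client program that drives all six steps end to end; the NameNode URI and the target path are placeholders, not values from this article.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "hdfs://namenode:8020" is a placeholder for your cluster's NameNode.
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    // create() triggers steps 1-2: the NameNode RPC and the EditLog record,
    // plus the DFSOutputStream / DataStreamer setup described below.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      // write() goes through steps 3-4: chunks -> packets -> pipeline.
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
    } // close() flushes the last packets and sends complete() (steps 5-6)
  }
}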
II. Code Details
1. Creating the file
Normally, when we want to create a file we construct a FileSystem object and then call its create() method.
FileSystem.java
/**
 * Create an FSDataOutputStream at the indicated Path with write-progress
 * reporting.
 * @param f the file name to open
 * @param permission file permission
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an error will be thrown.
 * @param bufferSize the size of the buffer to be used.
 * @param replication required block replication for the file.
 * @param blockSize block size of the file
 * @param progress the progress reporter
 * @throws IOException
 * @see #setPermission(Path, FsPermission)
 */
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
create() is an abstract method here; DistributedFileSystem extends this abstract class and provides the implementation of create().
DistributedFileSystem.java
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
          : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
      blockSize, progress, null);
}

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
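The doCall()/next() pair is the contract FileSystemLinkResolver works with: doCall() runs the operation against this DFS, and if the path turns out to contain a symlink into another filesystem, the resolver re-dispatches through next() on the resolved FileSystem. A rough, illustrative sketch of that control flow follows; it is not the real class body, and resolveLinkTarget() is a hypothetical stand-in for the actual resolution step.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;

// Illustrative control flow only, not the real Hadoop implementation.
abstract class LinkResolverSketch<T> {
  abstract T doCall(Path p) throws IOException, UnresolvedLinkException;
  abstract T next(FileSystem fs, Path p) throws IOException;

  T resolve(FileSystem fs, Path p) throws IOException {
    try {
      return doCall(p);                 // fast path: no cross-FS symlink
    } catch (UnresolvedLinkException e) {
      // Hypothetical stand-in for the real symlink-resolution step;
      // the real resolver also guards against symlink loops.
      Path target = resolveLinkTarget(fs, p);
      return next(target.getFileSystem(fs.getConf()), target);
    }
  }

  Path resolveLinkTarget(FileSystem fs, Path p) throws IOException {
    return fs.resolvePath(p);           // simplified
  }
}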
The call then goes into DFSClient.
DFSClient.java
/**
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
* long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
* set to true.
*/
public DFSOutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress, buffersize, checksumOpt, null);
}
Following the call chain down, the create ultimately reaches the underlying ClientProtocol.create() RPC that tells the NameNode to perform the operation. Inside DFSClient.create(), the output stream is built via newStreamForCreate() and a lease for the file is started:
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
favoredNodeStrs);
beginFileLease(result.getFileId(), result);
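newStreamForCreate() is where the ClientProtocol.create RPC actually fires. Condensed from Hadoop 2.x (RetryStartFileException handling and encryption-zone logic omitted), it does two things: create the file on the NameNode, then construct the stream and start its DataStreamer.

// Condensed from DFSOutputStream.newStreamForCreate() in Hadoop 2.x;
// RetryStartFileException handling and encryption-zone logic omitted.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  // 1. Ask the NameNode to create the file (the ClientProtocol.create RPC).
  HdfsFileStatus stat = dfsClient.namenode.create(src, masked,
      dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag),
      createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
  // 2. Build the output stream and start the DataStreamer thread.
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  out.start();
  return out;
}

beginFileLease() then registers the new file with the client's LeaseRenewer, so the lease is renewed periodically for as long as the stream stays open.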
ClientProtocol.java
@AtMostOnce
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
throws AccessControlException, AlreadyBeingCreatedException,
DSQuotaExceededException, FileAlreadyExistsException,
FileNotFoundException, NSQuotaExceededException,
ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
SnapshotAccessControlException, IOException;
NameNodeRpcServer.create() implements this ClientProtocol interface, i.e. it is what serves the request on the NameNode side (the @AtMostOnce annotation marks the call as non-idempotent, so retries are answered from the RPC retry cache instead of being re-executed). At the end of all this, the client holds a DFSOutputStream object.
Let's take a look at NameNodeRpcServer.create().
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
getRemoteUser().getShortUserName(), null, masked),
clientName, clientMachine, flag.get(), createParent, replication,
blockSize, supportedVersions);
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return fileStatus;
}
Here namesystem.startFile() calls startFileInt(), which creates the file entry in the namespace.
startFileInternal() is what ultimately creates the new file, or overwrites an existing one.
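In outline, startFileInternal() does the namespace-side work. The sketch below is condensed and follows Hadoop 2.x method names, but it is a simplification, not the real method body; safe-mode, quota, encryption-zone and snapshot handling are all omitted.

// Condensed sketch of FSNamesystem.startFileInternal() (Hadoop 2.x);
// safe-mode, quota, encryption-zone and snapshot handling omitted.
void startFileInternalSketch(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    short replication, long blockSize) throws IOException {
  final INodesInPath iip = dir.getINodesInPath4Write(src);
  final INode lastINode = iip.getLastINode();
  if (lastINode != null && lastINode.isDirectory()) {
    throw new FileAlreadyExistsException(src
        + " already exists as a directory");
  }
  if (lastINode != null) {
    if (flag.contains(CreateFlag.OVERWRITE)) {
      // delete the existing file; its blocks are queued for removal
    } else {
      throw new FileAlreadyExistsException(src + " already exists");
    }
  }
  // add an INodeFile in "under construction" state and grant the lease
  INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
      holder, clientMachine);
  leaseManager.addLease(holder, src);
  // record OP_ADD in the EditLog -- the WAL step from the overview
  getEditLog().logOpenFile(src, newNode, flag.contains(CreateFlag.OVERWRITE),
      false);
}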
2. Writing data through the pipeline
Once it has the DFSOutputStream, the HDFS client can start writing data into the pipeline.
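As a preview of that write path: every 512-byte chunk that FSOutputSummer checksums is handed to writeChunk(), which packs chunks into the current packet and queues the packet for the DataStreamer once it is full. A condensed sketch, following Hadoop 2.x (block-boundary and last-packet handling omitted):

// Condensed from DFSOutputStream.writeChunk() in Hadoop 2.x;
// block-boundary and last-packet handling omitted.
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  if (currentPacket == null) {
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++, false);
  }
  currentPacket.writeChecksum(checksum, ckoff, cklen); // 4-byte CRC per chunk
  currentPacket.writeData(b, offset, len);             // up to 512 B of data
  currentPacket.incNumChunks();
  bytesCurBlock += len;
  // Packet full (or block boundary reached): hand it to the DataStreamer
  // via the dataQueue, as in the TL;DR sketch at the top.
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks()
      || bytesCurBlock == blockSize) {
    waitAndQueueCurrentPacket();
  }
}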