Paimon 在处理与远程文件系统的连接和使用方面,设计了一套灵活的抽象机制。下面将结合源代码分析 Paimon 是如何实现这一点的。
核心思想是定义一个通用的 FileIO
接口,然后为不同的文件系统提供具体的实现。对于常见的 HDFS、S3、OSS 等,Paimon 通常会利用 Hadoop 的 FileSystem
API 来进行交互,这样可以复用 Hadoop 生态的成熟能力。
1. 核心抽象:FileIO
接口
Paimon 通过 org.apache.paimon.fs.FileIO
接口来抽象所有文件系统操作。这个接口定义了如读、写、删除、列出文件状态等标准方法。
FileIO.java
// ... 部分引入和包声明 ...
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
public interface FileIO extends Serializable, Closeable {
/** Initializes the FileIO. */
void initialize(CatalogContext context) throws IOException;
/**
* Return the status of the file at the given {@link Path}.
*
* @param path The path of the file.
* @return The status of the file at the given {@link Path}.
* @throws IOException Thrown if an I/O error occurred.
*/
FileStatus getFileStatus(Path path) throws IOException;
/**
* Returns an array of {@link FileStatus} objects, which represent the files and directories in
* the directory denoted by the given {@link Path}.
*
* @param path The path of the directory.
* @return The array of {@link FileStatus} objects.
* @throws IOException Thrown if an I/O error occurred.
*/
FileStatus[] listStatus(Path path) throws IOException;
/**
* Deletes the file or directory at the given {@link Path}.
*
* @param f The path of the file or directory.
* @param recursive If true and f is a directory, the directory is deleted recursively.
* @return true if and only if the file or directory is successfully deleted; false otherwise.
* @throws IOException Thrown if an I/O error occurred.
*/
boolean delete(Path f, boolean recursive) throws IOException;
/**
* Create a directory at the given {@link Path}.
*
* @param f The path of the directory.
* @return true if and only if the directory is successfully created; false otherwise.
* @throws IOException Thrown if an I/O error occurred.
*/
boolean mkdirs(Path f) throws IOException;
/**
* Opens an {@link SeekableInputStream} at the indicated Path.
*
* @param f The path of the file.
* @return The {@link SeekableInputStream} for the given file.
* @throws IOException Thrown if an I/O error occurred.
*/
SeekableInputStream newInputStream(Path f) throws IOException;
/**
* Creates an {@link PositionOutputStream} at the indicated Path.
*
* @param f The path of the file.
* @param overwrite If true, the file will be overwritten if it already exists.
* @return The {@link PositionOutputStream} for the given file.
* @throws IOException Thrown if an I/O error occurred.
*/
PositionOutputStream newOutputStream(Path f, boolean overwrite) throws IOException;
/**
* Tells whether the file system is distributed, like HDFS, S3, OSS, etc.
*
* <p>This is a hint for readers and writers. For example, if the file system is not
* distributed, readers can prefer to read the file directly instead of copying it to local.
*/
boolean isDistributedFS();
/** Returns a {@link RemoteIterator} that recursively lists all files in the given path. */
default RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
throws IOException {
Queue<FileStatus> files = new LinkedList<>();
Queue<Path> directories = new LinkedList<>(Collections.singletonList(path));
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
maybeUnpackDirectory();
return !files.isEmpty();
}
@Override
public FileStatus next() throws IOException {
maybeUnpackDirectory();
return files.remove();
}
private void maybeUnpackDirectory() throws IOException {
while (files.isEmpty() && !directories.isEmpty()) {
FileStatus[] statuses = listStatus(directories.remove());
for (FileStatus f : statuses) {
if (!f.isDir()) {
files.add(f);
continue;
}
if (!recursive) {
continue;
}
directories.add(f.getPath());
}
}
}
@Override
public void close() {}
};
}
// ... 其他方法 ...
}
任何 Paimon 需要支持的文件系统,都需要提供这个接口的实现。
2. FileIO
的获取:工厂模式
Paimon 通常使用 org.apache.paimon.fs.FileIO#get(Path path, CatalogContext context)
这个静态工厂方法来获取特定路径对应的 FileIO
实例。该方法会根据传入 Path
的 scheme (例如 hdfs://
, s3://
, oss://
, file://
) 来动态加载并初始化相应的 FileIO
实现。例如,如果路径是 s3://mybucket/path
,它会尝试加载 S3 的 FileIO
实现。
3. 远程文件系统支持的通用机制:Hadoop FileSystem
对于 HDFS、Amazon S3、Aliyun OSS 等广泛使用的远程文件系统,Paimon 倾向于通过 Hadoop 的 FileSystem
API (org.apache.hadoop.fs.FileSystem
) 进行集成。这是因为:
- Hadoop
FileSystem
已经为多种存储系统提供了成熟的客户端实现。 - 可以利用 Hadoop 的配置体系(如
core-site.xml
,hdfs-site.xml
)来管理连接参数和认证信息。 - Paimon 本身也常部署在 Hadoop 环境中。
在 Paimon 中,会看到类似 HadoopCompliantFileIO
这样的类,它们封装了 Hadoop FileSystem
的操作。例如,在 paimon-s3-impl
和 paimon-oss-impl
模块中:
org.apache.paimon.s3.HadoopCompliantFileIO
用于 S3。org.apache.paimon.oss.HadoopCompliantFileIO
用于 OSS。
这些类的基本工作模式是:
- 初始化:在首次使用时,它们会根据传入的路径和 Paimon 的配置(可能包含 Hadoop 的配置项)来获取一个 Hadoop
FileSystem
实例。通常使用FileSystem.get(URI, Configuration)
方法,Hadoop 在内部会缓存这些FileSystem
实例,以提高效率和复用连接。 - 方法委托:
FileIO
接口定义的操作(如listStatus
,newInputStream
等)会被委托给持有的 HadoopFileSystem
实例的对应方法。 - 路径和状态转换:需要将 Paimon 的
Path
对象转换为 Hadoop 的Path
对象,并将 Hadoop 的FileStatus
转换为 Paimon 的FileStatus
。
连接管理和配置:
- 连接:底层的网络连接、重试、连接池等是由具体的 Hadoop
FileSystem
实现(如 S3AFileSystem, OSSFileSystem)来管理的。Paimon 层面不直接处理这些细节。 - 认证:访问远程存储(尤其是 S3、OSS)通常需要认证。这部分也主要依赖 Hadoop 的标准机制,例如:
- 通过 Hadoop 配置文件设置 access key 和 secret key。
- 使用 IAM 角色(AWS)或 RAM 角色(Aliyun)。
- 环境变量。
- 依赖:使用这些远程文件系统时,需要在 Paimon 的 classpath 中包含相应的 Hadoop connector JAR 包(例如
hadoop-aws.jar
用于 S3,hadoop-aliyun.jar
用于 OSS)及其依赖。Python API 文档中提到的设置_PYPAIMON_JAVA_CLASSPATH
也是为了确保这些 JAR 包能被找到。
4. 特定文件系统的实现和扩展
虽然通用机制依赖 Hadoop FileSystem
,但 Paimon 也允许特定 FileIO
实现进行扩展或优化。 以 org.apache.paimon.oss.OSSFileIO
为例,它继承自 HadoopCompliantFileIO,可以重写某些方法以加入 OSS 特有的逻辑。
5. 配置示例
在 Paimon Catalog 的配置中,warehouse
属性指定了表数据存储的根路径,其 scheme 决定了默认使用的 FileIO
。 此外,如 docs/layouts/shortcodes/generated/catalog_configuration.html
中提到的:
resolving-file-io.enabled
: 当设置为true
时,结合表的data-file.external-paths
属性,Paimon 可以读写外部存储路径(如 OSS 或 S3)。- 访问这些外部路径需要正确配置相应的访问密钥。
总结
Paimon 通过 FileIO
接口和工厂模式实现了对不同文件系统的支持。对于远程文件系统,它主要依赖 Hadoop FileSystem
API,从而利用了 Hadoop 生态的广泛兼容性和成熟的连接管理、认证机制。同时,它也保留了为特定文件系统进行扩展和优化的能力。开发者在使用时,需要确保正确的依赖和配置(尤其是认证信息和 Hadoop connector JARs)。