源码解析flink文件连接源TextInputFormat

news2025/7/16 3:34:25

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {
    if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
        unsplittable = true;
        return true;
    }
    return false;
}

private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
    String fileExtension = extractFileExtension(path.getName());
    if (fileExtension != null) {
        return getInflaterInputStreamFactory(fileExtension);
    } else {
        return null;
    }
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {
    // make a split for the sample and use it to read a record
    FileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分
    FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
    // we open the split, read one line, and take its length
    try {
        open(split);
        if (readLine()) {
            totalNumBytes += this.currLen + this.delimiter.length;
            samplesTaken++;
        }
    } finally {
        // close the file stream, do not release the buffers
        super.close();
    }
// 偏移量迁移
    offset += stepSize;

    // skip to the next file, if necessary
    while (fileNum < allFiles.size()
            && offset >= (file = allFiles.get(fileNum)).getLen()) {
        offset -= file.getLen();
        fileNum++;
    }
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    checkState(
            checkpointedState != null, "The operator state has not been properly initialized.");

    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
    // 算子列表状态
    checkpointedState.clear();
    // 获取文件的当前读取的偏移
    List<T> readerState = getReaderState();

    try {
        for (T split : readerState) {
           //保存到检查点路径中
            checkpointedState.add(split);
        }
    } catch (Exception e) {
        checkpointedState.clear();

        throw new Exception(
                "Could not add timestamped file input splits to to operator "
                        + "state backend of operator "
                        + getOperatorName()
                        + '.',
                e);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "{} (taskIdx={}) checkpointed {} splits: {}.",
                getClass().getSimpleName(),
                subtaskIdx,
                readerState.size(),
                readerState);
    }
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    checkState(checkpointedState == null, "The reader state has already been initialized.");

    // 初始化算子操作状态
    checkpointedState =
            context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));

    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
    
    LOG.info(
            "Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);

    splits = splits == null ? new PriorityQueue<>() : splits;
    for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块
        splits.add(split);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1103573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【面试经典150 | 区间】插入区间

文章目录 Tag题目解读题目来源解题思路方法一&#xff1a;合并区间方法二&#xff1a;模拟 其他语言python3 写在最后 Tag 【模拟】【数组】 题目解读 给定一个含有多个无重叠区间的数组&#xff0c;并且数组已经按照区间开始值升序排序。在列表中插入一个新的区间&#xff0…

unity动画_UI动画案例 c#

首先我们打开一个项目 在这个初始界面我们需要做一些准备工作 创建基础通用包 在场景上创建一个Cube 选中Cube 在Window下点击Animation拖拽至运行窗口 点击创建 保存后 这个操作是给Cube添加了一个组件 对Cube_添加一个Position动画 设置几个帧位置的坐标(x,y,z)值 点击运行测…

PHP 如何查看php函数源码

一、在git找到php对应的版本 找到对应的分支版本可以下载也可以在线直接查看 通过这个地址 https://github.com/php/php-src 二、下面已shuffle函数举例&#xff0c;版本为7.4 找到对应的版本进入 点击ext&#xff0c;这个文件夹里面是存放函数的目录 在文件夹里搜不到stu…

Linux使用rpm包安装mysql5.7

以前安装过mysql 前言&#xff1a;检查以前是否装有mysql rpm -qa|grep -i mysql安装了会显示&#xff1a;   bt-mysql57-5.7.31-1.el7.x86_64 停止mysql服务和删除之前安装的mysql rpm -e bt-mysql57-5.7.31-1.el7.x86_64查找并删除mysql相关目录 find / -name mysql/va…

react+ts手写cron表达式转换组件

前言 最近在写的一个分布式调度系统&#xff0c;后端同学需要让我传入cron表达式&#xff0c;给调度接口传参。我去了学习了解了cron表达式的用法&#xff0c;发现有3个通用的表达式刚好符合我们的需求&#xff1a; 需求 每天 xx 的时间&#xff1a; 0 11 20 * * ? 上面是…

jQuery+AJAX请求的统一封装

记录一下使用jQueryAJAX对http请求的统一封装 很久都没有使用jquery和ajax的组合了&#xff0c;这里记录一下jquery和ajax的组合简单封装 将来或许有机会重新启用这个组合 新建jquery.request.js&#xff1b;demo目录结构如下 const baseURL http://127.0.0.1:8116;// con…

4K壁纸小程序源码 全内容自动采集

全内容自动采集 4K壁纸小程序源码&#xff0c;带流量主。用的都是一个接口&#xff0c;不过这个不知是谁改的&#xff0c;成了LSP版&#xff0c;是真色啊&#xff0c;专搜小姐姐。 4K壁纸&#xff0c;静态壁纸&#xff0c;头像等都有保留&#xff0c;界面广告位很多&#xff0c…

List小练习,实现添加图书,并且有序遍历

SuppressWarnings({"all"})public static void main(String[] args) {List list new LinkedList(); // List list new Vector(); // List list new ArrayList();list.add(new Book1("红楼小梦",35.5,"曹雪芹"));list.add(new B…

算法-堆/归并排序-排序链表

算法-堆/归并排序-排序链表 1 题目概述 1.1 题目出处 https://leetcode.cn/problems/sort-list/description/?envTypestudy-plan-v2&envIdtop-interview-150 1.2 题目描述 2 优先级队列构建大顶堆 2.1 思路 优先级队列构建小顶堆链表所有元素放入小顶堆依次取出堆顶…

【Java 进阶篇】JavaScript BOM History 详解

当用户浏览网页时&#xff0c;可以使用JavaScript的BOM (Browser Object Model)中的History对象来访问浏览器的历史记录。这个对象允许您在不更改页面的情况下导航到不同的历史记录项&#xff0c;或者查看有关用户访问过的页面的信息。 在本篇博客中&#xff0c;我们将围绕Jav…

IIS7.0解析漏洞

IIS7.0解析漏洞 实验环境 windows server 2008 r2&#xff08;x64&#xff09; IIS 7 phpStudyIIS 2016版本 漏洞条件 1. php.ini里的cgi.cgi_pathinfo1 2. IIS7在Fast-CGI运行模式下 漏洞复现 先搭建IIS7 出现如下界面安装成功 安装phpstudy &#xff0c;使用较老的版…

通过核密度分析工具建模,基于arcgis js api 4.27 加载gp服务

一、通过arcmap10.2建模&#xff0c;其中包含三个参数 注意input属性&#xff0c;选择数据类型为要素类&#xff1a; 二、建模之后&#xff0c;加载数据&#xff0c;执行模型&#xff0c;无错误的话&#xff0c;找到执行结果&#xff0c;进行发布gp服务 注意&#xff0c;发布g…

Vue3语法-双向绑定

点击加入精英计划可以加入 点击名字可以删除 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><!-- vue.js --><script src"https://unpkg.com/vue3/dist/vue.glob…

Nested loop(PostgreSQL 14 Internals翻译版)

连接类型和方法 连接是SQL语言的一个关键特性;它们是其力量和灵活性的基础。行集(要么直接从表中检索&#xff0c;要么作为某些其他操作的结果接收)总是成对连接。 有几种类型的连接&#xff1a; 内连接。 内连接(指定为“INNER JOIN”或简称为“JOIN”)由满足特定连接条件的…

单一职责模式

三、单一职责模式 单一职责模式概念1、装饰着模式&#xff08;Decorator&#xff09;动机&#xff08;Motivation)模式定义代码具体实现要点总结 2、桥模式&#xff08;Bridge&#xff09;动机 &#xff08;Motivation)模式定义代码具体实现要点总结 单一职责模式概念 在软件组…

MTK APP实现动态修改logo和开机动画

MTK APP实现动态修改logo和开机动画 前言一、修改对新分区的权限1.修改开机动画对新分区的权限2.修改系统APP对新分区的权限3.修改SE权限,不然编译会报错4.修改开机动画文件&#xff0c;让其加载新分区中的文件 二、系统APP代码使用1.系统app修改开机logo2.系统app修改开机动画…

设计模式:工厂方法模式(C#、JAVA、JavaScript、C++、Python、Go、PHP):

本节主要介绍设计模式中的工厂方法模式。 简介&#xff1a; 工厂方法模式&#xff0c;它是对简单工厂模式的进一步抽象化&#xff0c;其好处是可以使系统在不修改原来代码的情况下引进新的产品&#xff0c;即满足开闭原则。 它定义了一个用于创建对象的工厂接口&#xff0c;让…

Nginx正向代理,反向代理,负载均衡

Nginx正向代理&#xff0c;反向代理&#xff0c;负载均衡 Nginx当中有两种代理方式&#xff1a; 七层代理&#xff08;http协议&#xff09; 四层代理&#xff08;tcp/udp流量转发&#xff09; 七层代理&#xff1a;七层代理&#xff0c;代理的是http的请求和响应 客户端请求…

Redis RDB持久化

前言 我们知道 Redis 之所以快&#xff0c;很大程度是因为它的数据直接放在内存里&#xff0c;而内存是易失性存储器&#xff0c;只有通电才存储数据&#xff0c;断电数据就会丢失。 这个时候就要看你的应用场景了&#xff0c;如果你只是拿 Redis 做关系型数据库的缓存&#x…

SpringBoot实现SSMP整合

一、整合JUnit 1、Spring 整合 JUnit 核心注解有两个&#xff1a; RunWith(SpringJUnit4ClassRunner.class) 是设置Spring专用于测试的类运行器&#xff08;Spring程序执行程序有自己的一套独立的运行程序的方式&#xff0c;不能使用JUnit提供的类运行方式&#xff09;Conte…