Doris(6):数据导入(Load)之Stream Load

news2025/6/21 6:05:43

Broker load是一个同步的导入方式,用户通过发送HTTP协议将本地文件或者数据流导入到Doris中,Stream Load同步执行导入并返回结果,用户可以通过返回判断导入是否成功。

1 适用场景

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

2 基本原理

下图展示了 Stream load 的主要流程,省略了一些导入细节。

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。

导入的最终结果由 Coordinator BE 返回给用户。

3 语法

具体帮助使用help stream load查看

Name: 'STREAM LOAD'
Description:
    NAME:
        stream-load: load data to table in streaming

    SYNOPSIS
        curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

    DESCRIPTION
        该语句用于向指定的 table 导入数据,与普通Load区别是,这种导入方式是同步导入。
        这种导入方式仍然能够保证一批导入任务的原子性,要么全部数据导入成功,要么全部失败。
        该操作会同时更新和此 base table 相关的 rollup table 的数据。
        这是一个同步操作,整个数据导入工作完成后返回给用户导入结果。
        当前支持HTTP chunked与非chunked上传两种方式,对于非chunked方式,必须要有Content-Length来标示上传内容长度,这样能够保证数据的完整性。
        另外,用户最好设置Expect Header字段内容100-continue,这样可以在某些出错场景下避免不必要的数据传输。

    OPTIONS
        用户可以通过HTTP的Header部分来传入导入参数

        label: 一次导入的标签,相同标签的数据无法多次导入。用户可以通过指定Label的方式来避免一份数据重复导入的问题。
        当前Palo内部保留30分钟内最近成功的label。

        column_separator:用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
        如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。
        可以使用多个字符的组合作为列分隔符。

        line_delimiter:用于指定导入文件中的换行符,默认为\n。
        可以使用做多个字符的组合作为换行符。

        columns:用于指定导入文件中的列和 table 中的列的对应关系。如果源文件中的列正好对应表中的内容,那么是不需要指定这个字段的内容的。
        如果源文件与表schema不对应,那么需要这个字段进行一些数据转换。这里有两种形式column,一种是直接对应导入文件中的字段,直接使用字段名表示;
        一种是衍生列,语法为 `column_name` = expression。举几个例子帮助理解。
        例1: 表中有3个列“c1, c2, c3”,源文件中的三个列一次对应的是"c3,c2,c1"; 那么需要指定-H "columns: c3, c2, c1"
        例2: 表中有3个列“c1, c2, c3", 源文件中前三列依次对应,但是有多余1列;那么需要指定-H "columns: c1, c2, c3, xxx";
        最后一个列随意指定个名称占位即可
        例3: 表中有3个列“year, month, day"三个列,源文件中只有一个时间列,为”2018-06-01 01:02:03“格式;
        那么可以指定-H "columns: col, year = year(col), month=month(col), day=day(col)"完成导入

        where: 用于抽取部分数据。用户如果有需要将不需要的数据过滤掉,那么可以通过设定这个选项来达到。
        例1: 只导入大于k1列等于20180601的数据,那么可以在导入时候指定-H "where: k1 = 20180601"

        max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。数据不规范不包括通过 where 条件过滤掉的行。

        partitions: 用于指定这次导入所设计的partition。如果用户能够确定数据对应的partition,推荐指定该项。不满足这些分区的数据将被过滤掉。
        比如指定导入到p1, p2分区,-H "partitions: p1, p2"

        timeout: 指定导入的超时时间。单位秒。默认是 600 秒。可设置范围为 1 秒 ~ 259200 秒。

        strict_mode: 用户指定此次导入是否开启严格模式,默认为关闭。开启方式为 -H "strict_mode: true"。

        timezone: 指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。

        exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。

        format: 指定导入数据格式,默认是csv,支持json格式。

        jsonpaths: 导入json方式分为:简单模式和匹配模式。
              简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如:
              {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。

              匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。

        strip_outer_array: 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如:
           [
            {"k1" : 1, "v1" : 2},
            {"k1" : 3, "v1" : 4}
           ]
           当strip_outer_array为true,最后导入到doris中会生成两行数据。

        json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。

        merge_type: 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 示例:`-H "merge_type: MERGE" -H "delete: flag=1"`
        delete: 仅在 MERGE下有意义, 表示数据的删除条件
        
        function_column.sequence_col: 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, 
            source_sequence可以是数据源中的列,也可以是表结构中的一列。
        
        fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json 格式

        num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。

        read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。
        
        send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 

    RETURN VALUES
        导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段
        Status: 导入最后的状态。
            Success:表示导入成功,数据已经可见;
            Publish Timeout:表述导入作业已经成功Commit,但是由于某种原因并不能立即可见。用户可以视作已经成功不必重试导入
            Label Already Exists: 表明该Label已经被其他作业占用,可能是导入成功,也可能是正在导入。
            用户需要通过get label state命令来确定后续的操作
            其他:此次导入失败,用户可以指定Label重试此次作业
        Message: 导入状态详细的说明。失败时会返回具体的失败原因。
        NumberTotalRows: 从数据流中读取到的总行数
        NumberLoadedRows: 此次导入的数据行数,只有在Success时有效
        NumberFilteredRows: 此次导入过滤掉的行数,即数据质量不合格的行数
        NumberUnselectedRows: 此次导入,通过 where 条件被过滤掉的行数
        LoadBytes: 此次导入的源文件数据量大小
        LoadTimeMs: 此次导入所用的时间
        BeginTxnTimeMs: 向Fe请求开始一个事务所花费的时间,单位毫秒。
        StreamLoadPutTimeMs: 向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
        ReadDataTimeMs: 读取数据所花费的时间,单位毫秒。
        WriteDataTimeMs: 执行写入数据操作所花费的时间,单位毫秒。
        CommitAndPublishTimeMs: 向Fe请求提交并且发布事务所花费的时间,单位毫秒。
        ErrorURL: 被过滤数据的具体内容,仅保留前1000条
     
    ERRORS
        可以通过以下语句查看导入错误详细信息:

        SHOW LOAD WARNINGS ON 'url'

        其中 url 为 ErrorURL 给出的 url。
Examples:

    1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
        curl --location-trusted -u root -H "label:123" -H "timeout:100" -T testData http://host:port/api/testDb/testTbl/_stream_load

    2. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重, 并且只导入k1等于20180601的数据
        curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T testData http://host:port/api/testDb/testTbl/_stream_load

    3. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率(用户是defalut_cluster中的)
        curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T testData http://host:port/api/testDb/testTbl/_stream_load

    4. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率,并且指定文件的列名(用户是defalut_cluster中的)
        curl --location-trusted -u root  -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/testDb/testTbl/_stream_load

    5. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表中的p1, p2分区, 允许20%的错误率。
        curl --location-trusted -u root  -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/testDb/testTbl/_stream_load

    6. 使用streaming方式导入(用户是defalut_cluster中的)
        seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/testDb/testTbl/_stream_load

    7. 导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列,也可使用hll_empty补充数据中没有的列
        curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load

    8. 导入数据进行严格模式过滤,并设置时区为 Africa/Abidjan
        curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T testData http://host:port/api/testDb/testTbl/_stream_load

    9. 导入含有BITMAP列的表,可以是表中的列或者数据中的列用于生成BITMAP列,也可以使用bitmap_empty填充空的Bitmap
        curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load

    10. 简单模式,导入json数据
         表结构: 

           `category` varchar(512) NULL COMMENT "",
           `author` varchar(512) NULL COMMENT "",
           `title` varchar(512) NULL COMMENT "",
           `price` double NULL COMMENT ""
         json数据格式:
           {"category":"C++","author":"avc","title":"C++ primer","price":895}
         导入命令:
           curl --location-trusted -u root  -H "label:123" -H "format: json" -T testData http://host:port/api/testDb/testTbl/_stream_load
         为了提升吞吐量,支持一次性导入多条json数据,每行为一个json对象,默认使用\n作为换行符,需要将read_json_by_line设置为true,json数据格式如下:  
            {"category":"C++","author":"avc","title":"C++ primer","price":89.5}
            {"category":"Java","author":"avc","title":"Effective Java","price":95}
            {"category":"Linux","author":"avc","title":"Linux kernel","price":195}    

    11. 匹配模式,导入json数据
       json数据格式:
           [
           {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},
           {"category":"xuxb222","author":"2avc","title":"SayingsoftheCentury","price":895},
           {"category":"xuxb333","author":"3avc","title":"SayingsoftheCentury","price":895}
           ]
         通过指定jsonpath进行精准导入,例如只导入category、author、price三个属性  
         curl --location-trusted -u root  -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -T testData http://host:port/api/testDb/testTbl/_stream_load
         说明:
           1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。
           2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。

    12. 用户指定json根节点
       json数据格式:
            {
            "RECORDS":[
                {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
                {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
                {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
                ]
            }
        通过指定jsonpath进行精准导入,例如只导入category、author、price三个属性  
         curl --location-trusted -u root  -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load

    13. 删除与这批导入key 相同的数据
         curl --location-trusted -u root -H "merge_type: DELETE" -T testData http://host:port/api/testDb/testTbl/_stream_load
    14. 将这批数据中与flag 列为ture 的数据相匹配的列删除,其他行正常追加
         curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1"  -T testData http://host:port/api/testDb/testTbl/_stream_load
         
    15. 导入数据到含有sequence列的UNIQUE_KEYS表中
        curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load

4 数据导入演示

进入mysqlclient,清空上次导入到user_result表的数据

truncate table user_result;

通过命令将csv将数据导入到doris,-H指定参数,column_seqarator指定分割符,-T指定数据源文件(在csv文件目录下执行)

curl --location-trusted -u root -H "label:123" -H "column_separator:," -T user.csv -X PUT http://192.168.222.143:8030/api/test_db/user_result/_stream_load

5 其他导入案例参考

将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重     

curl --location-trusted -u root -H "label:123" -T testData http://host:port/api/testDb/testTbl/_stream_load

将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重, 并且只导入k1等于20180601的数据

curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T testData http://host:port/api/testDb/testTbl/_stream_load

将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率(用户是defalut_cluster中的)

curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T testData http://host:port/api/testDb/testTbl/_stream_load

将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率,并且指定文件的列名(用户是defalut_cluster中的)

curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/testDb/testTbl/_stream_load

将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表中的p1, p2分区, 允许20%的错误率。

curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/testDb/testTbl/_stream_load

使用streaming方式导入(用户是defalut_cluster中的)

seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/testDb/testTbl/_stream_load

导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列

curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1)" -T testData http://host:port/api/testDb/testTbl/_stream_load

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小厂实习要不要去?

大家好,我是帅地。 最近暑假实习招聘,不少 训练营 学员都拿到了小厂实习来保底,但是很多小厂基本要求一周内给答复,中大厂就还在流程之中,所以很纠结小厂实习要不要去。 不知道你是否有这样的纠结,今天帅地…

【测试面试汇总2】

目录Linux操作系统1.Linux操作命令2.在Linux中find和grep的区别?3.绝对路径用什么符号表示?4.当前目录、上层目录用什么表示?5.主目录用什么表示?6.怎么查看进程信息?7.保存文件并退出vi 编辑?8.怎么查看当前用户id&a…

【Python从入门到进阶】15、函数的定义和使用

接上篇《14、字典高级应用》 上一篇我们学习了有关字典的高级应用操作(字典的增删改查),本篇我们来学习Python中函数的定义和使用,包括函数的参数、返回值、局部变量和全景变量等操作。 一、一个思考 例如这里有一段大东北洗浴中…

2023年PMP报考时间安排攻略!

1.2023年PMP考试时间 PMP一年开考4次,分别为3月、6月、9月、12月,预计2023年PMP第一次考试时间在2023年3月左右,具体以基金会官方通知为准。 1)为什么考PMP? 大部分人考 PMP 无非以下几个原因,总的来说&…

运行时内存数据区之程序计数器

内存是非常重要的系统资源,是硬盘和CPU的中间仓库及桥梁,承载着操作系统和应用程序的实时选行。JVM内存布局规定了Java在运行过程中内存申请、分配、管理的策略,保证了JVM的高效稳定运行。 不同的VM对于内存的划分方式和管理机制存在着部分差…

算法时间复杂度计算

目录 1.时间复杂度计算 1.1 时间复杂度例题 1.1.1例题 1.1.2例题 1.1.3例题 1.1.4例题 1.2时间复杂度leetcode例题 1.时间复杂度计算 首先,我们需要了解时间复杂度是什么:算法的时间复杂度是指算法在编写成可执行程序后,运行时需要耗费…

一天吃透操作系统八股文

操作系统的四个特性? 并发:同一段时间内多个程序执行(与并行区分,并行指的是同一时刻有多个事件,多处理器系统可以使程序并行执行) 共享:系统中的资源可以被内存中多个并发执行的进线程共同使…

MATLAB | 给热图整点花哨操作(三角,树状图,分组图)

前段时间写的特殊热图绘制函数迎来大更新,基础使用教程可以看看这一篇: https://slandarer.blog.csdn.net/article/details/129292679 原本的绘图代码几乎完全不变,主要是增添了很多新的功能!!! 工具函数完…

FastChat开放,媲美ChatGPT的90%能力——从下载到安装、部署

FastChat开放,媲美ChatGPT的90%能力——从下载到安装、部署前言两个前置软件创建FastChat虚拟环境安装PyTorch安装 FastChat下载 LLaMA,并转换生成FastChat对应的模型Vicuna启动FastChat的命令行交互将模型部署为一个服务,提供Web GUI前言 最…

Cesium:自定义MaterialProperty

在项目中应用Cesium.js时,时常遇到需要对Cesium.js的Material材质或者MaterialProperty材质属性进行拓展的应用场景。如果对GLSL(openGL Shading Language ),即:OpenGL着色语言熟悉的话,参考Cesium官方文档,构建一个新的Material必定不是难事。而MaterialProperty材质属…

【C语言进阶:动态内存管理】动态内存函数的介绍

本节重点内容: malloc 和 free 函数calloc 函数realloc 函数🌸为什么存在动态内存分配 到目前为止,我们已经掌握的内存开辟方式有两种: 创建变量:int val 20; //在栈空间上开辟四个字节 创建数组&#xff1…

Html5钢琴块游戏制作与分享(音游可玩)

当年一款手机节奏音游,相信不少人都玩过或见过。最近也是将其做了出来分享给大家。 游戏的基本玩法:点击下落的黑色方块,弹奏音乐。(下落的速度会越来越快) 可以进行试玩,手机玩起来效果会更好些。 点击…

【Python】基于serial的UART串口通信(可实现AT指令自动化 以ML307A开发板为例)

【Python】基于serial的UART串口通信(可实现AT指令自动化 以ML307A开发板为例) Python下的串口serial库 串行口的属性: name:设备名字 portstr:已废弃,用name代替 port:读或者写端口 baudrate:波特率 byt…

Charles 安装及配置,详细步骤(不错,保存一下)

一、安装激活 1.1、下载 https://www.charlesproxy.com/download/ 1.2、激活 打开Charles > Help > Register Charles > 输入 Registered Name : https://zhile.io License Key:48891cf209c6d32bf4 二、代理配置 2.1、代理设置 Proxy > Pr…

Nodejs中的fs模块

一、文件写入操作 writeFile 直接打开文件默认是 w 模式,所以如果文件存在,该方法写入的内容会覆盖旧的文件内容 语法: writeFile(file, data[, options], callback)异步writeFileSync(file, data)同步 参数: file文件名data要…

MYSQL 2:一条更新语句是如何进行的

一. MYSQL的一条更新语句如何进行的? 和查询一样,一开始我们需要通过连接器连接到MYSQL服务器上,然后我们会将我们的语句交给解析器,然后交给执行器。比如我们执行一条这样的语句 update cc1 from user_info where id 2 1.执行…

PTA:C课程设计(5)

山东大学(威海)2022级大一下C习题集(5)函数题5-6-1 求一组数中的平均值及最大值5-6-2 判断满足条件的三位数5-6-3 函数实现字符串逆序5-6-4 查找子串5-6-5 计算最长的字符串长度5-6-6 二分查找编程题5-7-1 找最长的字符串5-7-2 藏…

第七天sql优化篇

一、查询SQL尽量不要使用select *,而是select具体字段 因为select * 进行查询时,很可能就不会使用到覆盖索引了,就会造成回表查询 select stu.name from student stu; 二、如果知道查询结果只有一条或者只要最大/最小一条记录&#xff…

CMMI认证唯一查询官网

CMMI是“能力成熟度模型集成”的意思。是一种评估或者认证制度。最新的CMMI V2.0模型有四个视图,DEV开发视图、SVC服务视图、供应商、人力资源,目前开发视图是全球应用最广泛的,主要是由CMMI研究院主任评估师按照CMMI模型检查企业或组织的软件…

HTML - 实现IE浏览器访问网址自动跳转至谷歌浏览器打开

HTML - 实现IE浏览器访问网址自动跳转至谷歌浏览器打开一. 实现代码二. IE浏览器设置一. 实现代码 注意:代码中的数据变量需要使用 var 声明 核心代码var href "http://www.baidu.com" //创建ActiveXObject实例,只在IE下有效,才可…