
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。
数据样例
- 准备数据文件。 - a. 在本地文件系统创建一个 CSV 格式的数据文件 - example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:- 101,Tom,100,1 102,Sam,70,0 103,Stan,80,0- b. 把 - example3.csv文件中的数据上传到 Kafka 集群的- topic3中。
- 准备 StarRocks 表。 - a. 在数据库 - test_db中创建一张名为- table3的主键模型表。表包含- id、- name和- score三列,分别代表用户 ID、用户名称和用户得分,主键为- id列,如下所示:- CREATE TABLE `table3` ( `id` int(11) NOT NULL COMMENT "用户 ID", `name` varchar(65533) NOT NULL COMMENT "用户姓名", `score` int(11) NOT NULL COMMENT "用户得分" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);- 说明 - 自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。 - b. 向 - table3表中插入数据,如下所示:- INSERT INTO table3 VALUES (101, 'Tom', 100), (102, 'Sam', 90);
导入数据
通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:
- 通过 Stream Load 导入: - curl --location-trusted -u <username>:<password> \ -H "Expect:100-continue" \ -H "label:label4" \ -H "column_separator:," \ -H "columns: id, name, score, temp, __op = temp" \ -T example3.csv -XPUT \ http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load- 说明 - 上述示例中,通过 - columns参数把- example3.csv文件中代表组别代码的第四列临时命名为- temp,然后定义- __op字段等于临时命名的- temp列。这样,StarRocks 可以根据- example3.csv文件中第四列的取值是- 0还是- 1来确定执行 UPSERT 还是 DELETE 操作。
代码实现
数据转成JSON,自动增加一个是否删除标志
private String toJsonString(TapTable tapTable, Map<String, Object> record, boolean delete) throws JsonProcessingException {
    if (null == tapTable) throw new IllegalArgumentException("TapTable cannot be null");
    if (null == record) throw new IllegalArgumentException("Record cannot be null");
    LinkedHashMap<String, Object> linkedRecord = new LinkedHashMap<>();
    for (String field : tapTable.getNameFieldMap().keySet()) {
      Object value = record.get(field);
      if (null == value) {
        linkedRecord.put(field, null);
      } else {
        linkedRecord.put(field, value.toString());
      }
    }
    linkedRecord.put(Constants.STARROCKS_DELETE_SIGN, delete ? 1 : 0);
    return objectMapper.writeValueAsString(linkedRecord);
  }stream load导入
public RespContent put(final TapTable table) throws StreamLoadException, StarRocksRetryableException {
    StarRocksConfig config = starRocksContext.getStarRocksConfig();
    StarRocksContext.WriteFormat writeFormat = starRocksContext.getWriteFormat();
    String loadUrl = null;
    try {
      String[] httpNodes = config.getStarRocksHttp().split(",");
      loadUrl = buildLoadUrl(httpNodes[new Random().nextInt(httpNodes.length)], config.getDatabase(), table.getId());
      TapLogger.info("starrocks-load: loadUrl = {}", loadUrl);
      final String prefix = buildPrefix(table.getId());
      String label = prefix + "-" + UUID.randomUUID();
      List<String> columns = new ArrayList<>();
      for (Map.Entry<String, TapField> entry : table.getNameFieldMap().entrySet()) {
        columns.add(entry.getKey());
      }
      // add the STARROCKS_DELETE_SIGN at the end of the column
      columns.add(Constants.STARROCKS_DELETE_SIGN);
      columns.add("__op = "+Constants.STARROCKS_DELETE_SIGN);
      HttpPutBuilder putBuilder = new HttpPutBuilder();
      InputStreamEntity entity = new InputStreamEntity(recordStream, recordStream.getContentLength());
      Collection<String> primaryKeys = table.primaryKeys(true);
      if (CollectionUtils.isEmpty(primaryKeys)) {
        putBuilder.setUrl(loadUrl)
            // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
            .baseAuth(config.getUser(), config.getPassword())
            .addCommonHeader()
            .addFormat(writeFormat)
            .addColumns(columns)
            .setLabel(label)
            .enableAppend()
            .setEntity(entity);
      } else {
        putBuilder.setUrl(loadUrl)
            // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
            .baseAuth(config.getUser(), config.getPassword())
            .addCommonHeader()
            .addFormat(writeFormat)
            .addColumns(columns)
            .setLabel(label)
            .enableDelete()
            .setEntity(entity);
      }
      HttpPut httpPut = putBuilder.build();
      TapLogger.debug(TAG, "Call stream load http api, url: {}, headers: {}", loadUrl, putBuilder.header);
      return handlePreCommitResponse(httpClient.execute(httpPut));
    } catch (StarRocksRetryableException e) {
      metrics.clear();
      throw e;
    } catch (Exception e) {
      throw new StreamLoadException(String.format("Call stream load error: %s", e.getMessage()), e);
    }
  }![[蓝桥杯-610]分数](https://img-blog.csdnimg.cn/fdf263b57a4d43bc8c777c26b04cfc44.png)


















