达梦数据库自定义插件
达梦8的依赖引入
定义reader module
定义writer module
修改核心配置数据库类型支持
打包插件
测试
以mysql到dm数据库为例
配置mysql2dm.json
执行任务
查询下结果
DataX二次开发之达梦数据库插件
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持大部分主流的数据库之间的数据同步,也提供了数据库插件的自定义开发。
达梦数据库自定义插件
插件自定义开发详细看官网dataxPluginDev.md说明文档
达梦8的依赖引入
<!-- dm driver -->
  <!-- https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 -->
  <dependency>
   <groupId>com.dameng</groupId>
   <artifactId>DmJdbcDriver18</artifactId>
   <version>8.1.3.140</version>
  </dependency> 
 定义reader module
 
 定义writer module
 
 修改核心配置数据库类型支持
public enum DataBaseType {
    Dm("dm", "dm.jdbc.driver.DmDriver"),
   //...省略其他
    private String typeName;
    private String driverClassName;
    DataBaseType(String typeName, String driverClassName) {
        this.typeName = typeName;
        this.driverClassName = driverClassName;
    }
    public String getDriverClassName() {
        return this.driverClassName;
    }
    public String appendJDBCSuffixForReader(String jdbc) {
      
            case Oracle:
                break;
            case Dm:
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }
        return result;
    }
    public String appendJDBCSuffixForWriter(String jdbc) {
        String result = jdbc;
        String suffix = null;
        switch (this) {
       
            case Oracle:
                break;
            case Dm:
                break;
          //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }
        return result;
    }
    public String formatPk(String splitPk) {
        String result = splitPk;
        switch (this) {
            case MySql:
            
            case Dm:
                if (splitPk.length() >= 2 && splitPk.startsWith("`") && splitPk.endsWith("`")) {
                    result = splitPk.substring(1, splitPk.length() - 1).toLowerCase();
                }
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }
        return result;
    }
    public String quoteColumnName(String columnName) {
        String result = columnName;
        switch (this) {
          
            case Dm:
               //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }
        return result;
    }
    public String quoteTableName(String tableName) {
        String result = tableName;
        switch (this) {
            case MySql:
                result = "`" + tableName.replace("`", "``") + "`";
                break;
            case Oracle:
                break;
            case Dm:
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }
        return result;
    }
    private static Pattern mysqlPattern = Pattern.compile("jdbc:mysql://(.+):\\d+/.+");
    private static Pattern oraclePattern = Pattern.compile("jdbc:oracle:thin:@(.+):\\d+:.+");
    private static Pattern dmPattern = Pattern.compile("jdbc:dm://:@(.+):\\d+:.+");
    /**
     * 注意:目前只实现了从 mysql/oracle 中识别出ip 信息.未识别到则返回 null.
     */
    public static String parseIpFromJdbcUrl(String jdbcUrl) {
        Matcher mysql = mysqlPattern.matcher(jdbcUrl);
        if (mysql.matches()) {
            return mysql.group(1);
        }
        Matcher oracle = oraclePattern.matcher(jdbcUrl);
        if (oracle.matches()) {
            return oracle.group(1);
        }
        Matcher dm = dmPattern.matcher(jdbcUrl);
        if (dm.matches()) {
            return dm.group(1);
        }
        return null;
    }
      //...省略其他
} 
 
 
 打包插件
需要在父类的插件里边配置模块以及插件打包配置
 
 maven执行以下命令
mvn -U clean package assembly:assembly -Dmaven.test.skip=true 
 
 
 测试
以mysql到dm数据库为例
mysql建立表并插入数据
-- test.psn definition
CREATE TABLE `psn` (
  `id` int(11) NOT NULL,
  `name` text,
  `address` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO psn
(id, name, address)
VALUES(1, 'elite', 'gz');
INSERT INTO psn
(id, name, address)
VALUES(2, 'tom', 'bj');
INSERT INTO psn
(id, name, address)
VALUES(3, 'jack', 'sz');
INSERT INTO psn
(id, name, address)
VALUES(4, 'json', 'sh'); 
 在达梦数据库里边创建一个表psn
CREATE TABLE test.psn
(
 id int NOT NULL,
 name VARCHAR(40) NULL,
 address VARCHAR(100)
); 
 配置mysql2dm.json
关系型数据库插件都差不多的配置
{
   "job": {
    "setting": {
      "speed": {
        "channel":2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "splitPk": "id",
            "column": ["id","name","address"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
                "table": ["psn"]
              }
            ]
          }
        },
        "writer": {
          "name": "dmwriter",
          "parameter": {
            "username": "test",
            "password": "123456@dm",
            "column": ["id","name","address"],
            "connection": [
              {
                "table": [
                  "psn"
                ],
                "jdbcUrl": "jdbc:dm://ip:5236?schema=TEST"
              }
            ]
          }
        }
      }
    ]
  }
} 
 执行任务
执行任务可以用命令测试,详细可以参考官网,或者用java,本例以java为例
 
 查询下结果
 
 



















