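Table of Contents
- I. Developing a UDF in Java
- 1. Create a Maven project
- 2. Create the UDF class
- 3. Package and upload the resource
- 4. Create the function MyUDF
- 5. Verify with SQL
- II. Developing a UDTF in Java
- 1. Create a Maven project
- 2. Create the UDTF class
- 3. Package and update the uploaded resource
- 4. Create the function MyUDTF
- 5. Verify with SQL
- III. Common problems
- 1. Error when publishing a function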
I. Developing a UDF in Java
1. Create a Maven project
Create a Maven project named MaxComputeUDF and configure the pom file as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>MaxComputeUDF</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-udf-local</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-mapred-local</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-graph</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-graph-local</artifactId>
<version>${sdk.version}</version>
</dependency>
</dependencies>
<properties>
<sdk.version>0.38.3-public</sdk.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-nobootcp</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<!-- <exclude>org.slf4j:*</exclude>-->
<!-- <exclude>log4j:*</exclude>-->
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
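2. Create the UDF class
The logic is deliberately simple and is meant for testing only: it converts the input string to lower case.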
package com.aliyun;

import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve({"string->string"})
public final class MyUDF extends UDF {
    public String evaluate(String s) {
        if (s == null) {
            return null;
        }
        return s.toLowerCase();
    }
}
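Before packaging, the logic can be sanity-checked locally. The following is a minimal sketch, assuming the body of `evaluate` is copied into a standalone class so that it runs without the MaxCompute SDK on the classpath. The class name `MyUDFLocalCheck` is hypothetical and not part of the original project.

```java
// Hypothetical standalone copy of the MyUDF logic for a quick local check.
public final class MyUDFLocalCheck {

    private MyUDFLocalCheck() {
    }

    // Same behaviour as MyUDF.evaluate: null in, null out; otherwise lower case.
    public static String evaluate(String s) {
        if (s == null) {
            return null;
        }
        return s.toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(evaluate("HELLO")); // prints "hello"
        System.out.println(evaluate(null));    // prints "null"
    }
}
```

Note that `toLowerCase()` without arguments uses the JVM default locale. If the result must not depend on the locale of the machine that runs the job, `s.toLowerCase(java.util.Locale.ROOT)` is one option.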
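3. Package and upload the resource
Package the project into a JAR, upload it as a resource on the DataWorks Data Development page, and publish it.
4. Create the function MyUDF
Create the function in DataWorks, setting the class name to the fully qualified name of the UDF class (com.aliyun.MyUDF) and the resource to the JAR uploaded in the previous step.
5. Verify with SQL
-- Function names are case-insensitive
SELECT myudf("HELLO");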
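II. Developing a UDTF in Java
1. Create a Maven project
The project and its pom file were already set up in Part I, so this step can be skipped.
2. Create the UDTF class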
package com.aliyun;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;

// Input: one JSON array string.
// Output: one row (bigint, string, string) per array element.
@Resolve({"string->bigint,string,string"})
public class MyUDTF extends UDTF {
    @Override
    public void process(Object[] args) throws UDFException {
        String event = (String) args[0];
        // A NULL input produces no output rows.
        if (event == null) {
            return;
        }
        JSONArray jsonArray = JSON.parseArray(event);
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            String ett = jsonObject.getString("aa");
            String eventName = jsonObject.getString("bb");
            // "cc" is a nested object; getString returns it as JSON text.
            String eventJson = jsonObject.getString("cc");
            forward(Long.parseLong(ett), eventName, eventJson);
        }
    }
}
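`Long.parseLong` throws a `NumberFormatException` when the `aa` field is missing or not numeric, and a single bad element would then fail the whole query. The following is a minimal sketch of one way to guard against that, assuming a bad id should become NULL rather than abort the job. `SafeLong` and `parseOrNull` are hypothetical names, not part of the MaxCompute SDK or of the original code.

```java
// Hypothetical helper: converts the "aa" field to a Long without throwing.
public final class SafeLong {

    private SafeLong() {
    }

    // Returns the parsed value, or null when the text is null, blank,
    // or not a valid long.
    public static Long parseOrNull(String text) {
        if (text == null) {
            return null;
        }
        String trimmed = text.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        try {
            return Long.valueOf(trimmed);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrNull("1"));   // prints "1"
        System.out.println(parseOrNull("abc")); // prints "null"
    }
}
```

Under this assumption, the call in `process` would become `forward(SafeLong.parseOrNull(ett), eventName, eventJson)`, so such elements yield NULL in the first column. Whether silently emitting NULL is preferable to failing fast depends on how the downstream tables treat bad records.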
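3. Package and update the uploaded resource
After packaging, replace the JAR uploaded in Part I with the new build and publish the resource again.
4. Create the function MyUDTF
Create the function in DataWorks, setting the class name to the fully qualified name of the UDTF class (com.aliyun.MyUDTF) and the resource to the updated JAR.
5. Verify with SQL
The test data is as follows: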
{
"a": "",
"b": "app",
"key": [{
"aa": "1",
"bb": "张三",
"cc": {
"age": "21",
"des": "工人"
}
},
{
"aa": "2",
"bb": "李四",
"cc": {
"age": 24,
"des": "大学生"
}
},
{
"aa": "3",
"bb": "王五",
"cc": {
"age": "33",
"des": "老师"
}
}]
}
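The test SQL is as follows. GET_JSON_OBJECT extracts the array under `key`, and MyUDTF expands it into rows: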
SELECT MyUDTF(
GET_JSON_OBJECT('{
"a": "",
"b": "app",
"key": [{
"aa": "1",
"bb": "张三",
"cc": {
"age": "21",
"des": "工人"
}
},
{
"aa": "2",
"bb": "李四",
"cc": {
"age": 24,
"des": "大学生"
}
},
{
"aa": "3",
"bb": "王五",
"cc": {
"age": "33",
"des": "老师"
}
}]
}','$.key')
) AS (id,name,jsonvalue)
;
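The query returns three rows, one for each element of the `key` array, in the columns id, name, and jsonvalue.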
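III. Common problems
1. Error when publishing a function
Failure message: Fail to add or update function MyUDF. Error message is MaxCompute exception happened. ErrorCode: InvalidParameter, ErrorMessage: ODPS-0421111: Resource not found - 'maxcomputeudf.jar'.
Cause: the resource referenced by the function has not been published, so it cannot be found.
Solution: publish the referenced resource first, then retry publishing the function. Note that an unpublished resource is marked with an upload icon.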