你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:
- 了解大厂经验
- 拥有和大厂相匹配的技术等
希望看什么,评论或者私信告诉我!
文章目录
- 一、前言
- 二、Flink 代码优化
- 2.0 问题发现
- 2.1 原有代码
- 2.2 CompletableFuture 优化
 
- 三、avatorscript 使用的简单介绍
- 3.1 自定义函数
- 3.2 从 Map 中取值
- 3.3 使用 Java 的工具类
- 3.4 AviatorScript 函数
 
- 四、总结
一、前言
目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈
二、Flink 代码优化
2.0 问题发现

 通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式
2.1 原有代码
xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx
经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右
2.2 CompletableFuture 优化
xxx
List<CompletableFuture> executeFutures=new ArrayList<>();
CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);
for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}
修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右
三、avatorscript 使用的简单介绍
为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:
3.1 自定义函数
class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,
                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }
    public String getName() {
        return "add" ;
    }
}
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}
3.2 从 Map 中取值
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);
3.3 使用 Java 的工具类
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)
 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);
3.4 AviatorScript 函数
## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}
四、总结
本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。



















