Usage scenario: a scalar function is the classic UDF, used where one record goes in and one value comes out.
Development steps:
- Extend the org.apache.flink.table.functions.ScalarFunction base class (it is an abstract class, not an interface)
- Implement one or more custom evaluation methods; each must be named eval and be declared public (see the overload sketch after this list)
- The argument and result types of eval are derived directly from the eval method signature
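The point about "one or more eval methods" is easiest to see with overloads. Below is a minimal sketch, separate from the full example that follows; the class name SumFunction and its arithmetic are illustrative only, showing how Flink derives argument and result types from each public eval signature.

import org.apache.flink.table.functions.ScalarFunction;

// Illustrative sketch only: one scalar function class with two public eval overloads.
// Flink infers the SQL argument and result types of each overload from its Java signature.
public class SumFunction extends ScalarFunction {

    // INT, INT -> INT
    public Integer eval(Integer a, Integer b) {
        return a + b;
    }

    // DOUBLE, DOUBLE -> DOUBLE
    public Double eval(Double a, Double b) {
        return a + b;
    }
}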
Development example:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Input data:
 * nc -lk 8888
 * a,1
 *
 * Output:
 * res1=>:3> +I[97]
 * res2=>:3> +I[97]
 * res3=>:3> +I[97]
 */
public class ScalarFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, String>> tpStream = source.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return new Tuple2<>(fields[0], fields[1]);
            }
        });

        Table table = tEnv.fromDataStream(tpStream, "id,name");
        tEnv.createTemporaryView("SourceTable", table);

        // Call the function inline in the Table API without registering it
        Table res1 = tEnv.from("SourceTable").select(call(HashFunction.class, $("id")));
        // Register the function
        tEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
        // Call the registered function in the Table API
        Table res2 = tEnv.from("SourceTable").select(call("HashFunction", $("id")));
        // Call the registered function in SQL
        Table res3 = tEnv.sqlQuery("SELECT HashFunction(id) FROM SourceTable");

        tEnv.toDataStream(res1).print("res1=>");
        tEnv.toDataStream(res2).print("res2=>");
        tEnv.toDataStream(res3).print("res3=>");

        env.execute();
    }

    public static class HashFunction extends ScalarFunction {
        // Accept input of any type and return an INT
        public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
            return o.hashCode();
        }
    }
}
Test result: as shown in the Javadoc comment above, each of the three queries emits +I[97] for the input a,1 (97 is the hash code of the string "a").
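Besides the programmatic createTemporarySystemFunction call used above, the same class can also be registered through SQL DDL executed from Java. A minimal sketch, assuming HashFunction stays a public static inner class of ScalarFunctionTest in the default package; the function name HashFunctionDdl is illustrative, and the snippet would sit inside the main method after the SourceTable view is created:

// Register the UDF via SQL DDL instead of createTemporarySystemFunction.
// Assumption: HashFunction is loadable on the classpath as ScalarFunctionTest$HashFunction.
tEnv.executeSql(
        "CREATE TEMPORARY SYSTEM FUNCTION HashFunctionDdl "
                + "AS 'ScalarFunctionTest$HashFunction' LANGUAGE JAVA");
// Use it exactly like the function registered in Java code
Table res4 = tEnv.sqlQuery("SELECT HashFunctionDdl(id) FROM SourceTable");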