Interval Join
Interval join 组合元素的条件为:两个流(暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内,即 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里的 a 和 b 为 A 和 B 中共享相同 key 的元素,上界和下界可正可负,只要下界永远小于等于上界即可,Interval join 目前仅执行 inner join。
当一对元素被传递给 ProcessJoinFunction
,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context
访问)。
Interval join 目前仅支持 event time。
上例中,join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。
默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive()
和 .upperBoundExclusive()
可以将它们排除在外。
图中三角形所表示的条件也可以写成更加正式的表达式:
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
代码示例:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String>(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});