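Handling retract streams
A retract stream arises when two tables are joined in a streaming job. Records arrive one at a time, so rows that should eventually join may not have matched yet when the first of them arrives. In a left join, for example, a left row that arrives before its right-side match is emitted padded with nulls. When the match arrives, that result is retracted and replaced by the joined row.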
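To make the left-join case concrete, here is a toy model in plain Java. It is not Flink code: the class name, the assumption of at most one live row per key on each side, and the string form of the rows are all illustrative. The +I (insert) and -D (delete, i.e. retraction) labels mirror the row kinds used in Flink's changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of how a streaming LEFT JOIN produces a retract (changelog) stream.
// Assumes one left row and at most one live right row per key.
final class LeftJoinChangelog {
    private final Map<String, String> left = new HashMap<>();   // key -> left value
    private final Map<String, String> right = new HashMap<>();  // key -> latest right value
    final List<String> out = new ArrayList<>();                 // emitted changelog rows

    void onLeft(String key, String value) {
        left.put(key, value);
        // Emit immediately; if the right side has not arrived yet, it is padded with null
        out.add("+I[" + key + ", " + value + ", " + right.get(key) + "]");
    }

    void onRight(String key, String value) {
        String previous = right.put(key, value);
        String l = left.get(key);
        if (l != null) {
            // The earlier result is now stale: retract it, then emit the joined row
            out.add("-D[" + key + ", " + l + ", " + previous + "]");
            out.add("+I[" + key + ", " + l + ", " + value + "]");
        }
    }

    public static void main(String[] args) {
        LeftJoinChangelog join = new LeftJoinChangelog();
        join.onLeft("1001", "order_detail");  // arrives first
        join.onRight("1001", "activity");     // arrives later
        join.out.forEach(System.out::println);
    }
}
```

When the right row arrives first, no retraction is needed: the left row is emitted already joined.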
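- Format of the retract stream:
- Solutions
  - Timer: register a timer that fires after 10 s (the maximum time gap between related records); when it fires, emit the data held in state, by which time the fully joined version should have arrived.
  - If the same records end up being computed more than once, the result stays correct as long as each emitted measure is changed to
    current measure - previous measure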
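A minimal sketch of the current measure - previous measure idea, assuming each record carries a key (for example an order-detail id) and a single numeric measure. The sketch keeps the last value sent for each key and emits only the difference, so a downstream sum ends at the final value however many times the key is re-emitted. A HashMap stands in for Flink keyed state here, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Turns repeated emissions of the same key into deltas so a downstream SUM stays correct.
// A HashMap stands in for Flink keyed state (assumption).
final class DeltaCorrector {
    private final Map<String, Double> lastSent = new HashMap<>();

    /** Returns the value to send downstream: current measure minus the previously sent one. */
    double correct(String key, double currentMeasure) {
        double previous = lastSent.getOrDefault(key, 0.0);
        lastSent.put(key, currentMeasure);
        return currentMeasure - previous;
    }

    public static void main(String[] args) {
        DeltaCorrector c = new DeltaCorrector();
        double downstreamSum = 0;
        // detail-1 is emitted twice: first with an incomplete amount, then with the full one
        downstreamSum += c.correct("detail-1", 80.0);   // sends 80.0
        downstreamSum += c.correct("detail-1", 100.0);  // sends 20.0
        downstreamSum += c.correct("detail-2", 50.0);   // sends 50.0
        System.out.println(downstreamSum);              // 150.0
    }
}
```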
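Async I/O
- Cuts the time spent waiting on external systems such as Redis and HBase, so the existing resources are fully used instead of sitting idle.
- When using async I/O, every operation must be asynchronous from start to finish, i.e. use asynchronous clients (connectors); a single blocking call in the chain ties up a thread and loses the benefit.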
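The saving is easiest to see with a simulated client. In this sketch, fakeLookup is a hypothetical stand-in for a non-blocking Redis or HBase call with an assumed 100 ms latency. Sending all lookups before waiting makes the total wait roughly one latency instead of one latency per key.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Sequential vs concurrent lookups against a simulated non-blocking client.
final class AsyncVsSync {
    static final long LATENCY_MS = 100; // assumed latency of one lookup

    // Hypothetical async client: the result becomes available after LATENCY_MS,
    // and no worker thread is blocked during the delay
    static CompletableFuture<String> fakeLookup(String key) {
        Executor delayed = CompletableFuture.delayedExecutor(LATENCY_MS, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "dim-of-" + key, delayed);
    }

    // Blocking style: wait for each lookup before sending the next one
    static List<String> sequential(List<String> keys) {
        return keys.stream().map(k -> fakeLookup(k).join()).collect(Collectors.toList());
    }

    // Async style: send every lookup first, then collect the results
    static List<String> concurrent(List<String> keys) {
        List<CompletableFuture<String>> futures =
                keys.stream().map(AsyncVsSync::fakeLookup).collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("1", "2", "3", "4", "5");
        long t0 = System.nanoTime();
        sequential(keys);
        long t1 = System.nanoTime();
        concurrent(keys);
        long t2 = System.nanoTime();
        System.out.printf("sequential: %d ms, concurrent: %d ms%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
```

If fakeLookup blocked a thread instead (for example with Thread.sleep inside supplyAsync), the concurrent version would be capped by the thread-pool size, which is why the whole chain needs async clients.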
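```java
// ---- RedisUtil.java ----
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisUtil {

    // RedisClient is heavyweight (it owns the I/O threads), so share one instance
    // instead of creating a new client on every call
    private static final RedisClient REDIS_CLIENT = RedisClient.create("redis://hadoop102:6379/2");

    /**
     * Gets a Redis connection for async use; send commands through connection.async().
     *
     * @return the connection object
     */
    public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {
        return REDIS_CLIENT.connect();
    }

    /**
     * Closes the Redis async connection.
     *
     * @param redisAsyncConn the connection to close (may be null)
     */
    public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {
        if (redisAsyncConn != null) {
            redisAsyncConn.close();
        }
    }

    // getRedisKey(...) and the other helpers are omitted here
}

// ---- HBaseUtil.java ----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class HBaseUtil {

    /**
     * Gets an asynchronous HBase connection.
     *
     * @return the async connection object
     */
    public static AsyncConnection getHBaseAsyncConnection() {
        // HBaseConfiguration.create() also loads the HBase default settings
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            // createAsyncConnection returns a CompletableFuture; waiting once at setup time is fine
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the asynchronous HBase connection.
     *
     * @param asyncConn the async connection (may be null)
     */
    public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {
        if (asyncConn != null) {
            try {
                asyncConn.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // getAsyncCells(...) and the other helpers are omitted here
}
```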
Async I/O join
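AsyncDataStream.unorderedWait(inputStream, <async core logic>, 60, TimeUnit.SECONDS)
The first argument is the stream to enrich, the second is the AsyncFunction that holds the async core logic, and 60 s is the timeout for each request. unorderedWait emits each result as soon as its request completes, so the output order may differ from the input order.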
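AsyncDataStream also provides orderedWait, which takes the same arguments. The two differ only in when results leave the operator. The toy model below imitates both policies with futures completed by hand; it is a simplification of the documented semantics, not Flink's implementation, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of Flink's two async output modes (single-threaded demo, not Flink code).
final class AsyncOutputModes {
    final List<String> unorderedOut = new ArrayList<>();
    final List<String> orderedOut = new ArrayList<>();
    private final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();

    /** Registers one in-flight request under both policies. */
    void submit(CompletableFuture<String> result) {
        // unorderedWait-style: emit as soon as this request completes
        result.thenAccept(unorderedOut::add);
        // orderedWait-style: queue requests in arrival order
        pending.addLast(result);
        result.thenRun(this::drainOrdered);
    }

    // Emit from the head of the queue for as long as the head has completed
    private void drainOrdered() {
        while (!pending.isEmpty() && pending.peekFirst().isDone()) {
            orderedOut.add(pending.pollFirst().join());
        }
    }

    public static void main(String[] args) {
        AsyncOutputModes modes = new AsyncOutputModes();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        modes.submit(a);
        modes.submit(b);
        modes.submit(c);
        // The lookups finish out of order: c, then a, then b
        c.complete("c");
        a.complete("a");
        b.complete("b");
        System.out.println(modes.unorderedOut); // [c, a, b]
        System.out.println(modes.orderedOut);   // [a, b, c]
    }
}
```

Unordered output never waits behind a slow earlier request, so it usually gives lower latency; ordered output is the choice when downstream logic depends on arrival order.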
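Async join with the dimension table, step 1: CompletableFuture.supplyAsync(new Supplier<>() { read the data from Redis asynchronously }), which returns a Future.
- First build the redisKey to look up.
- Obtain the dimSkuInfoFuture future.
- Call dimSkuInfoFuture.get() to retrieve the async result.

Step 2: thenApplyAsync(new Function<>() { ... }) performs the cache-aside check: if the data was found in Redis, use it; otherwise read it from HBase.
- Rewrite the HBase getCells method as a getAsyncCells method.
- Switch to the async connection.
- The HBase call also returns a Future, so call get() on it to obtain the actual value.
- Do not close the connection here; it is opened in open() and closed in close().
- Save the data read from HBase to Redis:
  redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());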
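Calling get() inside supplyAsync and thenApplyAsync works, but each call parks a pool thread until Redis or HBase answers. Lettuce's RedisFuture implements CompletionStage and HBase's async table API returns CompletableFuture, so the same cache-aside logic can instead be chained with thenCompose without blocking. The sketch below shows that shape with hypothetical in-memory stand-ins (FakeRedis, FakeHBase) so it runs without a cluster; the table:rowKey key format is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Cache-aside lookup as a non-blocking CompletableFuture chain.
// FakeRedis and FakeHBase are hypothetical stand-ins that return futures instead of blocking.
final class CacheAsideAsync {
    static final class FakeRedis {
        final Map<String, String> data = new ConcurrentHashMap<>();

        CompletableFuture<String> get(String key) {
            return CompletableFuture.supplyAsync(() -> data.get(key));
        }

        CompletableFuture<Void> setex(String key, long ttlSeconds, String value) {
            // The TTL is ignored by this stand-in
            return CompletableFuture.runAsync(() -> data.put(key, value));
        }
    }

    static final class FakeHBase {
        final Map<String, String> rows = new ConcurrentHashMap<>();
        final AtomicInteger reads = new AtomicInteger(); // counts HBase round trips

        CompletableFuture<String> getRow(String rowKey) {
            reads.incrementAndGet();
            return CompletableFuture.supplyAsync(() -> rows.get(rowKey));
        }
    }

    final FakeRedis redis = new FakeRedis();
    final FakeHBase hbase = new FakeHBase();

    /** Redis first; on a miss, read HBase and write the row back to Redis for 24 h. */
    CompletableFuture<String> lookup(String table, String rowKey) {
        String redisKey = table + ":" + rowKey;
        return redis.get(redisKey).thenCompose(cached -> {
            if (cached != null) {
                return CompletableFuture.completedFuture(cached);          // cache hit
            }
            return hbase.getRow(rowKey).thenCompose(row -> {               // cache miss
                if (row == null) {
                    return CompletableFuture.<String>completedFuture(null); // no dimension row
                }
                // Finish the write-back before completing, so the next lookup hits Redis
                return redis.setex(redisKey, 24 * 60 * 60, row).thenApply(ignored -> row);
            });
        });
    }

    public static void main(String[] args) {
        CacheAsideAsync dims = new CacheAsideAsync();
        dims.hbase.rows.put("sku_1", "{\"sku_name\":\"phone\"}");
        System.out.println(dims.lookup("dim_sku_info", "sku_1").join()); // read from HBase
        System.out.println(dims.lookup("dim_sku_info", "sku_1").join()); // served by Redis
        System.out.println(dims.hbase.reads.get());                      // 1
    }
}
```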
Encapsulating the async dimension join
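- Extend the RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean> class (an abstract class, not an interface).
- Make the methods that supply the table name and the rowkey (which are concatenated into the Redis key) abstract, so that the caller passes them in.
- Encapsulate a join method, join(TradeSkuOrderBean input, JSONObject dim), which holds the logic that merges the dimension fields into the record.
- Separate the abstract methods from the concrete ones: move the abstract methods into an interface (DimJoinFunction) and implement that interface.
- Replace the TradeSkuOrderBean class with the generic type parameter T.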
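Put together, these steps are the template-method pattern: the base class fixes the lookup flow, and each concrete join only states the table, the row key and how to copy the fields. The Flink-free sketch below models that split. OrderBean, DimLookupFunction, the Map-based store, the synchronous enrich method and the Map-typed join parameter (instead of JSONObject) are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model of the DimJoinFunction / DimAsyncFunction split.
final class TemplateDemo {
    static OrderBean enrichOrder(Map<String, Map<String, String>> store, String skuId) {
        // The caller supplies only the three varying pieces
        DimLookupFunction<OrderBean> skuJoin = new DimLookupFunction<OrderBean>(store) {
            @Override public String getTableName() { return "dim_sku_info"; }
            @Override public String getId(OrderBean input) { return input.skuId; }
            @Override public void join(OrderBean input, Map<String, String> dim) {
                input.skuName = dim.get("sku_name");
            }
        };
        return skuJoin.enrich(new OrderBean(skuId));
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> store = new HashMap<>();
        store.put("dim_sku_info:1", Map.of("sku_name", "phone"));
        System.out.println(enrichOrder(store, "1").skuName); // phone
        System.out.println(enrichOrder(store, "2").skuName); // null (no dimension row)
    }
}

interface DimJoinFunction<T> {
    String getTableName();                       // which dimension table to read
    String getId(T input);                       // row key taken from the record
    void join(T input, Map<String, String> dim); // copy dimension fields into the record
}

abstract class DimLookupFunction<T> implements DimJoinFunction<T> {
    private final Map<String, Map<String, String>> store; // "table:rowKey" -> dimension row

    DimLookupFunction(Map<String, Map<String, String>> store) {
        this.store = store;
    }

    // The fixed part of the flow; subclasses only implement the interface methods
    T enrich(T input) {
        Map<String, String> dim = store.get(getTableName() + ":" + getId(input));
        if (dim != null) {
            join(input, dim);
        }
        return input;
    }
}

final class OrderBean {
    String skuId;
    String skuName; // filled in from the dimension table

    OrderBean(String skuId) {
        this.skuId = skuId;
    }
}
```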
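```java
import com.alibaba.fastjson.JSONObject; // assumed: fastjson (provides parseObject / toJSONString)
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.client.AsyncConnection;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// RedisUtil, HBaseUtil, Constant and DimJoinFunction are project classes (same package assumed).
// DimJoinFunction<T> declares getTableName(), getId(T) and join(T, JSONObject).
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
        implements DimJoinFunction<T> {

    // One connection of each kind per parallel subtask: opened in open(), closed in close()
    private StatefulRedisConnection<String, String> redisAsyncConnection;
    private AsyncConnection hBaseAsyncConnection;

    @Override
    public void open(Configuration parameters) throws Exception {
        redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
        hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
    }

    @Override
    public void close() throws Exception {
        RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
        HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        // Java's asynchronous programming style: a CompletableFuture chain
        String tableName = getTableName();
        String rowKey = getId(input);
        String redisKey = RedisUtil.getRedisKey(tableName, rowKey);

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // Step 1: read the dimension from Redis asynchronously
                RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
                try {
                    return dimSkuInfoFuture.get();
                } catch (Exception e) {
                    e.printStackTrace();
                    return null; // treat a Redis error as a cache miss
                }
            }
        }).thenApplyAsync(new Function<String, JSONObject>() {
            @Override
            public JSONObject apply(String dimInfo) {
                // Step 2: cache-aside check
                if (dimInfo != null && !dimInfo.isEmpty()) {
                    // Cache hit: the dimension is already in Redis
                    return JSONObject.parseObject(dimInfo);
                }
                JSONObject dimJsonObj = null;
                try {
                    // Cache miss: read the dimension from HBase
                    dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
                    if (dimJsonObj != null) {
                        // Only cache rows that exist; keep them in Redis for 24 hours
                        redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return dimJsonObj;
            }
        }).thenAccept(new Consumer<JSONObject>() {
            @Override
            public void accept(JSONObject dim) {
                // Step 3: merge the dimension info into the record
                if (dim == null) {
                    // No dimension row could be joined
                    System.out.println("Cannot join dimension info: " + tableName + ":" + rowKey);
                } else {
                    join(input, dim);
                }
                // Hand the result back to Flink
                resultFuture.complete(Collections.singletonList(input));
            }
        }).exceptionally(e -> {
            // Fail the record right away instead of letting it hang until the async timeout
            resultFuture.completeExceptionally(e);
            return null;
        });
    }
}
```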