Flink State 和 Fault Tolerance详解

有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位:

  • 记录数据从某一个过去时间点到当前时间的状态信息。
  • 以每分钟/小时/天汇总事件时,状态将保留待处理的汇总记录。
  • 在训练机器学习模型时,状态将保持当前版本的模型参数。

Flink在管理状态方面,使用Checkpoint和Savepoint实现状态容错。Flink的状态在计算规模发生变化的时候,可以自动在并行实例间实现状态的重新分发,底层使用State Backend策略存储计算状态,State Backend决定了状态存储的方式和位置(后续章节介绍)。

Flink在状态管理中将所有能操作的状态分为Keyed StateOperator State,其中Keyed State类型的状态同key一一绑定,并且只能在KeyedStream中使用。所有non-KeyedStream状态操作都叫做Operator State。Flink在底层做状态管理时,将Keyed State和<parallel-operator-instance, key>关联,由于某一个key仅仅落入其中一个operator-instance中,因此可以简单的理解Keyed State是和<operator,key>进行绑定的,采用Key Group机制对Keyed State进行管理或者分类,所有的keyed-operator在做状态操作的时候可能需要和1~n个Key Group进行交互。

Flink在分发Keyed State状态的时候,不是以key为单位,而是以Key Group为最小单元分发

Operator State (也称为 non-keyed state),每一个operator state 会和一个parallel operator instance进行绑定。Keyed StateOperator State 以两种形式存在( managed(管理)和 raw(原生)),所有Flink已知的操作符都支持Managed State,但是Raw State仅仅在用户自定义Operator时使用,并且不支持在并行度发生变化的时候重新分发状态,因此,虽然Flink支持Raw State,但是在绝大多数的应用场景下,一般使用的都是Managed State。

Keyed State

Keyed-state接口提供对不同类型状态的访问,所有状态都限于当前输入元素的key。

类型说明方法
ValueState这个状态主要存储一个可以用作更新的值update(T)
T value()
clear()
ListState这将存储List集合元素add(T)
addAll(List)
Iterable get()
update(List)
clear()
ReducingState这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供ReduceFunction
add(T)
T get()
clear()
AggregatingState<IN, OUT>这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供AggregateFunction
add(IN)
T get()
clear()
FoldingState<T, ACC>这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供FoldFunction
add(IN)
T get()
clear()
MapState<UK, UV>这个状态会保留一个Map集合元素put(UK, UV)
putAll(Map<UK, UV>)
entries()
keys()
values()
clear()

ValueSate

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])
        vs=getRuntimeContext.getState[Int](vsd)
    }
    
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")

AggregatingState<IN, OUT>

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toInt))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Double)] {
    var vs:AggregatingState[Int,Double]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new AggregatingStateDescriptor[Int,(Double,Int),Double]("avgCount",new AggregateFunction[Int,(Double,Int),Double] {
            override def createAccumulator(): (Double, Int) = {
                (0.0,0)
            }

            override def add(value: Int, accumulator: (Double, Int)): (Double, Int) = {
                (accumulator._1+value,accumulator._2+1)
            }
            
            override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
                (a._1+b._1,a._2+b._2)
            }
            
            override def getResult(accumulator: (Double, Int)): Double = {
                accumulator._1/accumulator._2
            }
        },createTypeInformation[(Double,Int)])
        vs=getRuntimeContext.getAggregatingState(vsd)
    }
    override def map(value: (String, Int)): (String, Double) = {
        vs.add(value._2)
        val avgCount=vs.get()
        (value._1,avgCount)
    }
}).print()
env.execute("wordcount")

MapState<UK, UV>

var env=StreamExecutionEnvironment.getExecutionEnvironment
//001 zs 202.15.10.12 日本 2019-10-10
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
.keyBy("id","name")
.map(new RichMapFunction[Login,String] {
    var vs:MapState[String,String]=_
    override def open(parameters: Configuration): Unit = {
        val msd=new MapStateDescriptor[String,String]("mapstate",createTypeInformation[String],createTypeInformation[String])
        vs=getRuntimeContext.getMapState(msd)
    }
    override def map(value: Login): String = {
        println("历史登录")
        for(k<- vs.keys().asScala){
            println(k+" "+vs.get(k))
        }
        var result=""
        if(vs.keys().iterator().asScala.isEmpty){
            result="ok"
        }else{
            if(!value.city.equalsIgnoreCase(vs.get("city"))){
                result="error"
            }else{
                result="ok"
            }
        }
        vs.put("ip",value.ip)
        vs.put("city",value.city)
        vs.put("loginTime",value.loginTime)
        result
    }
}).print()
env.execute("wordcount")

总结

new Rich[Map|FaltMap]Function {
    var vs:XxxState=_ //状态声明
    override def open(parameters: Configuration): Unit = {
        val xxd=new XxxStateDescription //完成状态的初始化
        vs=getRuntimeContext.getXxxState(xxd)
    }
    override def xxx(value: Xx): Xxx = {
       //状态操作
    }
}
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

State Time-To-Live(TTL)

基本使用

可以将state存活时间(TTL)分配给任何类型的keyed-state,如果配置了TTL且状态值已过期,则Flink将尽力清除存储的历史状态值。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
  • 案例
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])

        val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //过期时间5s
        .setUpdateType(UpdateType.OnCreateAndWrite)//创建和修改的时候更新过期时间
        .setStateVisibility(StateVisibility.NeverReturnExpired)//永不返回过期的数据
        .build()

        vsd.enableTimeToLive(ttlConfig)

        vs=getRuntimeContext.getState[Int](vsd)
    }
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")

注意:开启TTL之后,系统会额外消耗内存存储时间戳(Processing Time),如果用户以前没有开启TTL配置,在启动之前修改代码开启了TTL,在做状态恢复的时候系统启动不起来,会抛出兼容性失败以及StateMigrationException的异常。

清除Expired State

在默认情况下,仅当明确读出过期状态时,通过调用ValueState.value()方法才会清除过期的数据,这意味着,如果系统一直未读取过期的状态,则不会将其删除,可能会导致存储状态数据的文件持续增长。

Cleanup in full snapshot

系统会从上一次状态恢复的时间点,加载所有的State快照,在加载过程中会剔除那些过期的数据,这并不会影响磁盘已存储的状态数据,该状态数据只会在Checkpoint的时候被覆盖,但是依然解决不了在运行时自动清除过期且没有用过的数据。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

只能用于memory或者snapshot状态的后端实现,不支持RocksDB State Backend。

Cleanup in background

可以开启后台清除策略,根据State Backend采取默认的清除策略(不同状态的后端存储,清除策略不同)

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground
.build
  • Incremental cleanup(基于内存backend)
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupIncrementally(100,true) //默认值 5 | false
.build()

第一个参数表示每一次触发cleanup的时候,系统会一次处理100个元素。第二个参数是false,表示只要用户对任意一个state进行操作,系统都会触发cleanup策略;第二个参数是true,表示只要系统接收到记录数(即使用户没有操作状态)就会触发cleanup策略。

  • RocksDB compaction

RocksDB是一个嵌入式的key-value存储,其中key和value是任意的字节流,底层进行异步压缩,会将key相同的数据进行compact(压缩),以减少state文件大小,但是并不对过期的state进行清理,因此可以通过配置compactFilter,让RocksDB在compact的时候对过期的state进行排除,RocksDB数据库的这种过滤特性,默认关闭,如果想要开启,可以在flink-conf.yaml中配置 state.backend.rocksdb.ttl.compaction.filter.enabled:true 或者在应用程序的API里设置RocksDBStateBackend::enableTtlCompactionFilter。

在这里插入图片描述

import org.apache.flink.api.common.state.StateTtlConfig 
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) //默认配置1000
.build()

这里的1000表示,系统在做Compact的时候,会检查1000个元素是否失效,如果失效,则清除该过期数据。

Operator State

如果用户想要使用Operator State,只需要实现通用的CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>,值得注意的是,目前的operator-state仅仅支持list-style风格的状态,要求所存储的状态必须是一个List,且其中的元素必须可以序列化。

CheckpointedFunction

提供两种不同的状态分发方案:Even-splitUnion

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • initializeState():第一次启动或者从上一次状态恢复的时候,系统会调用initializeState()

Even-split:表示系统在故障恢复时,会将operator-state的元素均分给所有的operator实例,每个operator实例将获取到整个operator-state的sub-list数据。

Union:表示系统在故障恢复时,每一个operator实例可以获取到整个operator-state的全部数据。

案例

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {
    var listState:ListState[(String,Int)]=_
    val bufferedElements = ListBuffer[(String, Int)]()
    //负责将数据输出到外围系统
    override def invoke(value: (String, Int)): Unit = {
        bufferedElements += value
        if(bufferedElements.size == threshold){
            for(ele <- bufferedElements){
                println(ele)
            }
            bufferedElements.clear()
        }
    }
    //是在savepoint|checkpoint时候将数据持久化
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        listState.clear()
        for(ele <- bufferedElements){
            listState.add(ele)
        }
    }
    //状态恢复|初始化 创建状态
    override def initializeState(context: FunctionInitializationContext): Unit = {
        val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])
        listState=context.getOperatorStateStore.getListState(lsd)
        if(context.isRestored){
            for(element <- listState.get().asScala) {
                bufferedElements += element
            }
        }
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))
env.execute("testoperatorstate")
  • 启动netcat服务
[root@centos ~]# nc -lk 9999
  • 提交任务

在这里插入图片描述

注意,将并行度设置为1,方便测试

  • 在netcat中输入以下数据
[root@centos ~]# nc -lk 9999
a1 b1 c1 d1
  • 取消任务,并且创建savepoint
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.

注意,如果Flink需要和Hadoop整合,必须保证在当前环境变量下有HADOOP_HOME|HADOOP_CALSSPATH

[root@centos flink-1.8.1]# cat /root/.bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
  • 测试状态

在这里插入图片描述

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction接口的一种变体形式,仅仅支持Even-split状态的分发策略。

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • restoreState():等价于上述CheckpointedFunction中声明的initializeState()方法,用作状态恢复

案例

import java.lang.{Long => JLong} //修改类别名
import scala.{Long => SLong} //修改类别名
class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong]{
  @volatile
  var isRunning:Boolean = true
  var offset = 0L
    
  override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
    val lock = ctx.getCheckpointLock
    while(isRunning){
       Thread.sleep(1000)
       lock.synchronized({
         ctx.collect(offset)
         offset += 1
       })
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
    Collections.singletonList(offset) //存储的是 当前source的偏移量,如果状态不可拆分,用户可以使Collections.singletonList
  }

  override def restoreState(state: util.List[JLong]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.addSource[Long](new CustomStatefulSourceFunction)
.print("offset:")
env.execute("testOffset")

广播状态

支持Operator State的第三种类型是广播状态。引入广播状态以支持用例,其中需要将来自一个流的某些数据广播到所有下游任务,广播的状态将存储在本地,用于处理另一个流上所有传入的元素。

A third type of supported operator state is the Broadcast State. Broadcast state was introduced to support use cases where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream.

non-keyed

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class UserBuyPathBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends BroadcastProcessFunction[UserBuyPath,Rule,String]{
    //处理的是UserBuyPath,读取广播状态
    override def processElement(value: UserBuyPath,
                                ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext,
                                out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        if(broadcastState.contains(value.channel)){//如果有规则,尝试计算
            val threshold= broadcastState.get(value.channel)
            if(value.path >= threshold){//将满足条件的用户信息输出
                out.collect(value.id+" "+value.name+" "+value.channel+" "+value.path)
            }
        }
    }
    //处理的是规则 Rule 数据 ,记录修改广播状态
    override def processBroadcastElement(value: Rule, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context,
                                         out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        broadcastState.put(value.channel,value.threshold)//更新状态

        println("=======rule======")
        for(entry <- broadcastState.entries().asScala){
            println(entry.getKey+"\t"+entry.getValue)
        }
        println()
        println()
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userStream = fsEnv.socketTextStream("centos", 9999)
    .map(line => line.split("\\s+"))
    .map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
    .keyBy("id", "name")
    .map(new UserActionRichMapFunction)

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel 阈值
// 手机类   10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888)
    .map(line => line.split("\\s+"))
    .map(ts => Rule(ts(0), ts(1).toInt))
    .broadcast(msd)

userStream.connect(broadcastStream)
.process(new UserBuyPathBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
case class UserBuyPath(id:String,name:String,channel:String,path:Int)
class UserActionRichMapFunction extends RichMapFunction[UserAction,UserBuyPath]{
  var buyPathState:MapState[String,Int]=_
  override def open(parameters: Configuration): Unit = {
   val msd= new MapStateDescriptor[String,Int]("buy-path",createTypeInformation[String],createTypeInformation[Int])
   buyPathState=getRuntimeContext.getMapState(msd)
  }
  override def map(value: UserAction): UserBuyPath = {
    val channel = value.channel
    var path=0
    if(buyPathState.contains(channel)){
      path=buyPathState.get(channel)
    }
    if(value.action.equals("buy")){
      buyPathState.remove(channel)
    }else{
      buyPathState.put(channel,path+1)
    }
    UserBuyPath(value.id,value.name,value.channel,buyPathState.get(channel))
  }
}

keyed

class UserBuyPathKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends KeyedBroadcastProcessFunction[String,UserAction,Rule,String]{
  override def processElement(value: UserAction,
                              ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println("value:"+value +" key:"+ctx.getCurrentKey)
    println("=====state======")
    for(entry <- ctx.getBroadcastState(msd).immutableEntries().asScala){
      println(entry.getKey+"\t"+entry.getValue)
    }
  }

  override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {
     println("Rule:"+value)
    //更新状态
    ctx.getBroadcastState(msd).put(value.channel,value.threshold)
  }
}

case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userKeyedStream = env.socketTextStream("centos", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0)//只可以写一个参数

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel 阈值
// 手机类 10
// 电子类 10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)
userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")

CheckPoint & SavePoint

CheckPoint是Flink实现故障容错的一种机制,系统会根据配置的检查点定期自动对程序计算状态进行备份。一旦程序在计算过程中出现故障,系统会选择一个最近的检查点进行故障恢复。

SavePoint是一种有效的运维手段,需要用户手动触发程序进行状态备份,本质也是在做CheckPoint。

实现故障恢复的先决条件:

  • 持久的数据源,可以在一定时间内重播记录(例如,FlinkKafkaConsumer)
  • 状态的永久性存储,通常是分布式文件系统(例如,HDFS)
var env=StreamExecutionEnvironment.getExecutionEnvironment
//启动检查点机制
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//配置checkpoint必须在2s内完成一次checkpoint,否则检查点终止
env.getCheckpointConfig.setCheckpointTimeout(2000)
//设置checkpoint之间时间间隔 <=  Checkpoint interval
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
//配置checkpoint并行度,不配置默认1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//一旦检查点不能正常运行,Task也将终止
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
//将检查点存储外围系统 filesystem、rocksdb,可以配置在cancel任务时候,系统是否保留checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.print()
env.execute("testoperatorstate")

State Backend

State Backend决定Flink如何存储系统状态信息(Checkpoint形式),目前Flink提供了三种State Backend实现。

  • Memory (JobManagwer):这是Flink的默认实现,通常用于测试,系统会将计算状态存储在JobManager的内存中,但是在实际的生产环境中,由于计算的状态比较多,使用Memory 很容易导致OOM(out of memory)。
  • FileSystem:系统会将计算状态存储在TaskManager的内存中,因此一般用作生产环境,系统会根据CheckPoin机制,将TaskManager状态数据在文件系统上进行备份。如果是超大规模集群,TaskManager内存也可能发生溢出。
  • RocksDB:系统会将计算状态存储在TaskManager的内存中,如果TaskManager内存不够,系统可以使用RocksDB配置本地磁盘完成状态的管理,同时支持将本地的状态数据备份到远程文件系统,因此,RocksDB Backend 是推荐的选择。

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

每一个Job都可以配置自己状态存储的后端实现

var env=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") //MemoryStateBackend、FsStateBackend、RocksDBStateBackend
env.setStateBackend(fsStateBackend)

如果用户不配置,则系统使用默认实现,默认实现可以通过修改flink-conf-yaml文件进行配置

[root@centos ~]# cd /usr/flink-1.8.1/
[root@centos flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
 state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
 state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#
 state.savepoints.dir: hdfs:///flink-savepoints
 
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
 state.backend.incremental: true

注意,必须在环境变量中出现HADOOP_CLASSPATH

Flink计算发布之后是否还能够修改计算算子?

首先,这在Spark中是不允许的,因为Spark会持久化代码片段,一旦修改代码,必须删除Checkpoint,但是Flink仅仅存储各个算子的计算状态,如果用户修改代码,需要用户在有状态的操作算子上指定uid属性。

env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.uid("kakfa-consumer")
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.uid("word-count") //唯一
.map(t=>t._1+"->"+t._2)
.print()

Flink Kafka如何保证精准一次的语义操作?

  • https://www.cnblogs.com/ooffff/p/9482873.html
  • https://www.jianshu.com/p/8cf344bb729a
  • https://www.jianshu.com/p/de35bf649293
  • https://blog.csdn.net/justlpf/article/details/80292375
  • https://www.jianshu.com/p/c0af87078b9c (面试题)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/59118.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

原型链污染分析

原型链污染问题 原型链原型的继承原型链污染 原型链 原型的继承 先创建一个对象&#xff0c;查看一下属性 const obj { prop1: 111, prop2: 222,} 这里的Object.prototype就是对象的原型。 原型里面有许多的属性&#xff0c;这里面的constructor是我们需要着重关注的。 除此…

在线LaTeX公式编辑器编辑公式

在线LaTeX公式编辑器编辑公式 在编辑LaTex文档时候&#xff0c;需要输入公式&#xff0c;可以使用在线LaTeX公式编辑器编辑公式&#xff0c;其链接为: 在线LaTeX公式编辑器&#xff0c;https://www.latexlive.com/home 图1 在线LaTeX公式编辑器界面 图2 在线LaTeX公式编辑器…

【iOS RunLoop】

文章目录 前言-什么是RunLoop&#xff1f;默认情况下主线程的RunLoop原理 1. RunLoop对象RunLoop对象的获取 CFRunLoopRef源码部分&#xff08;引入线程相关&#xff09; 2. RunLoop和线程3. RunLoop相关的类RunLoop相关类的实现CFRunLoopModeRef五种运行模式CommonModes CFRun…

VBA技术资料MF39:VBA_计算单元格中的字符数

【分享成果&#xff0c;随喜正能量】依赖也好&#xff0c;不依赖也罢&#xff0c;人的心灵都是需要安放的&#xff0c;有人安放在另一个人身上&#xff0c;有人安放在喜欢的事业之上&#xff0c;有人安放在宗教信仰之上&#xff0c;过程不同&#xff0c;终点都一样&#xff0c;…

全面升级:华为鸿蒙HarmonyOS4正式发布,玩趣个性化,小艺AI升级

8月4日新闻&#xff0c;今天下午&#xff0c;华为正式发布了最新版本的鸿蒙操作系统——HarmonyOS 4&#xff01; 在华为发布会上&#xff0c;鸿蒙HarmonyOS迎来了一系列令人激动的功能升级。其中包括个性化空间、多种生产力工具以及增强的手机AI助手"小艺"。这次更…

自动化应用杂志自动化应用杂志社自动化应用编辑部2023年第11期目录

数据处理与人工智能 大数据视域下无轨设备全生命周期健康管理技术的研究 赖凡; 1-3 三维激光扫描结合无人机倾斜摄影在街区改造测绘中的技术应用 张睿; 4-6 井上变电站巡检机器人的设计与应用 刘芳; 7-9 《自动化应用》投稿邮箱&#xff1a;cnqikantg126.com 基于机…

Visual Studio 2022的MFC框架——应用程序向导

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天我们来重新审视一下Visual Studio 2022开发工具下的MFC框架知识。 MFC(Microsoft Foundation Class&#xff0c;微软基础类库&#xff09;是微软为了简化程序员的开发工作所开发的一套C类的集合&#xf…

AI 绘画Stable Diffusion 研究(三)sd模型种类介绍及安装使用详解

本文使用工具&#xff0c;作者:秋葉aaaki 免责声明: 工具免费提供 无任何盈利目的 大家好&#xff0c;我是风雨无阻。 今天为大家带来的是 AI 绘画Stable Diffusion 研究&#xff08;三&#xff09;sd模型种类介绍及安装使用详解。 目前&#xff0c;AI 绘画Stable Diffusion的…

深度学习训练营之CGAN生成手势图像

深度学习训练营之CGAN生成手势 原文链接CGAN简单介绍环境介绍前置工作数据导入所需的包加载数据创建数据集查看数据集 模型设置初始化模型的权重定义生成器构造判别器 模型训练定义损失函数设置超参数正式开始训练 结果可视化 原文链接 &#x1f368; 本文为&#x1f517;365天…

Palo Alto Networks® PA-220R 下一代防火墙 确保恶劣工况下的网络安全

一、主要安全功能 1、每时每刻在各端口对全部应用进行分类 • 将 App-ID 用于工业协议和应用&#xff0c;例如 Modbus、 DNP3、IEC 60870-5-104、Siemens S7、OSIsoft PI 等。 • 不论采用何种端口、SSL/SSH 加密或者其他规避技术&#xff0c;都会识别应用。 • 使用…

Apache Flink概述

Flink 是构建在数据流之上的一款有状态的流计算框架&#xff0c;通常被人们称为第三代大数据分析方案 第一代大数据处理方案&#xff1a;基于Hadoop的MapReduce 静态批处理 | Storm 实时流计算 &#xff0c;两套独立的计算引擎&#xff0c;难度大&#xff08;2014年9月&#x…

JavaWeb 项目实现(四) 验证旧密码

1.验证旧密码 步骤很简单&#xff0c;从Session中取到当前密码&#xff0c;和修改密码界面得到的旧密码对比&#xff0c;判断是否相等。 特别之处在于实现用到了Ajax&#xff0c;可以不刷新整个页面的情况下与Web服务器进行通信。 2.Ajax Ajax&#xff08;Asynchronous Java…

Tomcat添加第三方jar包、如何在IDEA中启动部署Web模板

前言:公司最近维护老项目,是最原始的web项目,servlet和jsp结合的web项目,启动的时候配置了好几遍, 都起不来,很折磨人,这个文档比较全配置一遍准备工作 首先 拉取代码: git clone xxx.git ,如需要别的操作,自行baidu 也可以在idea中拉取第一步File ->Project Structure->…

❤ npm不是内部或外部命令,也不是可运行的程序 或批处理文件

❤ npm不是内部或外部命令,也不是可运行的程序 或批处理文件 cmd或者终端用nvm 安装提示&#xff1a; npm不是内部或外部命令,也不是可运行的程序或批处理文件 原因&#xff08;一&#xff09; 提示这个问题&#xff0c;有可能是Node没有安装&#xff0c;也有可能是没有配置…

力扣 1049. 最后一块石头的重量 II

题目来源&#xff1a;https://leetcode.cn/problems/last-stone-weight-ii/description/ C题解&#xff08;思路来源代码随想录&#xff09;&#xff1a;本题其实就是尽量让石头分成重量相同的两堆&#xff0c;相撞之后剩下的石头最小&#xff0c;这样就化解成01背包问题了。 …

【深度学习】SMILEtrack: SiMIlarity LEarning for Multiple Object Tracking,论文

论文&#xff1a;https://arxiv.org/abs/2211.08824 代码&#xff1a;https://github.com/WWangYuHsiang/SMILEtrack 文章目录 AbstractIntroductionRelated WorkTracking-by-DetectionDetection methodData association method Tracking-by-Attention Methodology架构概述外观…

8.4 作业

1.思维导图 2.判断家目录下&#xff0c;普通文件的个数和目录文件的个数 #!/bin/bash count10 count20 cd ~ for i in $(ls) doif [ -f "$i" ]thencount1$((count11))elif [ -d "$i" ]then count2$((count21))fi done echo $count1 echo $count2 3.输入一…

WEB集群——tomcat

1. 简述静态网页和动态网页的区别。 2. 简述 Webl.0 和 Web2.0 的区别。 3. 安装tomcat8&#xff0c;配置服务启动脚本&#xff0c;部署jpress应用。 一、简述静态网页和动态网页的区别 &#xff08;1&#xff09;静态网页 1.什么是静态网页 请求响应信息&#xff0c;发…

高级IO:五种IO模型

五种IO模型 阻塞IO 阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式. 非阻塞IO 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EAGAIN/EWOULDBLOCK错误码. 非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这…

学习gRPC (三)

测试gRPC例子 编写proto文件实现服务端代码实现客户端代码 通过gRPC 已经编译并且安装好之后&#xff0c;就可以在源码目录下找到example 文件夹下来试用gRPC 提供的例子。 在这里我使用VS2022来打开仓库目录下example/cpp/helloworld目录 编写proto文件 下面是我改写的exa…