文章目录
- 1. 大数据项目结构
- 2. 类说明
- 2.1 公共接口类
- 2.2 TaskNameEnum指定每个任务的名称
- 2.3 TaskRunner中编写任务的业务逻辑
- 3. 任务执行脚本
每个公司内部都有一套自己的架子,一般新人来了就直接在已有的架子上开发业务。
以下仅仅作为记录下自己使用的架子,不作为任何推荐,也不认为这样的组织结构就是好用的。
1. 大数据项目结构
项目的整体组织结构
目录 | 说明 |
---|---|
annotation | 自定义注解Runner和Task。 |
app | 用来放整个项目的各个任务。 test1和test2是具体开发的业务任务。 |
base | BaseRunner和BaseTask是两个基础类 |
enums | 用来定义任务的别名 |
FeatureContextApp | 主类在目录中的位置保持不变,如果移动,会影响扫描task和Runner |
2. 类说明
2.1 公共接口类
package com.king.ml.base
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.springframework.util.StopWatch
import scala.util.{Failure, Success, Try}
trait BaseTask extends Logging with Serializable {
def taskName: TaskNameEnum.Value
def initConf(sparkConf: SparkConf = new SparkConf()): SparkConf = sparkConf
var runtime: StopWatch = _
def around(implicit spark: SparkSession, currDate: DateTime = DateTime.now): Unit = {
before
Try {
Class.forName(spark.conf.get("task.runner"))
.newInstance()
.asInstanceOf[BaseRunner]
.run
} match {
case Success(_) => after
case Failure(_) => afterThrowException
}
}
private def before(implicit spark: SparkSession, currDate: DateTime): Unit = {
val taskName = spark.conf.get("task.runner")
println("开始执行任务 ...["+taskName+"]")
runtime = new StopWatch(taskName)
runtime.start(taskName)
}
private def after(implicit spark: SparkSession, currDate: DateTime): Unit = {
val taskName = spark.conf.get("task.runner")
runtime.stop()
println("任务执行结束 ...["+ taskName+"],共耗时:" + runtime.getTotalTimeSeconds +"秒")
}
private def afterThrowException(implicit spark: SparkSession, currDate: DateTime): Unit = {
val taskName = spark.conf.get("task.runner")
runtime.stop()
println("任务执行异常 ...[" + taskName + "],共耗时:" + runtime.getTotalTimeSeconds + "秒")
}
}
通过一个公共的接口记录每个任务执行的具体日志信息。
2.2 TaskNameEnum指定每个任务的名称
object TaskNameEnum extends Enumeration {
def getEnumType(source:String):TaskNameEnum.Value = {
val values =TaskNameEnum.values.toList.filter(_.toString.toUpperCase == source.toUpperCase)
values.length match {
case 1 => values.head
case _ => throw new IllegalArgumentException("该任务不存在")
}
}
val Test1 = Value("ods.ods_test1")
val Test2 = Value("ods.ods_test2")
}
这里的Test1和Test2表示任务的名称。
2.3 TaskRunner中编写任务的业务逻辑
package com.king.ml.app.test1
import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
@Runner
class Test1TaskRunner extends BaseRunner{
override def taskName: TaskNameEnum.Value = TaskNameEnum.Test1
override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {
val cnt = spark.table("ods.ods_test1").count()
println("===>总记录数为:")
println("===>" + cnt)
}
}
3. 任务执行脚本
在执行脚本中,任务主程序名不需要改变,只需要给任务传参枚举中任务名的值即可。
spark-submit \
--name 'test-ml' \
--master yarn \
--deploy-mode client \
--conf spark.port.maxRetries=100 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.executor.memoryOverhead=5120 \
--queue root.production \
--driver-memory 2g --num-executors 2 --executor-memory 2g --executor-cores 1 \
--class com.king.ml.app.FeatureContextApp \
./ml/ml-demo.jar "ods.ods_test1"