一、目的
在完成错误数据表任务后,需要对每条错误数据的错误字段及其字段值进行分析
Hive中原有的SQL语句与ClickHouse中现有的SQL语句有很大不同。
二、Hive中原有代码
2.1 表结构
-- 31. Static-queue data-cleaning record table (Hive).
-- Each row stores one field name / field value pair for a record identified
-- by id and device_no. Partitioned by day and stored as ORC.
-- NOTE: this header comment must stay on its own line; when it shares a
-- physical line with the statement, "--" comments out the whole CREATE TABLE.
create table if not exists hurys_db.dwd_data_clean_record_queue (
    id          string comment '唯一ID',
    data_type   int    comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no   string comment '设备编号',
    create_time string comment '创建时间',
    field_name  string comment '字段名',
    field_value string comment '字段值'
)
comment '静态排队数据清洗记录表'
partitioned by (day string)
stored as orc;
2.2 SQL代码
-- Turn every out-of-range field of the 2024-09-10 static-queue error rows
-- into one (field_name, field_value) record in the cleaning-record table.
with flagged as (
    -- One "<field>:<value>" marker per validation rule; the marker is NULL
    -- when the rule is not violated.
    select
        id,
        device_no,
        create_time,
        day,
        case when device_no is null
             then concat('device_no:', 'null')
        end as device_no_value,
        case when lane_no < 0 or lane_no > 255
             then concat('lane_no:', cast(lane_no as string))
        end as lane_no_value,
        case when queue_len < 0 or queue_len > 500
             then concat('queue_len:', cast(queue_len as string))
        end as queue_len_value,
        case when queue_head < 0 or queue_head > 500
             then concat('queue_head:', cast(queue_head as string))
        end as queue_head_value,
        case when queue_tail < 0 or queue_tail > 500
             then concat('queue_tail:', cast(queue_tail as string))
        end as queue_tail_value,
        case when queue_count < 0 or queue_count > 100
             then concat('queue_count:', cast(queue_count as string))
        end as queue_count_value
    from hurys_db.dwd_queue_error
    where day = '2024-09-10'
),
paired as (
    -- Join the markers of one row into a single comma-separated string
    -- (Hive concat_ws skips NULL arguments).
    select
        flagged.*,
        concat_ws(',',
                  device_no_value,
                  lane_no_value,
                  queue_len_value,
                  queue_head_value,
                  queue_tail_value,
                  queue_count_value) as kv_pairs
    from flagged
)
insert overwrite table hurys_db.dwd_data_clean_record_queue partition (day)
select
    paired.id,
    '6' as data_type,                       -- 6 = static queue
    paired.device_no,
    paired.create_time,
    split(pair, ':')[0] as field_name,      -- Hive arrays are 0-based
    split(pair, ':')[1] as field_value,
    paired.day
from paired
lateral view explode(split(paired.kv_pairs, ',')) exploded_table as pair
-- Keep only rows where at least one validation rule fired.
where paired.device_no_value   is not null
   or paired.queue_len_value   is not null
   or paired.lane_no_value     is not null
   or paired.queue_head_value  is not null
   or paired.queue_tail_value  is not null
   or paired.queue_count_value is not null
;
三、ClickHouse中现有代码
3.1 表结构
-- 31. Static-queue data-cleaning record table (long-term storage, ClickHouse).
-- Each row stores one field name / field value pair for a record identified
-- by id and device_no. Data is partitioned by day and sorted by (day, id).
CREATE TABLE IF NOT EXISTS hurys_jw.dwd_data_clean_record_queue
(
    id          String           COMMENT '唯一ID',
    data_type   Nullable(Int32)  COMMENT '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no   Nullable(String) COMMENT '设备编号',
    create_time DateTime         COMMENT '创建时间',
    field_name  Nullable(String) COMMENT '字段名',
    field_value Nullable(String) COMMENT '字段值',
    day         Date             COMMENT '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day, id)
ORDER BY (day, id)
SETTINGS index_granularity = 8192;
3.2 SQL代码
-- Emit one (field_name, field_value) row for every out-of-range field found
-- in the static-queue error table.
SELECT
    id,
    '6' AS data_type,                               -- 6 = static queue
    device_no,
    create_time,
    splitByString(':', pair)[1] AS field_name,      -- ClickHouse arrays are 1-based
    splitByString(':', pair)[2] AS field_value,
    day
FROM
(
    SELECT
        id,
        device_no,
        create_time,
        day,
        -- Each rule contributes a one-element array holding "<field>:<value>"
        -- when violated and an empty array otherwise; arrayConcat merges them.
        arrayConcat(
            if(device_no IS NULL,
               ['device_no:null'], []),
            if(lane_no < 0 OR lane_no > 255,
               [concat('lane_no:', toString(lane_no))], []),
            if(queue_len < 0 OR queue_len > 500,
               [concat('queue_len:', toString(queue_len))], []),
            if(queue_head < 0 OR queue_head > 500,
               [concat('queue_head:', toString(queue_head))], []),
            if(queue_tail < 0 OR queue_tail > 500,
               [concat('queue_tail:', toString(queue_tail))], []),
            if(queue_count < 0 OR queue_count > 100,
               [concat('queue_count:', toString(queue_count))], [])
        ) AS pairs
    FROM hurys_jw.dwd_queue_error
    -- Pre-filter: keep only rows that violate at least one rule.
    WHERE device_no IS NULL
       OR lane_no < 0     OR lane_no > 255
       OR queue_len < 0   OR queue_len > 500
       OR queue_head < 0  OR queue_head > 500
       OR queue_tail < 0  OR queue_tail > 500
       OR queue_count < 0 OR queue_count > 100
) AS flagged
-- Unfold the array so that each violated field becomes its own row.
ARRAY JOIN pairs AS pair
;
注意:1、错误数据表dwd_queue_error中参与清洗判断的字段不能设置为Nullable类型,这是一大坑。
2、如果错误数据表中的清洗字段是Decimal(10,1)类型,那么相关字段的判断条件就要相应调整(小数阈值需用toDecimal32转换成Decimal后再比较),例如:
-- Variant of the "pairs" expression for cleaning fields typed Decimal(10,1).
-- Fractional bounds are built with toDecimal32 from STRING literals so the
-- bound is parsed exactly; a float literal is converted with floating-point
-- arithmetic and may lose the last digit.
arrayConcat(
    if(device_no IS NULL,
       ['device_no:null'], []),
    if(lane_no < 0 OR lane_no > 255,
       [concat('lane_no:', toString(lane_no))], []),
    if(azimuth < 0 OR azimuth > toDecimal32('359.9', 1),
       [concat('azimuth:', toString(azimuth))], []),
    if(rcs < -64 OR rcs > toDecimal32('63.5', 1),
       [concat('rcs:', toString(rcs))], []),
    if(prob < 0 OR prob > 100,
       [concat('prob:', toString(prob))], [])
) AS pairs
3.3 Kettle任务
3.3.1 newtime
3.3.2 替换NULL值
3.3.3 clickhouse输入
3.3.4 字段选择
3.3.5 clickhouse输出
3.3.6 执行任务
3.3.7 海豚调度
由于不需要实时记录,因此把所有数据的清洗记录任务放在同一个海豚调度工作流里面,T+1执行即可。