import json
import os
from enum import Enum
class LaneDirectionType(int, Enum):
LaneDirectionType_Unknown = -1 # 类型未知
OneWay = 1 # 单向
TwoWay = 2 # 双向
# 颜色类型
class ColorCombo(int, Enum):
NOUSE = 0 # 默认值
UNKNOWN = 1000 # 未定义
WHITE = 1 # 白色(默认值)
YELLOW = 2 # 黄色
ORANGE = 3 # 橙色
BLUE = 4 # 蓝色
GREEN = 5 # 绿色
LEFT_WHITE_RIGHT_YELLOW = 6 # 左白右黄
LEFT_YELLOW_RIGHT_WHITE = 7 # 左黄右白
RED = 8 # 红色
class LaneTurnType(int, Enum):
NOUSE = 0 # 默认值
UNKNOWN = 1000 # 未定义
AHEAD = 1 # 直行
LEFT = 2 # 左转
RIGHT = 3 # 右转
U_TURN = 4 # 掉头
class ObjectLaneType(int, Enum):
NOUSE = 0 # 默认值
UNKNOWN = 1000 # 未分类
VIRTUAL_WIRE = 1 # 虚拟线
THICK_DASHED_LINE_SEGMENT = 2 # 粗虚线段
SINGLE_DASHED_LINE = 3 # 单虚线
SINGLE_SOLID_LINE = 4 # 单实线
DOUBLE_DASHED_LINE = 5 # 双虚线
DOUBLE_SOLID_LINE = 6 # 双实线
LEFT_SOLID_RIGHT_DASHED_LINE = 7 # 左实右虚线
RIGHT_SOLID_LEFT_DASHED_LINE = 8 # 右实左虚线
FOUR_SOLID_LINE = 9 # 四实线
class LongitudinalType(int, Enum):
COMMON = 0 # 常规标线
DISTANCE_CONFIRM_LINE = 1 # 白色半圆状车距确认线
LOW_SPEED_LINE = 2 # 车行道纵向减速标线
GORE_AREA_LINE = 3 # 导流区边线
NO_PARKING_LINE = 4 # 禁停区边线
PARKING_LINE = 5 # 停车位边线
VARIABLE_GUIDANCE_LINE = 6 # 可变导向车道线
# 路边条带枚举定义
class ObjectFenceType(int, Enum):
NOUSE = 0 # 默认值
UNKNOWN = 1000 # 未分类
CURB = 1 # 路缘石
GUARDRAIL = 2 # 护栏
WALL = 3 # 墙体
GEOGRAPHICAL_BOUNDARTES = 4 # 地理边界
GREENBELTS = 5 # 绿化带
OTHER_HARD_ISOLATION = 6 # 其它硬隔离
PARKING_POST = 7 # 停车场柱子
def read_data(input_data):#./data3转换/output/semantic
feature = []
for _file_name in os.scandir(input_data):#会遍历该目录下的所有文件和子目录
with open(_file_name, encoding='utf-8') as fh:
feature_collection = json.loads(fh.read())#fh.read()会读取文件的所有内容作为字符串
feature.append(feature_collection)
return feature
def transform_line_properties(id="", groupid="", color=ColorCombo.NOUSE, color_tf=100, type=ObjectLaneType.NOUSE,
type_tf=100,
longitudinal_type=str(LongitudinalType.COMMON.value), aggregation_count=1,
taskid="0", update_time=0):
properties = {
"id": str(id),
"groupid": groupid,
"color": color,
# 置信度默认赋值 100
"color_tf": color_tf,
"type": type,
# 置信度默认赋值 100
"type_tf": type_tf,
"longitudinal_type": longitudinal_type,
# 聚类次数默认赋值 1
"aggregation_count": aggregation_count,
"taskid": taskid,
"update_time": update_time
}
return properties
def transform_boundary_properties(id="", type=6, type_tf=100, aggregation_count=1, taskid="0", update_time=0):
properties = {
"id": str(id),
"type": type,
# 置信度默认赋值 100
"type_tf": type_tf,
# 聚类次数默认赋值 1
"aggregation_count": aggregation_count,
"taskid": taskid,
"update_time": update_time
}
return properties
def transform_trajectory_properties(id=0,
lanenode_id_s=-1,
lanenode_id_e=-1,
speed=0,
turn_type=LaneTurnType.NOUSE,
collect_num=1,
direction=LaneDirectionType.LaneDirectionType_Unknown,
taskid="0",
update_time=0):
properties = {
"id": id,
"lanenode_id_s": lanenode_id_s,
"lanenode_id_e": lanenode_id_e,
"speed": speed,
"turn_type": turn_type,
"collect_num": collect_num,
"direction": direction,
"taskid": taskid,
"update_time": update_time
}
return properties
def transform_line(data):
# if isinstance(data["properties"]["longitudinal_type"], list):
# longitudinal_type = ','.join([str(x) for x in data["properties"]["longitudinal_type"]])
# elif isinstance(data["properties"]["longitudinal_type"], str):
# longitudinal_type = data["properties"]["longitudinal_type"]
# else:
# longitudinal_type = ""
new_properties = transform_line_properties(id=data["properties"]["id"],
color=data["properties"]["color"],
color_tf=data["properties"]["color_tf"],
type=ObjectLaneType.SINGLE_DASHED_LINE)
data["properties"] = new_properties
def transform_boundary(data):
new_properties = transform_boundary_properties(id=data["properties"]["id"],
type=ObjectFenceType.OTHER_HARD_ISOLATION)
data["properties"] = new_properties
#修改参数,可以改上层参数
def transform_trajectory(data):
new_properties = transform_trajectory_properties(id=data["properties"]["id"])
data["properties"] = new_properties
def save_data(data_list, out_data_path):
directory = os.path.dirname(out_data_path)
if not os.path.exists(directory):
os.makedirs(directory)
# out_data_path = out_data_path + save_type + ".geojson"
out_data= {"type": "FeatureCollection",
"features":data_list}
with open(out_data_path, 'w', encoding='utf-8') as fp:
fp.write(json.dumps(out_data, ensure_ascii=False))
def transform_format(in_data_path,out_path):
# 读取semantic数据 转成字典保存在list中返回
semantic_features = read_data(os.path.join(in_data_path, "semantic"))#./data3转换/output/semantic
boundary_data = []#道路边线
line_data = []#车道线
for data_features in semantic_features:
for data_feature in data_features["features"]:#遍历list 包括几何信息,属性信息
# 云端建图后续可能会有字段枚举值
if data_feature["properties"]["type"] == 6:
transform_boundary(data_feature)#只更新对象里的属性值,其他的保留
boundary_data.append(data_feature)
if data_feature["properties"]["type"] == 1:
transform_line(data_feature)
line_data.append(data_feature)
out_boundary_data_path = os.path.join(out_path, "semantic", "Boundary.geojson")
out_line_data_path = os.path.join(out_path, "semantic", "Line.geojson")
save_data(boundary_data, out_boundary_data_path)
save_data(line_data, out_line_data_path)
if __name__ == '__main__':# 用于确保该代码块只在作为主程序运行时才执行,而在被导入为模块时不执行
in_data_path = "/home/linux/下载/557040098/output"
out_path = "/home/linux/下载/557040098/output"
transform_format(in_data_path, out_path)
# # 读取trajectory数据
# trajectory_features = read_data(os.path.join(in_data_path,"trajectory"))
#
# for data_features in trajectory_features.items():
# for data_feature in data_features["features"]:
# transform_trajectory(data_feature)
# out_trajectory_data_path = os.path.join(out_path, "trajectory", "Lane")
# save_data(trajectory_features, out_trajectory_data_path)
json.load()
和 json.loads()
都是用于读取 JSON 格式数据的函数,其中 json.load()
用于读取文件(File)对象,而 json.loads()
用于读取字符串(str)对象。
具体来说,json.load()
的参数应该是一个打开的文件对象,而 json.loads()
的参数应该是一个字符串对象。json.loads()
会将这个字符串解码为 Python 对象,而 json.load()
则会将文件中的 JSON 数据解码为 Python 对象。