在本人另一篇文章(高德地图地理编码python(版本3.9)爬虫(含坐标转换及数据表模板)-CSDN博客)的基础上增加geopandas功能,使脚本能自动根据查找的高德地图坐标与现有的几何范围进行交互,根据坐标计算距离和判断是否在几何范围内,并将WGS84坐标和距离信息填写在表格中:
如发现问题欢迎指正,如有需求和交流请联系Q:775915005
增加功能包括:1.geopandas表格连接,将范围的几何坐标信息连接到表格
2.坐标转换,平面与经纬度坐标互转
3..判断提供的范围是否多对象,判断坐标点是否在提供的范围(譬如园区)内,如果不在范围内则计算到范围的平面距离
4.根据表格地点名称或地址查找高德地图的坐标,如果查出多个坐标,则计算提供的范围最短距离的坐标
5.根据提供的几何范围生成随机坐标(按需使用)
完整示例及说明:
示例文件下载:【免费】高德地图python地理编码和geopandas应用判断坐标点空间位置示例文件资源-CSDN文库
带编号的企业原始表格数据(共同字段名YQBH)
带编号的几何范围(共同字段名YQBH)
最终生成表格和对应的shp,距离为0表示在YQBH对应的几何范围内
完整代码:(207-209行按实际情况修改)
# -*- coding:utf-8 -*-
# ---------------------------------------------------------------------------
# Author: LGZ
# Created on:
# Reference:
# coding:cp936 or coding:utf-8
# ---------------------------------------------------------------------------
import logging, os, pprint
import pandas as pd
import numpy as np
# import arcpy
# import itertools, random, math
errfile = r"C:\SoftWare\ArcGISPro\bin\Python\ErrorRecord\ex.txt"
loggingfile = r"C:\SoftWare\ArcGISPro\bin\Python\ErrorRecord\lg.txt"
# logging.disable(logging.CRITICAL) #禁用CRITICAL级别以下的日志记录
# 按此格式显示DEBUG级别以上的日志记录
# logging.basicConfig(filename=loggingfile, level=logging.DEBUG, format="%(asctime)s-%(levelname)s-%(message)s")
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s-%(levelname)s-%(message)s")
def logprint(*shape):
return logging.debug(pprint.pformat([*shape]))
# file =
# if os.path.isfile(file): # 判断文件是否已存在,已存在则删除
# os.unlink(file)
# if os.path.dirname(file) == "" : # 判断是否没有完整路径
# file = os.getcwd()+"//"+file # 如没有则连接工作目录制造完整路径
# file_list=os.listdir(file) # 根据需要的文件类型获取目录下文件名列表
# file_list=[i for i in file_list if i.endswith("xlsx") and \
# not i.startswith("~$")]
# save_fold =
# os.makedirs(save_fold, exist_ok=True) # 根据文件夹是否已存在制造文件夹
import math
x_pi = float(3.14159265358979324 * 3000.0 / 180.0)
# //pai
pi = float(3.1415926535897932384626)
# //离心率
ee = float(0.00669342162296594323)
# //长半轴
a = float(6378245.0)
# //经度转换
def transformlat(lng:float, lat:float)-> float:
ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(abs(lng))
ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0
ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0
return ret
# //纬度转换
def transformlng(lng:float, lat:float)-> float:
ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(abs(lng))
ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0
ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
return ret
# //国测局转84
def gcj02towgs84(lng:float, lat:float)-> list:
dlat = transformlat(lng - 105.0, lat - 35.0)
dlng = transformlng(lng - 105.0, lat - 35.0)
radlat = lat / 180.0 * pi
magic = math.sin(radlat)
magic = 1 - ee * magic * magic
sqrtmagic = math.sqrt(magic)
dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)
dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)
mglat = lat + dlat
mglng = lng + dlng
return [lng * 2 - mglng, lat * 2 - mglat]
def requrl(name:str,ADDRESS:str,geo,key,city,XJQ):
OUTPUT = 'JSON'
nameparams = {
'city': city, # 修改为对应城市名称
'address': name,
'key': key, # 修改为自己注册的高德地图key
'output': OUTPUT}
ADDRESSparams = {
'city': city, # 修改为对应城市名称
'address': ADDRESS,
'key': key, # 修改为自己注册的高德地图key
'output': OUTPUT}
nameurl = "https://restapi.amap.com/v3/geocode/geo?parameters"
namej = json.loads(requests.get(nameurl, nameparams).content)
namelst = []
ADDRESSurl = "https://restapi.amap.com/v3/geocode/geo?parameters"
ADDRESSj = json.loads(requests.get(ADDRESSurl, ADDRESSparams).content)
ADDRESSlst = []
# if j['status'] == '1' and int(j['count']) >= 1:
# 使用名称
if namej['status'] == '1':
poislst = namej['geocodes']
for i in range(len(poislst)): # 能查出多少个POI
if poislst[i]['district'] == XJQ: # 是否在白云区内
location = poislst[i]['location'].split(',')
lng, lat = map(float, location)
# 进行转换为平面坐标
x, y = transformer_plane.transform(gcj02towgs84(lng, lat)[0], gcj02towgs84(lng, lat)[1])
point = Point(x, y)
polygon = geo
distance_to_boundary = distance_to_multipolygon_boundary(polygon, point)
namelst.append((gcj02towgs84(lng, lat)[0], gcj02towgs84(lng, lat)[1],distance_to_boundary)) # 将经纬坐标和距离放进列表
logprint(f"The distance from the namepoint to the polygon boundary is {distance_to_boundary:.2f} meters.")
namelst.sort(key=lambda x: x[2]) # 按照距离排序
logprint(namelst)
# 使用地址
if ADDRESSj['status'] == '1':
geocodeslst = ADDRESSj['geocodes']
for i in range(len(geocodeslst)): # 能查出多少个geocodes
if geocodeslst[i]['district'] == XJQ: # 是否在白云区内
location = geocodeslst[i]['location'].split(',')
lng, lat = map(float, location)
# 进行转换为平面坐标
x, y = transformer_plane.transform(gcj02towgs84(lng, lat)[0], gcj02towgs84(lng, lat)[1])
point = Point(x, y)
polygon = geo
distance_to_boundary = distance_to_multipolygon_boundary(polygon, point)
ADDRESSlst.append(
(gcj02towgs84(lng, lat)[0], gcj02towgs84(lng, lat)[1], distance_to_boundary)) # 将经纬坐标和距离放进列表
logprint(f"The distance from the ADDRESSpoint to the polygon boundary is {distance_to_boundary:.2f} meters.")
ADDRESSlst.sort(key=lambda x: x[2]) # 按照距离排序
logprint(ADDRESSlst)
if len(ADDRESSlst) > 0 and len(namelst) > 0:
if namelst[0][2] < ADDRESSlst[0][2]:
return ["使用名称",str(namelst[0][0]), str(namelst[0][1]), str(namelst[0][2])]
else:
return ["使用地址",str(ADDRESSlst[0][0]), str(ADDRESSlst[0][1]), str(ADDRESSlst[0][2])]
elif len(ADDRESSlst) > 0 and len(namelst) == 0:
return ["使用地址",str(ADDRESSlst[0][0]), str(ADDRESSlst[0][1]), str(ADDRESSlst[0][2])]
elif len(ADDRESSlst) == 0 and len(namelst) > 0:
return ["使用名称",str(namelst[0][0]), str(namelst[0][1]), str(namelst[0][2])]
else:
return ["未查询到", "", "", ""]
def generate_random_point(polygon):
minx, miny, maxx, maxy = polygon.bounds
while True:
# 在多边形的边界框内生成随机坐标
point = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
# 检查这个点是否在多边形内部(包括边界)
if polygon.contains(point) or polygon.touches(point):
x, y = transformer_longlat.transform(point.x, point.y)
return [x, y]
def distance_to_multipolygon_boundary(multipolygon, point):
# 检查是否为MultiPolygon
if isinstance(multipolygon, MultiPolygon):
# 初始化最小距离为无穷大
min_distance = np.inf
for polygon in multipolygon.geoms:
if polygon.contains(point):
return 0
# 计算点到当前多边形边界的距离
distance = polygon.exterior.distance(point)
# 更新最小距离
if distance < min_distance:
min_distance = distance
return min_distance
elif hasattr(multipolygon, 'exterior'):
# 如果是单个Polygon,则直接计算距离
if multipolygon.contains(point):
return 0
return multipolygon.exterior.distance(point)
else:
raise TypeError("The geometry is neither a Polygon nor a MultiPolygon.")
if __name__ == '__main__':
"""脚本单独使用时运行以下内容"""
"""----------------------------------------------"""
"""---------------------PARA---------------------"""
try:
logprint("start of program")
# TODO
import random
import requests, json, time
import openpyxl
from pyproj import Transformer
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point,MultiPolygon
transformer_plane = Transformer.from_crs("EPSG:4490", "EPSG: 4526", always_xy=True)
transformer_longlat = Transformer.from_crs("EPSG:4526", "EPSG:4490", always_xy=True)
KEY = '***' # 修改为对应高德Key
city = '广州市' # 修改为对应城市名称
XJQ = '白云区' # 修改为对应县级区名称
dbfpath = r".\无坐标的企业数据0126.xls"
shppath = r".\范围.shp"
outpathshp = r".\新增企业坐标20250126BF.shp"
outpathxlsx = r".\新增企业坐标20250126BF.xlsx"
# read
dbfdata = pd.read_excel(dbfpath) # 需要读取全路径
shpdata = gpd.read_file(shppath, encoding='gbk') # 需要读取全路径
# dbfdata.drop(columns=['geometry'], inplace=True)
mergedf = pd.merge(dbfdata, shpdata, how='left', on=['YQBH'])
# mergedfhead = mergedf.head(10)
mergedfhead = mergedf
logprint(mergedfhead)
for index, row in mergedfhead.iterrows():
logprint(f"Index: {index}")
name = row["企业名称"]
ADDRESS = row["地址"]
geo = row["geometry"]
# logprint( f"geo.bounds:{geo.bounds}")
result = requrl(name, ADDRESS, geo, KEY, city, XJQ)
# result = requrlpoi_add(name, ADDRESS, geo)
mergedfhead.at[index, "备注"] = result[0]
mergedfhead.at[index, "经度"] = result[1]
mergedfhead.at[index, "纬度"] = result[2]
mergedfhead.at[index, "距离"] = result[3]
random_point = generate_random_point(geo)
mergedfhead.at[index, "随机经度"] = random_point[0]
mergedfhead.at[index, "随机纬度"] = random_point[1]
pprint.pprint(mergedfhead)
gdf = gpd.GeoDataFrame(mergedfhead, geometry='geometry')
gdf.to_file(outpathshp, encoding='gbk')
gdf.to_excel(outpathxlsx, engine='openpyxl',index=False)
# gdf.to_csv(outpathcsv, encoding='gbk',index=False)
# assert a != 0, "a不能为0"
# raise Exception("XX")
except Exception as ex:
# If an error occurred, print line number and error message
import traceback
import sys
tb = sys.exc_info()[2]
print("捕获到的异常信息是:{}".format(ex)) # 或者使用print("捕获到的异常信息是:",ex.args[0]或者str(ex)或者直接ex);3.9版本ex.message已不可用)
print("捕获到的异常代码行号是:Line {0}".format(tb.tb_lineno))
print(traceback.format_exc()) # 显示完整错误路径
# with open(errfile, "a") as err:
# err.write(traceback.format_exc())
# print("将traceback信息写入文件成功")
# except arcpy.ExecuteError:
# print(arcpy.GetMessages())
# 无错误运行else后代码
else:
logprint("program success")
# arcpy.AddMessage("program success")
# 有没有错误均运行finally后代码
finally:
pass