Apache Sedona 可以很方便地读取 GeoJSON、Shapefile、GeoPackage 等格式的文件,并提供了大量 Spark SQL 函数和 RDD 算子。下面的例子主要用于熟悉 Spark 和 Sedona 的使用。
引入的 Maven 依赖(pom.xml):
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>GeoJsonToMvt</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.13.8</scala.version>
        <geotrellis.version>3.7.1</geotrellis.version>
        <spark.version>3.3.4</spark.version>
        <spray.json.version>1.3.6</spray.json.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Declared once: a second identical spark-sql_2.13 entry made Maven warn about a duplicate declaration. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>io.spray</groupId>
            <artifactId>spray-json_2.13</artifactId>
            <version>${spray.json.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.13.4</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.jts</groupId>
            <artifactId>jts-core</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
            <version>25.4</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>25.4</version>
        </dependency>
        <!-- NOTE(review): the sedona-viz/core/sql artifacts below are 1.4.1 while sedona-spark is 1.6.1.
             Mixing Sedona releases on one classpath risks conflicting class versions; align them on a
             single Sedona release that matches your Spark/Scala versions. -->
        <dependency>
            <groupId>org.apache.sedona</groupId>
            <artifactId>sedona-viz-3.0_2.13</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.sedona</groupId>
            <artifactId>sedona-core-3.0_2.13</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.sedona</groupId>
            <artifactId>sedona-sql-3.0_2.13</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.sedona</groupId>
            <artifactId>sedona-spark-3.0_2.13</artifactId> <!-- replace with the artifact for your Spark and Scala versions -->
            <version>1.6.1</version> <!-- replace with your Sedona version -->
        </dependency>
        <dependency>
            <groupId>org.locationtech.proj4j</groupId>
            <artifactId>proj4j</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>osgeo</id>
            <name>OSGeo Release Repository</name>
            <url>https://repo.osgeo.org/repository/release/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
下面是用 Java 实现的 Spark 代码,其中部分代码由 AI(Gemini 2.0)生成:
import org.apache.sedona.spark.SedonaContext;
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKTReader;
import scala.Tuple3;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.lang.Math.cos;
import static java.lang.Math.floor;
import static java.lang.Math.pow;
import static java.lang.Math.toRadians;
import static org.apache.spark.sql.functions.*;
public class GeoJsonToTilesJava {
public static void main(String[] args) throws Exception {
System.setProperty("org.geotools.referencing.forceXY", "true");
// 创建SparkSession
SparkSession config = SedonaContext.builder()
.master("local[*]") // Delete this if run in cluster mode
.appName("readTestJava") // Change this to a proper name
.config("spark.kryo.registrator", SedonaVizKryoRegistrator.class.getName())
.config("spark.sql.extensions", "org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
SparkSession sedona = SedonaContext.create(config);
Dataset<Row> df = sedona.read().format("geojson")
.option("multiPolygon", "true")
.load("src/main/resources/guangdong.json")
.selectExpr("explode(features) as features") // Explode the envelope to get one feature per row.
.select("features.*") // Unpack the features struct.
.withColumn("name", expr("properties['name']")).drop("properties").drop("type");
Dataset<Row> df1 = df;
df1.show();
//Dataset<Row> filteredDF = df.filter(expr("ST_Y(ST_Centroid(ST_GeomFromGeoJSON(geometry))) BETWEEN -85.05 AND 85.05"));
// 创建坐标转换器
try {
Dataset<Row> geomDF = df.withColumn("sedona_geom",
expr("ST_AsEWKT(ST_Transform(geometry,'epsg:4326','epsg:3857',true))"));
// 使用 UDF 创建 Sedona Geometry 列
//Dataset<Row> geomDF = df.withColumn("sedona_geom", callUDF("createSedonaGeometry", col("geometry")));
geomDF.show();
// 修改: getTileIndexUDF 的返回值类型, 并保证如果坐标为空,则返回一个 null struct
org.apache.spark.sql.api.java.UDF2<String, Integer, Row> getTileIndexUDF = (centroidStr, zoom)->{
if (centroidStr == null || centroidStr.isEmpty()) {
return RowFactory.create(null, null, null);
} else {
String[] str = centroidStr.replace("POINT (", "").replace(")", "").split(" ");
List<Double> coordinates = new ArrayList<>();
coordinates.add(Double.parseDouble(str[0]));
coordinates.add(Double.parseDouble(str[1]));
Tuple3<Integer,Integer,Integer> tileIndex = getTileIndex(coordinates,zoom);
return RowFactory.create(tileIndex._1(), tileIndex._2(), tileIndex._3());
}
};
//CRS.decode("EPSG:4326", true);
// 注册 UDF
StructType tileIndexSchema = new StructType(new StructField[]{
new StructField("x", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("y", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("z", DataTypes.IntegerType, true, Metadata.empty())
});
sedona.udf().register("getTileIndexUDF", getTileIndexUDF, tileIndexSchema);
org.apache.spark.sql.api.java.UDF1<String,String> getCentroid = (geomStr)->{
if (geomStr == null || geomStr.isEmpty()) {
return null;
} else {
GeometryFactory factory = new GeometryFactory();
WKTReader wktReader = new WKTReader(factory);
org.locationtech.jts.geom.Geometry geom = wktReader.read(geomStr);
return geom.getCentroid().toString();
}
};
// 注册 UDF
sedona.udf().register("getCentroid", getCentroid, DataTypes.StringType);
List<Integer> zooms = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
// 关键修改:直接在原始的 geomDF 上面进行循环操作
for (Integer zoom : zooms){
Dataset<Row> tilesDF = geomDF.withColumn("centroid",callUDF("getCentroid", col("sedona_geom")))
.withColumn("tile", callUDF("getTileIndexUDF", col("centroid"), lit(zoom)));
tilesDF.show();
Dataset<Row> groupedTiles = tilesDF
.filter(col("tile").isNotNull())
.groupBy(col("tile"))
.agg(collect_list("sedona_geom").alias("features"));
groupedTiles.write().format("json").mode("overwrite").save("D:/temp/output/zoom_" + zoom);
}
sedona.stop();
}catch (Exception e){
e.printStackTrace();
}
}
private static Tuple3<Integer, Integer, Integer> getTileIndex(List<Double> coordinates, int zoom) {
if (coordinates == null || coordinates.isEmpty()) {
return null;
} else {
double x = coordinates.get(0);
double y = coordinates.get(1);
double res = 156543.03392 * cos(toRadians(0)) / pow(2, zoom);
int tileX = (int) floor((x + 20037508.34) / (res * 256));
int tileY = (int) floor((20037508.34 - y) / (res * 256));
return new Tuple3<>(tileX,tileY,zoom);
}
}
}
参考链接
https://sedona.apache.org/1.7.0/tutorial/sql/
https://sedona.apache.org/1.7.0/api/sql/Function/
Google Gemini(AI Studio):https://aistudio.google.com/