与Spark集成分析
与Spark集成分析
Ganos Spark模块允许用户基于Apache Spark分布式系统进行大规模的地理信息数据处理与分析。它基于Spark环境提供了一系列的接口进行数据加载、分析和保存。Ganos Spark提供了不同级别的数据分析模型,最基础的是GeometryRDD模型,用来实现Ganos数据中SimpleFeature与Spark中RDD模型的之间的转换。在GeometryRDD基础上,Ganos Spark基于SparkSQL设计了一系列用于空间数据表达的UDT与UDF或UDAF,允许用户使用类似SQL结构化查询语言进行数据的查询与分析。Ganos Spark整体框架如下:
1. 初始化Ganos Spark环境
1.1 获取依赖包
首先请从此链接获取GanosSpark SDK开发包 下载GanosSpark SDK开发包
获取ganos-spark-runtime-1.0-SNAPSHOT.jar包后,在pom文件中增加依赖:
<dependency>
<groupId>com.aliyun.ganos</groupId>
<artifactId>ganos-spark-runtime</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>system</scope>
<systemPath>/ganos-spark-runtime-1.0-SNAPSHOT.jar</systemPath>
</dependency>
同时还需要hbase与hbase ganos相关依赖,具体可参考HBase Ganos教程。
1.2 连接HBase Ganos
首先需要初始化Spark环境,创建SparkContext对象:
import com.aliyun.ganos.spark.{GanosSpark, GanosSparkKryoRegistrator}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.simple.SimpleFeatureStore
import org.geotools.data.{DataStore, DataStoreFinder, DataUtilities, Query}
import org.geotools.factory.Hints
import org.geotools.geometry.jts.JTSFactoryFinder
import org.locationtech.jts.geom.Coordinate
import org.opengis.feature.simple.SimpleFeature
var sc: SparkContext = _
val conf = new SparkConf().setMaster("local[2]").setAppName("testSpark")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
sc = SparkContext.getOrCreate(conf)
然后通过GanosSpark对象连接HBase Ganos并加载数据。连接HBase Ganos需要指定相关连接参数并保存为Map类型,具体参数如下:
//连接HBase Ganos
val dsParams = Map(
"hbase.catalog" -> "AIS",
"hbase.zookeepers" -> "localhost",
"geotools" -> "true")
val ds = DataStoreFinder.getDataStore(dsParams)
//加载数据转换为RDD模型:
val rdd = GanosSpark(dsParams).rdd(new Configuration(), sc, dsParams, new Query("...."))
上面代码会将用户配置的标准的Query对象下推到HBase Ganos并进行查询,并将查询结果返回Spark集群,最后转为RDD[SimpleFeature]数据类型。
3. 使用Spark SQL查询数据
3.1 GanosSpark SQL模型
Ganos Spark提供了
其中UDT模型包括:
- GeometryUDT
- PointUDT
- LineStringUDT
- PolygonUDT
- MultiPointUDT
- MultiLineStringUDT
- MultiPolygonUDT
- GeometryCollectionUDT
3.2 使用Spark SQL加载数据
加载Spark SQL运行环境:
import scala.collection.JavaConversions._
import org.apache.spark.sql.{SQLTypes, SparkSession}
import com.aliyun.ganos.spark.GanosSparkKryoRegistrator
implicit val displayer: String => Unit = { s => kernel.display.content("text/html", s) }
//初始化Ganos Spark环境
val params = Map("hbase.zookeepers"->"localhost","hbase.catalog"->"AIS")
val ds = org.geotools.data.DataStoreFinder.getDataStore(params)
val sparkSession = SparkSession.builder
.appName("Simple Application")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
.master("local[*]")
.getOrCreate()
SQLTypes.init(spark.sqlContext)
3.3 UDF时空算子
成功创建构建SparkSession对象后,就可以通过read方法加载数据为DataFrame对象了。Ganos Spark提供了一系列UDF时空算子实现基于SQL的时空数据查询,详细介绍请参考:https://yuque.antfin-inc.com/adb-spatial/fx8m4k/oolqhw
示例1:加载DataFrame对象
如需要进行空间过滤:
// 读取Ganos轨迹数据生成DataFrame
val dataFrame = sparkSession.read
.format("ganos")
.options(params)
.option("ganos.feature", "POINT")
.load()
dataFrame.createOrReplaceTempView("aispoint");
println(dataFrame.schema)
示例2:使用SQL UDF时空查询:
//船舶轨迹数据时空查询
val spatialDf1 = spark.sql(
"""
|SELECT ship_id,status,dtg,geom FROM aispoint
|WHERE st_contains(st_makeBBOX(114.00000,22.00000,115.00000,23.00000), geom)
|AND dtg between cast('2018-09-08T01:00:00Z' as timestamp) AND cast('2018-09-13T01:00:00Z' as timestamp)
""".stripMargin)
println("查询录数:" + spatialDf1.count())
spatialDf1.createOrReplaceTempView("point")
spatialDf1.show
输出结果如下图所示:
查询录数:4291
+---------+------+-------------------+--------------------+
| ship_id|status| dtg| geom|
+---------+------+-------------------+--------------------+
|566652000| 机动船在航|2018-09-08 09:04:00|POINT (114.749211...|
|566652000| 机动船在航|2018-09-08 09:05:19|POINT (114.74888 ...|
|566652000| 机动船在航|2018-09-08 09:08:00|POINT (114.74858 ...|
|413280000| 锚泊|2018-09-08 09:11:39|POINT (114.710655...|
|566652000| 机动船在航|2018-09-08 09:21:20|POINT (114.746276...|
|413280000| 锚泊|2018-09-08 09:20:38|POINT (114.710836...|
|566652000| 机动船在航|2018-09-08 09:27:01|POINT (114.747328...|
|413280000| 锚泊|2018-09-08 09:32:37|POINT (114.710993...|
|566652000| 机动船在航|2018-09-08 09:41:01|POINT (114.7508 2...|
|413280000| 锚泊|2018-09-08 09:38:38|POINT (114.710961...|
|566652000| 机动船在航|2018-09-08 09:42:50|POINT (114.751018...|
|566652000| 机动船在航|2018-09-08 09:48:20|POINT (114.750356...|
|566652000| 机动船在航|2018-09-08 09:50:10|POINT (114.750155...|
|566652000| 机动船在航|2018-09-08 09:52:00|POINT (114.749886...|
|413280000| 锚泊|2018-09-08 09:56:38|POINT (114.711043...|
|566652000| 机动船在航|2018-09-08 10:04:01|POINT (114.748488...|
|566652000| 机动船在航|2018-09-08 10:07:50|POINT (114.747945...|
|566652000| 机动船在航|2018-09-08 10:10:19|POINT (114.7476 2...|
|566652000| 机动船在航|2018-09-08 10:12:09|POINT (114.74736 ...|
|413280000| 锚泊|2018-09-08 10:05:38|POINT (114.71118 ...|
+---------+------+-------------------+--------------------+
only showing top 20 rows
4. 数据保存
用户可以通过DataFrame的write接口将分析结果保存在HBase Ganos中的result表中。注意,result表一定要在写入前已经创建,否则下面程序将报错。
//通过Ganos Spark Connector将查询结果反写入Ganos
spatialDf1.write.format("ganos").options(params).option("ganos.feature", "result").save()
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
评论