Using OSS Select to Accelerate Spark Data Queries
This topic describes how to configure Spark to use OSS Select to accelerate data queries.
Background information
Note: All ${} placeholders in this topic are environment variables. Replace them according to your actual environment.
Step 1: Configure Spark to read and write OSS
By default, Spark does not put the OSS support package on its CLASSPATH, so Spark has to be configured before it can read and write OSS. The configuration must be applied on every CDH node.
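As a minimal sketch of what that configuration amounts to, assuming the hadoop-aliyun support jars are already deployed on the classpath of every node (the fs.oss.* property names below are those of the Apache Hadoop hadoop-aliyun module; they can equally be set once in core-site.xml):

// Minimal sketch, assuming the hadoop-aliyun jars are on the classpath.
// The same properties can be set cluster-wide in core-site.xml instead.
sc.hadoopConfiguration.set("fs.oss.impl",
  "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
sc.hadoopConfiguration.set("fs.oss.endpoint", "oss-cn-shenzhen.aliyuncs.com")
sc.hadoopConfiguration.set("fs.oss.accessKeyId", "Your Access Key Id")
sc.hadoopConfiguration.set("fs.oss.accessKeySecret", "Your Access Key Secret")

// Smoke test: read one line back through the oss:// scheme.
sc.textFile("oss://select-test-sz/people/").take(1).foreach(println)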
Step 2: Configure Spark to support OSS Select
For details about OSS Select, see the OSS Select documentation. The following content is based on the oss-cn-shenzhen.aliyuncs.com OSS endpoint. The configuration must likewise be applied on every CDH node.
Comparison test
Test environment: the comparison is run with Spark on YARN, using 4 Node Manager nodes, each of which can run at most 4 containers, each with 1 core and 2 GB of memory.
Test data: 630 MB in total, with three columns: name, company, and age.
[root@cdh-master jars]# hadoop fs -ls oss://select-test-sz/people/
Found 10 items
-rw-rw-rw- 1 63079930 2018-10-30 17:03 oss://select-test-sz/people/part-00000
-rw-rw-rw- 1 63079930 2018-10-30 17:03 oss://select-test-sz/people/part-00001
-rw-rw-rw- 1 63079930 2018-10-30 17:05 oss://select-test-sz/people/part-00002
-rw-rw-rw- 1 63079930 2018-10-30 17:05 oss://select-test-sz/people/part-00003
-rw-rw-rw- 1 63079930 2018-10-30 17:06 oss://select-test-sz/people/part-00004
-rw-rw-rw- 1 63079930 2018-10-30 17:12 oss://select-test-sz/people/part-00005
-rw-rw-rw- 1 63079930 2018-10-30 17:14 oss://select-test-sz/people/part-00006
-rw-rw-rw- 1 63079930 2018-10-30 17:14 oss://select-test-sz/people/part-00007
-rw-rw-rw- 1 63079930 2018-10-30 17:15 oss://select-test-sz/people/part-00008
-rw-rw-rw- 1 63079930 2018-10-30 17:16 oss://select-test-sz/people/part-00009
Go to ${CDH_HOME}/lib/spark/ and start spark-shell, then run the same query with and without OSS Select:
[root@cdh-master spark]# ./bin/spark-shell
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running spark-class from user-defined location.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://x.x.x.x:4040
Spark context available as 'sc' (master = yarn, app id = application_1540887123331_0008).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0-cdh6.0.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4bdef487
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
| "OPTIONS (" +
| "oss.bucket 'select-test-sz', " +
| "oss.prefix 'people', " + // objects with this prefix belong to this table
| "oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
| "oss.data.format 'csv'," + // we only support csv now
| "oss.input.csv.header 'None'," +
| "oss.input.csv.recordDelimiter '\r\n'," +
| "oss.input.csv.fieldDelimiter ','," +
| "oss.input.csv.commentChar '#'," +
| "oss.input.csv.quoteChar '\"'," +
| "oss.output.csv.recordDelimiter '\n'," +
| "oss.output.csv.fieldDelimiter ','," +
| "oss.output.csv.quoteChar '\"'," +
| "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
| "oss.accessKeyId 'Your Access Key Id', " +
| "oss.accessKeySecret 'Your Access Key Secret')")
res0: org.apache.spark.sql.DataFrame = []
scala> val sql: String = "select count(*) from people where name like 'Lora%'"
sql: String = select count(*) from people where name like 'Lora%'
scala> sqlContext.sql(sql).show()
+--------+
|count(1)|
+--------+
| 31770|
+--------+
scala> val textFile = sc.textFile("oss://select-test-sz/people/")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/people/ MapPartitionsRDD[8] at textFile at <console>:24
scala> textFile.map(line => line.split(',')).filter(_(0).startsWith("Lora")).count()
res3: Long = 31770
As the Spark job timings show, the query using OSS Select took 15 s, while the query without OSS Select took 54 s; OSS Select greatly speeds up the query.
Implementation of the Spark OSS Select support package (Preview)
Spark is connected to OSS Select by extending Spark's DataSource API. Implementing PrunedFilteredScan allows the required columns and the filter conditions to be pushed down to OSS Select for evaluation. This support package is still under development.
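As a rough illustration of the mechanism (a sketch only, not the actual support package: runSelect below is a hypothetical stand-in for the code that issues the OSS Select request and parses the CSV response, and the value quoting is deliberately naive), such a relation could be shaped like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

// Sketch: Spark hands buildScan the columns the query needs and the filters
// it managed to extract; both are compiled into an OSS Select statement so
// that the filtering happens on the OSS server side instead of in Spark.
class OssSelectRelation(
    override val sqlContext: SQLContext,
    override val schema: StructType,   // parsed from the oss.schema option
    runSelect: String => RDD[Row])     // hypothetical: runs one OSS Select
                                       // request per object under the prefix
  extends BaseRelation with PrunedFilteredScan {

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    val projection =
      if (requiredColumns.isEmpty) "*" else requiredColumns.mkString(", ")
    val pushed = filters.flatMap(toSelectWhere)
    val stmt = s"SELECT $projection FROM OSSObject" +
      (if (pushed.isEmpty) "" else pushed.mkString(" WHERE ", " AND ", ""))
    runSelect(stmt)
  }

  // Conditions OSS Select understands are translated; everything else returns
  // None here and is re-evaluated by Spark on the rows that come back.
  private def toSelectWhere(f: Filter): Option[String] = f match {
    case EqualTo(a, v)          => Some(s"$a = '$v'")
    case GreaterThan(a, v)      => Some(s"$a > '$v'")
    case LessThan(a, v)         => Some(s"$a < '$v'")
    case StringStartsWith(a, v) => Some(s"$a LIKE '$v%'")
    case And(l, r) =>
      for (ls <- toSelectWhere(l); rs <- toSelectWhere(r))
        yield s"($ls AND $rs)"
    case _ => None
  }
}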
- Specification:
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " + | "OPTIONS (" + | "oss.bucket 'select-test-sz', " + | "oss.prefix 'people', " + // objects with this prefix belong to this table | "oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string' | "oss.data.format 'csv'," + // we only support csv now | "oss.input.csv.header 'None'," + | "oss.input.csv.recordDelimiter '\r\n'," + | "oss.input.csv.fieldDelimiter ','," + | "oss.input.csv.commentChar '#'," + | "oss.input.csv.quoteChar '\"'," + | "oss.output.csv.recordDelimiter '\n'," + | "oss.output.csv.fieldDelimiter ','," + | "oss.output.csv.quoteChar '\"'," + | "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " + | "oss.accessKeyId 'Your Access Key Id', " + | "oss.accessKeySecret 'Your Access Key Secret')")
Field descriptions:
oss.bucket: the bucket where the data is stored.
oss.prefix: all objects with this prefix belong to the defined TEMPORARY VIEW.
oss.schema: the schema of the TEMPORARY VIEW, currently specified as a string; a schema file will be supported later.
oss.data.format: the format of the data; currently only CSV is supported, and other formats will be added.
oss.input.csv.*: CSV input format parameters.
oss.output.csv.*: CSV output format parameters.
oss.endpoint: the endpoint of the region where the bucket resides.
oss.accessKeyId: your AccessKeyId.
oss.accessKeySecret: your AccessKeySecret.
Note: Only basic parameters are defined so far; for details, see the OSS Select API documentation. The remaining parameters will be supported later.
- Supported filter conditions:
=, <, >, <=, >=, ||, or, not, and, in, like (StringStartsWith, StringEndsWith, StringContains). For conditions that cannot be pushed down, for example arithmetic operations or string concatenation, which PrunedFilteredScan cannot capture, only the required columns are pushed down to OSS Select.
Note: OSS Select supports further filter conditions; for details, see the OSS Select API documentation.
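Whether a given condition actually reached the data source can be checked from the physical plan: for a data source relation, Spark normally lists the filters it handed to the source under PushedFilters in the scan node. A quick check against the people view defined earlier:

scala> sqlContext.sql("select name from people where name like 'Lora%'").explain()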
TPC-H query comparison
The configuration is validated by testing the performance of query1.sql from TPC-H against the lineitem table. To let OSS Select filter more data, the where condition is changed from l_shipdate <= '1998-09-16' to where l_shipdate > '1997-09-16'. The test data is 2.27 GB:
[root@cdh-master ~]# hadoop fs -ls oss://select-test-sz/data/lineitem.csv
-rw-rw-rw- 1 2441079322 2018-10-31 11:18 oss://select-test-sz/data/lineitem.csv
- Query using Spark SQL only:
scala> import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, DoubleType}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, DoubleType}
scala> import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLContext}
scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@74e2cfc5
scala> val textFile = sc.textFile("oss://select-test-sz/data/lineitem.csv")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/data/lineitem.csv MapPartitionsRDD[1] at textFile at <console>:26
scala> val dataRdd = textFile.map(_.split('|'))
dataRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:28
scala> val schema = StructType(
| List(
| StructField("L_ORDERKEY",LongType,true),
| StructField("L_PARTKEY",LongType,true),
| StructField("L_SUPPKEY",LongType,true),
| StructField("L_LINENUMBER",IntegerType,true),
| StructField("L_QUANTITY",DoubleType,true),
| StructField("L_EXTENDEDPRICE",DoubleType,true),
| StructField("L_DISCOUNT",DoubleType,true),
| StructField("L_TAX",DoubleType,true),
| StructField("L_RETURNFLAG",StringType,true),
| StructField("L_LINESTATUS",StringType,true),
| StructField("L_SHIPDATE",StringType,true),
| StructField("L_COMMITDATE",StringType,true),
| StructField("L_RECEIPTDATE",StringType,true),
| StructField("L_SHIPINSTRUCT",StringType,true),
| StructField("L_SHIPMODE",StringType,true),
| StructField("L_COMMENT",StringType,true)
| )
| )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(L_ORDERKEY,LongType,true), StructField(L_PARTKEY,LongType,true), StructField(L_SUPPKEY,LongType,true), StructField(L_LINENUMBER,IntegerType,true), StructField(L_QUANTITY,DoubleType,true), StructField(L_EXTENDEDPRICE,DoubleType,true), StructField(L_DISCOUNT,DoubleType,true), StructField(L_TAX,DoubleType,true), StructField(L_RETURNFLAG,StringType,true), StructField(L_LINESTATUS,StringType,true), StructField(L_SHIPDATE,StringType,true), StructField(L_COMMITDATE,StringType,true), StructField(L_RECEIPTDATE,StringType,true), StructField(L_SHIPINSTRUCT,StringType,true), StructField(L_SHIPMODE,StringType,true), StructField(L_COMMENT,StringType,true))
scala> val dataRowRdd = dataRdd.map(p => Row(p(0).toLong, p(1).toLong, p(2).toLong, p(3).toInt, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15)))
dataRowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30
scala> val dataFrame = sqlContext.createDataFrame(dataRowRdd, schema)
dataFrame: org.apache.spark.sql.DataFrame = [L_ORDERKEY: bigint, L_PARTKEY: bigint ... 14 more fields]
scala> dataFrame.createOrReplaceTempView("lineitem")
scala> spark.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+
|l_returnflag|l_linestatus|    sum_qty|      sum_base_price|      sum_disc_price|          sum_charge|           avg_qty|         avg_price|           avg_disc|count_order|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+
|           N|           O|7.5697385E7|1.135107538838699...|1.078345555027154...|1.121504616321447...|25.501957856643052|38241.036487881756|0.04999335309103123|    2968297|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+
- Query using OSS Select on Spark SQL:
scala> sqlContext.sql("CREATE TEMPORARY VIEW item USING com.aliyun.oss " +
| "OPTIONS (" +
| "oss.bucket 'select-test-sz', " +
| "oss.prefix 'data', " +
| "oss.schema 'L_ORDERKEY long, L_PARTKEY long, L_SUPPKEY long, L_LINENUMBER int, L_QUANTITY double, L_EXTENDEDPRICE double, L_DISCOUNT double, L_TAX double, L_RETURNFLAG string, L_LINESTATUS string, L_SHIPDATE string, L_COMMITDATE string, L_RECEIPTDATE string, L_SHIPINSTRUCT string, L_SHIPMODE string, L_COMMENT string'," +
| "oss.data.format 'csv'," + // we only support csv now
| "oss.input.csv.header 'None'," +
| "oss.input.csv.recordDelimiter '\n'," +
| "oss.input.csv.fieldDelimiter '|'," +
| "oss.input.csv.commentChar '#'," +
| "oss.input.csv.quoteChar '\"'," +
| "oss.output.csv.recordDelimiter '\n'," +
| "oss.output.csv.fieldDelimiter ','," +
| "oss.output.csv.commentChar '#'," +
| "oss.output.csv.quoteChar '\"'," +
| "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
| "oss.accessKeyId 'Your Access Key Id', " +
| "oss.accessKeySecret 'Your Access Key Secret')")
res2: org.apache.spark.sql.DataFrame = []
scala> sqlContext.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from item where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+
|l_returnflag|l_linestatus|    sum_qty|      sum_base_price|      sum_disc_price|          sum_charge|           avg_qty|        avg_price|           avg_disc|count_order|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+
|           N|           O|7.5697385E7|1.135107538838701E11|1.078345555027154...|1.121504616321447...|25.501957856643052|38241.03648788181|0.04999335309103024|    2968297|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+
As the Spark job timings show, the Spark SQL query using OSS Select took 38 s, while the same query without OSS Select took 2.5 min; again, OSS Select greatly speeds up the query.