Access from Spark
Preparation
HBase Enhanced Edition supports access from Spark. You only need to add a dependency on the Alibaba Cloud HBase client, or on alihbase-connector. For the latest version, see Java SDK Installation.
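As a sketch, the alihbase-connector dependency in a Maven project might look like the following; the coordinates and version below are placeholders, so take the exact values from the Java SDK Installation page:

```xml
<!-- Illustrative only; confirm groupId, artifactId, and version
     on the Java SDK Installation page -->
<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-connector</artifactId>
    <!-- placeholder version; use the latest released version -->
    <version>x.y.z</version>
</dependency>
```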
Obtain the endpoint
See Connect to a Cluster and use the Java API endpoint; the default port is 30020. For access over the Internet, use the public endpoint.
Obtain the username and password
See Connect to a Cluster. The default username and password are both root. Alternatively, after you disable the ACL feature on the cluster management page, no username or password is required.
Add the HBase Enhanced Edition access configuration
Method 1: configuration file
Add the following items to hbase-site.xml:
<configuration>
<!--
The cluster endpoint, obtained on the database connection page of the console (note the difference between the public endpoint and the VPC-internal endpoint)
-->
<property>
<name>hbase.zookeeper.quorum</name>
<value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
</property>
<!--
Set the username and password; the defaults are root:root and can be changed as needed
-->
<property>
<name>hbase.client.username</name>
<value>root</value>
</property>
<property>
<name>hbase.client.password</name>
<value>root</value>
</property>
<!--
If you depend on the Alibaba Cloud HBase client directly, you do not need to set the connection.impl parameter; if you depend on alihbase-connector, you must set it
-->
<!--property>
<name>hbase.client.connection.impl</name>
<value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
</property-->
</configuration>
Method 2: code
Add the parameters to the Configuration object in code:
// Create a new Configuration
Configuration conf = HBaseConfiguration.create();
// The cluster endpoint, obtained on the database connection page of the console (note the difference between the public and the VPC-internal endpoint)
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// Set the username and password; the defaults are root:root and can be changed as needed
conf.set("hbase.client.username", "root");
conf.set("hbase.client.password", "root");
// If you depend on the Alibaba Cloud HBase client directly, you do not need to set connection.impl; if you depend on alihbase-connector, you must set it
//conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());
Spark access example
test("test the spark sql count result") {
  // 1. Add the HBase Enhanced Edition access configuration
  val conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
  conf.set("hbase.client.username", "test_user")
  conf.set("hbase.client.password", "password")

  // 2. Create the HBase table
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  val namespace = "spark_test"
  val conn = ConnectionFactory.createConnection(conf)
  val admin = conn.getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)

  // 3. Insert test data: 10 rows, each with both columns f:a and f:b
  val famAndQf1 = KeyValue.parseColumn(Bytes.toBytes(column1))
  val famAndQf2 = KeyValue.parseColumn(Bytes.toBytes(column2))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- 'a' to 'z'; b2 <- 'a' to 'z'; b3 <- 'a' to 'z'; if i < 10) {
    val k = Array(b1.toByte, b2.toByte, b3.toByte)
    val put = new Put(k)
    put.addColumn(famAndQf1(0), famAndQf1(1), ("value_" + b1 + b2 + b3).getBytes())
    put.addColumn(famAndQf2(0), famAndQf2(1), ("value_" + b1 + b2 + b3).getBytes())
    puts.add(put)
    i = i + 1
  }
  val table = conn.getTable(tableName)
  table.put(puts)

  // 4. Create the Spark table mapped onto the HBase table; the column
  //    family in the catalog must match the family created above ("f")
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                     | OPTIONS ('catalog'=
                     | '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
                     | "columns":{
                     | "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                     | "col1":{"cf":"${cf}", "col":"a", "type":"string"},
                     | "col2":{"cf":"${cf}", "col":"b", "type":"string"}}}'
                     | )""".stripMargin
  println("createCmd:\n" + createCmd)
  sparkSession.sql(createCmd)

  // 5. Run the count SQL; it should report the 10 rows inserted above
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println("sparkCounts: " + sparkCounts)
  conn.close()
}
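Once the Spark table is registered, any Spark SQL statement can be run against it, not just the count. As a minimal sketch (assuming the same sparkSession and the spark_hbase table created above; it requires a live cluster, so it is not runnable standalone):

```scala
// Select mapped columns with a predicate; where possible, the
// hbase-spark connector pushes the filter down to HBase
val rows = sparkSession.sql(
  "select col0, col1 from spark_hbase where col1 = 'value_aaa'")
rows.show()
```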