一键归档 RDS &POLARDB数据到 Spark_一键归档Spark数仓计算_X-Pack Spark计算服务_云数据库 HBase 版
一键归档 RDS &POLARDB数据到 Spark
批量归档目前支持从 Mysql、POLARDB 等数据库把业务数据批量归档到 Spark。本文将以 MySQL 为例详细介绍如何使用本工具。
一、前置条件
购买 MySQL/POLARDB 数据库
如已经购买了 MySQL 数据库则本小结可以忽略,如未购买可以到 https://www.aliyun.com/product/rds/mysql 页面购买,比如本文购买好的 MySQL 数据库如下:注意:购买的 MySQL 实例的 VPC 需要和 Spark 集群一致。
购买 X-Pack Spark 集群
X-Pack Spark 集群主要用于合并 WAL 数据,把 HBase 列数据转换成行数据,归档后的数据就存储在 X-Pack Spark 集群中。本文购买好的 X-Pack Spark 集群如下:注意图中的 VPC ID 和 VSwitch ID,和上面 MySQL 需要一样,这样才可以连接。
创建测试表
在创建归档任务之前,我们先到 MySQL 里面创建好相关的测试库和表,如下:
MySQL [(none)]> create database spark_archives_db;
Query OK, 1 row affected (0.00 sec)
MySQL [(none)]> use spark_archives_db;
Database changed
MySQL [spark_archives_db]> create table orders(
-> id INT UNSIGNED AUTO_INCREMENT,
-> name VARCHAR(100) NOT NULL,
-> tele VARCHAR(11) NOT NULL,
-> email VARCHAR(30) NOT NULL,
-> order_no VARCHAR(19) NOT NULL,
-> create_time TIMESTAMP,
-> PRIMARY KEY (id)
-> );
Query OK, 0 rows affected (0.00 sec)
上面我们创建了一个 spark_archives_db 库以及 orders 测试表。我们现在往这张表里面插入一些测试数据:
insert into orders values (1, 'zhangsan', '13188888888', '[email protected]', '2019091712234223242', '2019-09-17 12:23:42');
insert into orders values (2, 'lisi', '13212345678', '[email protected]', '2019091714122244232', '2019-09-17 14:12:22');
insert into orders values (3, 'wangwu', '13166666666', '[email protected]', '2019091720113243213', '2019-09-17 20:11:32');
insert into orders values (4, 'zhaoliu', '13288888888', '[email protected]', '2019091809234223242', '2019-09-18 09:23:42');
insert into orders values (5, 'liqi', '13121233453', '[email protected]', '2019091819113412345', '2019-09-18 19:11:34');
insert into orders values (6, 'wuba', '18888888888', '[email protected]', '2019091822430167854', '2019-09-18 22:43:01');
insert into orders values (7, 'spark', '16666666666', '[email protected]', '2019091822554298765', '2019-09-18 22:55:42');
insert into orders values (8, 'hbase', '15555555555', '[email protected]', '2019091823123423242', '2019-09-18 23:12:34');
insert into orders values (9, 'cassandra', '13333333333', '[email protected]', '2019091901024566666', '2019-09-19 01:02:45');
insert into orders values (10, 'hadoop', '13434343434', '[email protected]', '2019091917552788888', '2019-09-19 17:55:27');
MySQL [spark_archives_db]> select * from orders;
+----+-----------+-------------+----------------------+---------------------+---------------------+
| id | name | tele | email | order_no | create_time |
+----+-----------+-------------+----------------------+---------------------+---------------------+
| 1 | zhangsan | 13188888888 | zhangsan@aliyun.com | 2019091712234223242 | 2019-09-17 12:23:42 |
| 2 | lisi | 13212345678 | lisi@aliyun.com | 2019091714122244232 | 2019-09-17 14:12:22 |
| 3 | wangwu | 13166666666 | wangwu@aliyun.com | 2019091720113243213 | 2019-09-17 20:11:32 |
| 4 | zhaoliu | 13288888888 | zhaoliu@aliyun.com | 2019091809234223242 | 2019-09-18 09:23:42 |
| 5 | liqi | 13121233453 | liqi@aliyun.com | 2019091819113412345 | 2019-09-18 19:11:34 |
| 6 | wuba | 18888888888 | wuba@aliyun.com | 2019091822430167854 | 2019-09-18 22:43:01 |
| 7 | spark | 16666666666 | spark@aliyun.com | 2019091822554298765 | 2019-09-18 22:55:42 |
| 8 | hbase | 15555555555 | hbase@aliyun.com | 2019091823123423242 | 2019-09-18 23:12:34 |
| 9 | cassandra | 13333333333 | cassandra@aliyun.com | 2019091901024566666 | 2019-09-19 01:02:45 |
| 10 | hadoop | 13434343434 | hadoop@aliyun.com | 2019091917552788888 | 2019-09-19 17:55:27 |
+----+-----------+-------------+----------------------+---------------------+---------------------+
10 rows in set (0.00 sec)
二、创建归档任务
到这里,我们已经准备好所有的测试环境,现在我们可以到数据工作台创建相关的归档任务了,具体步骤如下:
- 到 HBase X-Pack 控制台
- 选择左边菜单里面的一键归档
- 切换到新的页面之后选择创建归档
- 在弹出的对话框里面选择批量归档
- 填写归档名称,这里填的是 mysql2spark_archive_test
- 点击确认
点击 确认 之后我们就进入到归档编辑页面中,里面我们可以填写相关的信息,具体如下:
图中序号说明如下:
- 归档的 Spark 集群,对应的就是上面我们购买的 Spark 集群 ID;
- 数据源类型选择 MYSQL
- 数据源实例ID填写我们需要归档的表所在 MySQL 实例,也就是我们上面创建好的 MySQL 实例;
- 账号对应的是 MySQL 实例的用户名;
- 密码对应的是 MySQL 实例的密码;
归档配置里面对应的是我们归档类型的相关配置,目前批量归档的归档类型支持 一次全量、每天全量、每天增量。分别的含义为:
- 一次全量:一次就将对应 MySQL 里面的全量数据归档到 Spark 相关表,这个一般是在初始化数据的时候会用到;
- 每天全量:逻辑处理简单,适用于数据量不大的场景。即每天都会拉取全量数据,每个 Spark 分区表数据都是全量的;
- 每天增量:根据增量字段,每天只拉取有更新或新增的数据。增量字段一般使用创建时间或者修改时间。一次全量 + 每天增量 模式可以把 MySQL 的数据按照 T+1 的形式同步到 Spark 分区表中。
归档配置里面的分区字段 对应的就是每天增量数据存放到 Spark 对应表的分区名字;增量字段代表的是按照 MySQL 对应表中的那个字段来增量抽取数据,一般都会选择表的创建时间或者修改时间来作为增量字段;高级属性选项可以参照 https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html 文档里的说明;
源库名代表的是需要归档的 MYSQL 表所在的数据库名字;
- 源表名代表的是需要归档的 MYSQL 表的名字;
- Spark库名代表我们归档的数据存放到 Spark 数据仓库的数据库名字;
- Spark表名代表我们归档的数据存放到 Spark 数据仓库的表名字;
- 这里对应的就是需要归档的 MySQL 表的列和 Spark 归档表列的对应关系
- 当我们上面信息填写完之后就可以点击提交按钮,这时候我们就创建好一个归档任务。
点击完提交按钮之后,会在数据工作台里面的工作流创建相关的工作流,并且会在数据工作台里面的作业管理创建相关作业,具体如下:
同时也会在 X-Pack Spark 里面创建 orders 表,如下:
三、测试及数据查看
执行归档任务(这里一步主要可以用来首次的测试)
归档的正常逻辑是按照指定定时调度时间周期性的把数据ETL处理后写入到spark表,但是在测试阶段,可以写入数据,然后手动的运行一下作业看看效果。我们在上一步骤往 orders 表插入了一些测试数据,我们可以手动执行上面创建好的 mysql2spark_archive_test 归档任务,如下:
我们进行测试的日期是 2019-09-19,当我们按照上面的配置来手动运行归档任务的时候,实际上是执行了 T+1 形式的归档,也就是归档了 2019-09-18 这天的数据。我们可以到运行记录里面查看归档的进度。
在 X-Pack Spark 里面查询归档的数据
等作业运行完成之后,我们就可以查到刚刚归档的数据(创建sql类型的交互式查询):
可以先执行
show partitions orders
查看下生成的分区情况:正如图中显示的,X-Pack Spark 正确的归档了 2019-09-18 这一天 MySQL 新增的数据。我们在查看一下里面的数据情况:可以看到我们正确的将 2019-09-18 这天 MySQL 的数据归档到 Spark 数据仓库的 orders 表里面了。
mysql -> spark 类型映射
目前 Spark 支持以下 MySQL 的类型:
MySQL | Spark |
---|---|
TINYINT(1), BOOL, BOOLEAN, BIT(1) | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
MEDIUMINT, INT, INTEGER | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME, TIMESTAMP | TIMESTAMP |
CHAR | CHAR |
VARCHAR | VARCHAR |
TEXT | TEXT |
BINARY | BINARY |
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
评论