HBase实时数据导出RocketMQ
前置条件
版本支持
- 开源RocketMQ 4.5
- 阿里云消息队列 RocketMQ
限制
- HBase实时数据同步MQ目前还不支持保序
- BDS任务重试, MQ自身问题导致消息会被重复消费,需要业务方做到消费幂等
- BDS同步过程中不支持点位回退
- 只同步更新的列,相同Rowkey的其他未更新的列不会被同步
- BDS不支持同步Bulkload进来的数据
- BDS不支持Phoenix表数据的转码,需要客户自行处理Phoenix表的原生数据
任务创建
点击 任务管理 -> HBase实时数据同步RocketMQ -> 创建通道
选择源HBase集群、目标MQ数据源,指定默认同步到MQ的Topic,以及同步通道所属的Group
查看任务执行情况
订阅MQ指定Topic的消息,对消息中body进行转化,转化成字符串,就能得到KV对应的JSON格式的数据
参数说明
任务创建同步表支持如下的填写格式
t1 {"tag": "xxxx"} // t1表的message带上xxxx的标签,方便业务消息订阅时对tag标签进行消息过滤
t2 {"tag": "xxxx", "topic": "topic1"} // t2表的数据同步到topic1中,不特殊指定就走默认topic
t3 // t3表默认tag为空字符串
同步行为
BDS解析HBase WAL日志中的数据, 并对每个KV转化成对应的JSON格式写入到MQ,JSON格式如下所示
{
"namespace": "default",
"tableName": "t1",
"rowKey": "727878",
"opCode": 4,
"family": "6631",
"qualifier": "61",
"value": "7631",
"timestamp": 1572503986205
}
namespace表示的是HBase表的namespace,tableName表示的是表名
KV所包含的rowkey、family、qualifier、value在HBase都是byte数组形式进行存储,BDS同步KV默认会把byte数组转化成HexString,可以通过如下方式将HexString转化会原生的数组
import org.apache.commons.codec.binary.Hex;
...
Hex.decodeHex(rowkey.toCharArray())
Hex.decodeHex(family.toCharArray())
...
timestamp 表示的是KV的版本,业务没有特殊指定,默认是KV入库的时间戳
opCode 表示的是KV的类型,不同的值对应如下信息
enum Type {
Put((byte) 4),
Delete((byte) 8),
DeleteFamilyVersion((byte) 10),
DeleteColumn((byte) 12),
DeleteFamily((byte) 14);
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
评论