Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data
Transmission Service),您可以将POLARDB MySQL同步至自建Kafka集群,扩展消息处理能力。
前提条件
- Kafka集群的版本为0.10或1.0版本。
- Kafka集群已创建用于存储待同步数据的Topic。
- POLARDB MySQL已开启Binlog,详情请参见如何开启Binlog。
注意事项
如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。
功能限制
- 仅支持表粒度的数据同步。
- 不支持DDL操作的数据同步。
- 不支持自动调整同步对象。
说明 如果在同步的过程中,对源库中待同步的表执行了重命名操作,且重命名后的名称不在同步对象中,那么该表将不再被同步到目标Kafka集群中。如果该表还需要同步,那么您需要
新增同步对象。
支持同步的SQL操作
DML操作:INSERT、UPDATE、DELETE、REPLACE。
消息格式
同步到Kafka集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义。
在数据同步到Kafka集群后,您需要根据avro schema定义进行数据解析。
操作步骤
- 购买数据同步作业。
说明 购买时,选择源实例为POLARDB、目标实例为Kafka,并选择同步拓扑为单向同步。
- 登录数据传输控制台。
- 在左侧导航栏,单击数据同步。
- 在同步作业列表页面顶部,选择数据同步实例所属地域。
- 定位至已购买的数据同步实例,单击配置同步链路。
- 配置同步通道的源实例及目标实例信息。
配置项目 |
配置选项 |
配置说明 |
同步作业名称 |
- |
- DTS为每个数据同步作业自动生成一个名称,该名称没有唯一性要求。
- 您可以根据需要修改同步作业名称,建议配置具有业务意义的名称,便于后续的任务识别。
|
源实例信息 |
实例类型 |
选择POLARDB。
|
实例地区 |
购买数据同步实例时选择的源实例地域信息,不可变更。 |
POLARDB实例ID |
选择POLARDB集群ID。 |
数据库账号 |
填入POLARDB集群的数据库账号,需要具备Replication slave、Replication client及所有待同步对象的Select权限。 |
数据库密码 |
填入该账号对应的密码。 |
目标实例信息 |
实例类型 |
- Kafka集群部署在ECS上时,选择ECS上的自建数据库
- Kafka集群部署在本地服务器时,选择通过专线/VPN网关/智能网关接入的自建数据库。
说明 选择通过专线/VPN网关/智能网关接入的自建数据库时,您需要配置VPC ID并填写IP地址和端口信息。
|
实例地区 |
购买数据同步实例时选择的目标实例地域信息,不可变更。 |
ECS实例ID |
选择部署了Kafka集群的ECS实例ID。 |
数据库类型 |
选择为Kafka。
|
端口 |
Kafka集群对外提供服务的端口,默认为9092。 |
数据库账号 |
填入Kafka集群的用户名,如Kafka集群未开启验证可不填写。 |
数据库密码 |
填入Kafka集群用户名对应的密码,如Kafka集群未开启验证可不填写。 |
Topic |
- 单击击右侧的获取Topic列表。
- 下拉选择具体的Topic名称。
|
Kafka版本 |
根据目标Kafka集群版本,选择对应的版本信息。 |
- 单击页面右下角的授权白名单并进入下一步。
说明 此步骤会将DTS服务器的IP地址自动添加到源POLARDB集群的白名单和目标ECS实例的内网入方向安全组规则中,用于保障DTS服务器能够正常连接源和目标实例。
- 配置目标已存在表的处理模式和同步对象。
配置项目 |
配置说明 |
目标已存在表的处理模式 |
- 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步作业不会被启动。
- 无操作:跳过目标数据库中是否有同名表的检查项。
警告 选择为 无操作,可能导致数据不一致,给业务带来风险,例如:
- 表结构一致的情况下,如果在目标库遇到与源库主键的值相同的记录,在初始化阶段会保留目标库中的该条记录;在增量同步阶段则会覆盖目标库的该条记录。
- 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败。
|
选择同步对象 |
在源库对象框中单击待同步的对象,然后单击将其移动至已选择对象框。
说明
- 仅支持表粒度的数据同步。
- 默认情况下,同步对象的名称保持不变。如果您需要同步对象在目标实例上名称不同,那么需要使用DTS提供的对象名映射功能,详情请参见设置同步对象在目标实例中的名称。
|
- 上述配置完成后,单击页面右下角的下一步。
- 配置同步初始化的高级配置信息。
说明 同步初始化类型细分为:结构初始化,全量数据初始化。选择结构初始化和全量数据初始化后,DTS会在增量数据同步之前,将源数据库中待同步对象的结构和存量数据,同步到目标数据库。
- 上述配置完成后,单击页面右下角的预检查并启动。
说明
- 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
- 如果预检查失败,单击具体检查项后的,查看失败详情。根据提示修复后,重新进行预检查。
- 在预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
- 等待同步作业的链路初始化完成,直至处于同步中状态。
您可以在
数据同步页面,查看数据同步作业的状态。
评论