Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data Transmission Service),您可以将POLARDB MySQL同步至自建Kafka集群,扩展消息处理能力。

前提条件

  • Kafka集群的版本为0.10或1.0版本。
  • Kafka集群已创建用于存储待同步数据的Topic。
  • POLARDB MySQL已开启Binlog,详情请参见如何开启Binlog

注意事项

如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

功能限制

  • 仅支持表粒度的数据同步。
  • 不支持DDL操作的数据同步。
  • 不支持自动调整同步对象。
    说明 如果在同步的过程中,对源库中待同步的表执行了重命名操作,且重命名后的名称不在同步对象中,那么该表将不再被同步到目标Kafka集群中。如果该表还需要同步,那么您需要新增同步对象

支持同步的SQL操作

DML操作:INSERT、UPDATE、DELETE、REPLACE。

消息格式

同步到Kafka集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义

在数据同步到Kafka集群后,您需要根据avro schema定义进行数据解析。

费用说明

详情请参见产品定价

操作步骤

  1. 购买数据同步作业
    说明 购买时,选择源实例为POLARDB、目标实例为Kafka,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择数据同步实例所属地域。
    从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第1张
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步通道的源实例及目标实例信息。
    从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第2张
    配置项目 配置选项 配置说明
    同步作业名称 -
    • DTS为每个数据同步作业自动生成一个名称,该名称没有唯一性要求。
    • 您可以根据需要修改同步作业名称,建议配置具有业务意义的名称,便于后续的任务识别。
    源实例信息 实例类型 选择POLARDB
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    POLARDB实例ID 选择POLARDB集群ID。
    数据库账号 填入POLARDB集群的数据库账号,需要具备Replication slave、Replication client及所有待同步对象的Select权限。
    数据库密码 填入该账号对应的密码。
    目标实例信息 实例类型
    • Kafka集群部署在ECS上时,选择ECS上的自建数据库
    • Kafka集群部署在本地服务器时,选择通过专线/VPN网关/智能网关接入的自建数据库
      说明 选择通过专线/VPN网关/智能网关接入的自建数据库时,您需要配置VPC ID并填写IP地址端口信息。
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    ECS实例ID 选择部署了Kafka集群的ECS实例ID。
    数据库类型 选择为Kafka
    端口 Kafka集群对外提供服务的端口,默认为9092。
    数据库账号 填入Kafka集群的用户名,如Kafka集群未开启验证可不填写。
    数据库密码 填入Kafka集群用户名对应的密码,如Kafka集群未开启验证可不填写。
    Topic
    1. 单击击右侧的获取Topic列表
    2. 下拉选择具体的Topic名称。
    Kafka版本 根据目标Kafka集群版本,选择对应的版本信息。
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到源POLARDB集群的白名单和目标ECS实例的内网入方向安全组规则中,用于保障DTS服务器能够正常连接源和目标实例。
  8. 配置目标已存在表的处理模式和同步对象。
    从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第3张
    配置项目 配置说明
    目标已存在表的处理模式
    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步作业不会被启动。
      说明 如果目标库中同名的表不方便删除或重命名,您可以设置同步对象在目标实例中的名称来避免表名冲突。
    • 无操作:跳过目标数据库中是否有同名表的检查项。
      警告 选择为无操作,可能导致数据不一致,给业务带来风险,例如:
      • 表结构一致的情况下,如果在目标库遇到与源库主键的值相同的记录,在初始化阶段会保留目标库中的该条记录;在增量同步阶段则会覆盖目标库的该条记录。
      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败。
    选择同步对象

    源库对象框中单击待同步的对象,然后单击从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第4张将其移动至已选择对象框。

    说明
    • 仅支持表粒度的数据同步。
    • 默认情况下,同步对象的名称保持不变。如果您需要同步对象在目标实例上名称不同,那么需要使用DTS提供的对象名映射功能,详情请参见设置同步对象在目标实例中的名称
  9. 上述配置完成后,单击页面右下角的下一步
  10. 配置同步初始化的高级配置信息。
    从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第5张
    说明 同步初始化类型细分为:结构初始化,全量数据初始化。选择结构初始化全量数据初始化后,DTS会在增量数据同步之前,将源数据库中待同步对象的结构和存量数据,同步到目标数据库。
  11. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第6张,查看失败详情。根据提示修复后,重新进行预检查。
  12. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  13. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在 数据同步页面,查看数据同步作业的状态。从POLARDB MySQL同步到Kafka_数据同步_PolarDB MySQL数据库_云数据库PolarDB 阿里云技术文档 第7张