JavaEE鸿蒙应用开发HTML&JS+前端Python+大数据开发人工智能开发AI+设计软件测试新媒体+短视频直播运营产品经理集成电路应用开发(含嵌入式)Linux云计算+运维开发C/C++拍摄剪辑+短视频制作PMP项目管理认证电商运营Go语言与区块链大数据PHP工程师Android+物联网iOS.NET

云计算大数据教程实时数据同步方案

来源:黑马程序员

浏览42615人

2019.10.30

概述

1.1、目标

实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库,目标数据包括mysql数据库和hbase数据库。

下面是实时数据同步的数据流转图,mysql的增量订阅数据经过canal和kafka,数据最终实时流入hbase或mysql。

1.png

1.2、整体设计架构

实时数据同步基于数据库变更订阅中心,实现从源数据到目标数据的实时数据同步应用。

整体设计架构如下图所示:

2.jpg


1.3、概要设计

实时数据同步分两部分:生产端(productor)和消费端(consumer)

1.3.1、生产端(productor)

集成canal的consumer和kafka的productor。主要完成如下任务

1、监听canal producer 发送过来的

2、将数据进行格式化, 

3、调用kafka producer,发送数据。

1.3.2、消费端(consumer)

 主要集成kafka consumer和HBase,主要完成如下任务

 1、监听producer发送过来的数据。

 2、解析数据

 3、数据写入到HBase

1.4、技术组件

1.4.1 canal

1.4.1.1 canal简介

canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件。

1.4.1.2 canal工作原理

canal模拟mysql slave的交互协议,伪装自己为mysql slave,mysql master发送dump协议

mysql master收到dump请求,开始推送binary log给slave(也就是canal)

canal解析binary log对象(原始为byte流)

1572429961780553.jpg


1.4.1.3 canal工作流程

数据按照instance为单位进行划分,每个server上可以部署多个instance。但是同一个instance在整个集群中仅在一台上处于activity状态。其余均处于standby状态。也就是说instanceA两台server中均有部署,但是在这个集群中,仅有一个server上的instanceA处于activity状态。

为了保证数据的有序性,每个instance只能被一个client接收。而且数据称队列方式消费消费,有且仅能被消费一次。

4.png


1.4.2 Kafka

1.4.2.1 kafka介绍

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。broker依赖zookeeper管理集群和存储一些meta信息

14.2.2工作流程

kafka 在server端将数据分topic进行管理,每个topic按照需求,可以分多个partition。

Kafka消息传输同时集成了队列和广播传输两种模式,

针对consumer端进行分group管理,每组会有多个consumer。

每个topic的消息可以被多个group同时消费,每个group的多个consumer正常情况下只能消费一次消息。

5.jpg

详细设计

2.1、生产端设计(producer)

由于canal服务端集成生产者,故程序直接调用canal consumer api 。收集canal producer发送的数据。

在producer对数据的处理比较少,设计时希望数据原汁原味的把数据发送出去。Consumer端可以根据不同的场景需求,对数据进行处理。

2.1.1、类关系图

1572430115112528.png

类ClusterCanalClient

主类,程序的启动入口,继承AbstractCanalClient

接收启动参数。并组织调用其他类完成消费端工作。

类AbstractCanalClient

调用consumerConfig,设置consumer参数。启动consumer。

周期性获取消费数据,调用数据解析和格式化程序,处理数据。

调用kafka接口,将处理后的数据发送。

按照要求,解析并格式化数据。

类 ItheimaProductorConfig

读取并初始化配置信息

2.1.2、可靠性设计

canal设计规则。两台server。采用主从模式,有zookeeper管理。依据需求,合理配置instance个数。

每个instance最多只允许一个client接收数据。

每台consumer client可以接收两个instance。

每两个instance配置3台client。确保系统稳定性

2.1.3 数据处理规则

1、处理规则

Producer不在对数据进行任何形式的过滤。数据是否发送,发送那些数据,需要在canal instance的配置文件中配置。Producer只负责将接收到canal producer发送过来的数据进行解析和格式化。这样虽然会增加占用带宽和存储资源。但可以保证数据在不同的应用中使用。

2、发送时的数据格式

{

table:database.tableName, 

binlog_id:””,

event_type:”insert/update/delete”

binlog_id:””,

exe_time:””,

cur_time:””,

cols:[{col:columnName,Val:value,type:columnType},{…},...]}

例如: 同步jeehe_goods_info表中的数据如下,将得到如下消息

{“table”:”yzbmp.jehhe_goods_info”, “binlog”:”23455234234”,

“event_type”:”INSERT/UPDATE/DELETE”,

“exe_time”:””,

“cur_time”:””,

 “cols”[

{“col”:“order_id”,”val”:”20014587”,”type”:”double”,”update”:”true/flase”,},

{“col”:“user_id”,”val”:”123456”,”type”:”varchar”}

]}

其中

binlog_id 记录获取的binlog ID,用于核对数据,

event_type 当前数据操作类型。插入/修改/删除

exe_time:binlog生成时间

cur_time:canal获取binlog时间

table标签值为当前行所在的表名(数据库.表名)

cols:将列作为数组传输。

col:列名称

val:当前列的值

type:当前列的数据类型,为当前数据库规定的类型,比如mysql的varchar。

* 1、log日志中记录下当前批次,数据获取获取时间和当前处理时间,用于统计数据延迟和数据处理情况。

 2、记录下数据binlog信息,并在consumer端同样记录,用于核实数据丢失情况。

目前,consumer在解析数据时,首先查找table标签,发现table标签后,再做进一步解析,如果没有发现table标签,丢弃该条消息。

2.2、消费端设计(consumer)

此消费端主要是从kafka中获取数据,将该数据存入到hbase中

2.2.1、类关系图

1572430199132688.png


类 ClusterKafkaClient

消费端启动类。调用kafka consumer启动程序

类KafkaConsumerController

消费端启动类,启动时负责初始化数据。

调用kafkaconsumer消费端,周期性(暂定30秒)接收producer发送的数据。

调用格式化,格式化数据。

调用Hbase控制类,实例化数据

类 HBaseController

调用HBaseConfig,

获取rowMeta数据,以row为单位,持久化数据。

鉴于线上采用HBase v1.0 版本,目前,程序主要使用V1.0 版本的API。

类YZHBaseTransferConfig

该类主要用于记录mysql数据同步至hbase时的对应关系:

创建对象时,连接一次数据库,并初始化数据。

依据数据库对应关系表,将数据实例化为两个对象,分别为SynColumn,SyTable。

同步时主要分为全表同步和部分同步。全表同步时,所有列都会同步至hbase中,部分同步时,只同步指定列

类YZHBaseTransfer

该类主要负责格式化数据。将kafka接收的消息按照同步对应关系要求。进行格式化,将数据存入rowMeta实例中。

类  SynColumn、 SynTable

数据库中数据同步至hbase时的字段对应关系,分别与yzdc_sync_table_mapping和yzdc_sync_column_mapping表相对应。

类 ColumnMeta、RowMeta

存入hbase数据库的数据对象

2.2.2、可靠性设计 

在kafka中,由于每台partition需要对应一台 consumer client。目前环境做如下配置:

2台broker(server).

topic的partition设置为3。这样就可以设置3台 consumer client

2.2.3、数据准确性保证

由于kafka消息传播再多个partition之间是无序的。

Hbase写入时必须设置合适的key,在出现故障时可以将数据冗余覆盖

kafka的offset修改为手动提交,保证HBase写入后再关提交offset。

2.3、数据库设计

数据库主要表结构设计

2.3.1数据表对应关系表 yzdc_sync_table_mapping

字段名称

字段类型

注释

tb_id

int

主键,自增

orig_tb

VARCHAR

原始表名称

dest_tb

VARCHAR

目标表名称

key_col

VARCHAR

作为rowkey的列

default_family

VARCHAR

默认family

syn_type

ENUM

数据同步类型,分为all和part。all表示全表同步,part表示仅同步表的一部分

is_delete

TINYINT

是否删除

update_time

long

记录当前数据修改时间

2.3.2数据列对应关系表 yzdc_sync_column_mapping

字段名称

字段类型

注释

col_id

int

主键、自增

orig_clo

VARCHAR

原始列名称

dest_qualifier

VARCHAR

对应的目标qualifier

dest_family

VARCHAR

对应的目标family

tb_id

int

对应的表主键

update_time

long

修改时间

is_delete

boolean

是否删除

 附录一、kafka数据无序性解决方案

为了解决数据负载均衡,通常情况下会为kafkatopic设置多个partition。便于多consumer接收数据,这样便会引起数据时序性问题。

例如: 首先修改A A1,修改结果发送至partition 1.

再次修改A1 A2,修改结果发送至partition 2

而客户端再接收数据时,针对不同的partition并没有时序性,很有可能会先接收partition 2 的数据,将结果存为A2,然后又接收到partition 1 数据,将结果再次修改为A1,这样的结果和实际结果不符。