普遍的大数据抽数的方式都是从散落在各个地方的 MySQL 数据库中开账号,然后把数据同步过来。
这不,大数据的同事提出了一个需求,想把某个库中的某个表的数据单独同步到大数据的统一库中的某个表中。
研究了一个星期,canal 的配置对运维来说非常的不友好,除非用它那个 admin 的 dashboard,但是运维通常是 cli 界面,谁没事装个 dashboard 来配置呢,太 low 了。
用的是 canal 1.1.5 的版本,不能升级,升级后 java 不对了。还没有到用 1.1.6 的时候!
背景:
1源数据库:10.8.2.12
2库:xxl_job
3表:demotable666
4
5目的数据库:10.8.2.17
6库:xxl_job_bak
7表:demotable666
而且不用任何的Zookeeper、Kafka、RabbitMQ的中间件,越简洁越好。
一、安装canal
1wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
2wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
3
4#都装到 /app 目录下
5mkdir /app
6mkdir /app/deploy
7mkdir /app/adapter
8
9cd /app/deploy
10tar zxvf canal.deployer-1.1.5.tar.gz
11
12cd /app/adapter
13tar zxvf canal.adapter-1.1.5.tar.gz
二、MySQL源准备
1# /etc/my.cnf 加入以下几项并重启
2[mysqld]
3log-bin=mysql-bin # Start log bin.
4binlog_format=ROW # Log format.
5server-id=1 # Server id,different from slave.
授权binlog同步:
1# 创建用户授权
2mysql> CREATE USER canal IDENTIFIED BY 'canal';
3mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'10.8.2.17' identified by "xxxxxx";
4mysql> FLUSH PRIVILEGES;
5# 查看授权
6mysql> show grants for 'canal';
三、配置deploy
其实关键就是两个配置
1、canal.properties
1mkdir /app/deploy/conf/10.8.2.12
2cp /app/deploy/conf/example/instance.properties /app/deploy/conf/10.8.2.12
3
4vi /app/deploy/conf/canal.properties
5
6#就改两个地方
7canal.destinations = 10.8.2.12
8canal.auto.scan = false
注意,上面一定要把 canal.auto.scan 修改成 false,否则它会自动扫描 conf 目录下的所有配置,这样 example 子目录无论是否有用到,配置都会被扫进去。这不是神经病么,典型的 java 程序猿自以为是的配置。
2、instance.properties 的配置
1vi /app/deploy/conf/10.8.2.12/instance.properties
2
3# instance.properties 是拷贝example目录中的
4# 改动
5
6canal.instance.master.address=10.8.2.12:3306
7canal.instance.tsdb.enable=false
8
9# username/password
10canal.instance.dbUsername=canal
11canal.instance.dbPassword=xxxxxx
12
13# table regex
14#1. 所有表:.* or .*\\..*
15#2. canal schema下所有表: canal\\..*
16#3. canal下的以canal打头的表:canal\\.canal.*
17#4. canal schema下的一张表:canal\\.test1
18#5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
19#canal.instance.filter.regex=.*\\..*
20canal.instance.filter.regex=xxl_job\\..*
上面其实就是设置了 canal 去伪装成一个 slave,读取 binlog 日志,并且过滤只读取 xxl_job 的日志。
启动 deploy
1cd /app/deploy/bin
2./startup.sh
看看日志无异常即可:
四、准备好目的数据库的库表结构
源数据库已经有 demotable666 的表了,我们需要在目的数据库事先准备好 demotable666
1# 在 10.8.2.17 的 xxl_job_bak 库上执行
2mysql> use xxl_job_bak;
3mysql> create table demotable666(
4 UserId int NOT NULL AUTO_INCREMENT PRIMARY KEY,
5 UserName varchar(100),
6 UserLoginDate date NOT NULL
7);
8Query OK, 0 rows affected (0.04 sec)
五、设置adapter
1、application.yml
首先给 canal 的机器对目的库赋 mysql 权
1mysql> grant all privileges on xxl_job_bak.* to sync_17@'%' identified by "xxxxxx";
2mysql> flush privileges;
然后编辑application.yml
1vi /app/adapter/conf/application.yml
2
3#注释掉srcDataSources,mysql 同步到 mysql的场景中完全用不到这个,es要用到
4# srcDataSources:
5# defaultDS:
6# url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
7# username: root
8# password: 121212
9
10 canalAdapters:
11 - instance: 10.8.2.12 # canal instance Name or mq topic name
12 groups:
13 - groupId: g1
14 outerAdapters:
15 - name: logger
16 - name: rdb
17 key: mysqlbak1
18 properties:
19 jdbc.driverClassName: com.mysql.jdbc.Driver
20 jdbc.url: jdbc:mysql://10.8.2.17:3306/xxl_job_bak?useUnicode=true
21 jdbc.username: sync_17
22 jdbc.password: xxxxxx
注意上面配的instance,这个跟deploy里面的instance没有半毛钱关系,这个是为了rdb里面的配置参考用的。
2、编辑表同步配置文件mysqlbak1.yml
注意上面的jdbc.url指定了库是xxl_job_bak,所以下面配置的 targetTable 就是单独一个 demotable666,很多文章里写库名.表名, xxl_job_bak.demotable666,八戒是没这样成功。
1#用不到的挪成备份
2mv /app/adapter/conf/rdb/mytest_user.yml /app/adapter/conf/rdb/mytest_user.bak
3
4vi /app/adapter/conf/rdb/mysqlbak1.yml
5#dataSourceKey: mysqlbakDS1
6# cannal的instance或者MQ的topic
7destination: 10.8.2.12
8groupId: g1
9outerAdapterKey: mysqlbak1
10concurrent: true
11
12dbMapping:
13 database: xxl_job
14 table: demotable666
15 targetTable: demotable666
16 targetPk:
17 UserId: UserId
18# mapAll: true
19 targetColumns:
20 UserId:
21 UserName:
22 UserLoginDate:
23# etlCondition: "where c_time>={}"
24 commitBatch: 1 # 批量提交的大小
启动 adapter:
1cd /app/adapter/bin
2./startup.sh
这样就好了。
在10.8.2.12的源数据库上插入数据:
1insert into demotable666(UserName,UserLoginDate) values('john',DATE(NOW()));
在 10.8.2.17 可以看到新数据自动过来了,这样就搞定了。
六、拓展整库同步
上面我们实现了表同步,下面拓展说一下如何实现整库的同步:
1、设置application.yml,rdb增加一个key: mysqlbak2,jdbc.url 连接串不设置库名
1 canalAdapters:
2 - instance: 10.8.2.12 # canal instance Name or mq topic name
3 groups:
4 - groupId: g1
5 outerAdapters:
6 - name: logger
7 - name: rdb
8 key: mysqlbak1
9 properties:
10 jdbc.driverClassName: com.mysql.jdbc.Driver
11 jdbc.url: jdbc:mysql://10.8.2.17:3306/xxl_job_bak?useUnicode=true
12 jdbc.username: sync_17
13 jdbc.password: xxxxxx
14 - name: rdb
15 key: mysqlbak2
16 properties:
17 jdbc.driverClassName: com.mysql.jdbc.Driver
18 jdbc.url: jdbc:mysql://10.8.2.17:3306/?useUnicode=true
19 jdbc.username: sync_17
20 jdbc.password: xxxxxx
2、编辑表同步配置文件mysqlbak2.yml
注意,里面其实只有一个配置起作用 database,而且目的库中必须事先建立好xxl_job的库表结构,库名也必须和源中库名完全一致,举例来说,从 xxl_job 只能同步到 xxl_job,不能从xxl_job同步到xxl_jobbak中。
1#dataSourceKey: defaultDS
2# cannal的instance或者MQ的topic
3destination: 10.8.2.12
4groupId: g1
5outerAdapterKey: mysqlbak2
6concurrent: true
7
8dbMapping:
9 mirrorDb: true
10 database: xxl_job
然后我们看 10.8.2.17 上面 xxl_job 库里的数据就会增量开始跟源库同步了。