环境信息

  • docker
  • mysql 8.0.25
  • canal 1.1.5
  • canal-admin 1.1.5
  • canal-adapter 1.1.5
  • elasticsearch 7.3.0

环境搭建

mysql

修改 my.ini,开启 binlog

1
2
3
4
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

使用 docker-compose 启动 mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: "3"

services:
db:
container_name: mysql8
image: mysql:8.0.25
command: mysqld --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: user
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
ports:
- "3306:3306"
volumes:
- "./data:/var/lib/mysql"
- ".my.cnf:/etc/mysql/conf.d/my.cnf"

elasticsearch

使用 docker-compose 搭建一个单机版的 es,并且启动 kibana 控制台方便一会操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
version: "3"
services:
node01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.3.0
container_name: node01
environment:
- node.name=node01
- cluster.name=es-cluster-7
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es-data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- es-network

kibana:
image: docker.elastic.co/kibana/kibana:7.3.0
environment:
ELASTICSEARCH_HOSTS: http://node01:9200
ports:
- 5601:5601
networks:
- es-network
depends_on:
- node01

volumes:
es-data01:
driver: local

networks:
es-network:
driver: bridge

kibana.yml

1
2
elasticsearch.url: "http://es:9200"
server.host: "0.0.0.0"

canal-admin

下载 canal-admin 的运行脚本

1
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run_admin.sh

执行 canal_manager.sql 初始化 canal-admin 的数据表
启动 canal-admin

1
2
3
4
5
6
7
sh run_admin.sh -e server.port=8089 \
-e canal.adminUser=admin \
-e canal.adminPasswd=123456 \
-e spring.datasource.address=docker.for.mac.host.internal:3306 \
-e spring.datasource.database=canal_manager \
-e spring.datasource.username=root \
-e spring.datasource.password=root

访问http://localhost:8089/ 密码是 admin/123456

canal

创建一个专用的 canal 账号

1
2
3
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

下载 canal 启动脚本

1
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

运行 canal

1
2
3
4
5
6
sh run.sh -e canal.admin.manager=docker.for.mac.host.internal:8089 \
-e canal.admin.port=11110 \
-e canal.port=11111 \
-e canal.metrics.pull.port=11122 \
-e canal.admin.user=admin \
-e canal.admin.passwd=6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

canal 启动成功之后登陆 canal-admin 创建一个 example 的 instance 一会会用到
一个 instance 对应一个 mysql 实例

canal-adapter

canal-adapter 官方没有提供镜像,这里我们自己做一个。
Dockerfile

1
2
3
4
5
6
7
FROM openjdk:8-jre-alpine
RUN echo "Asia/Shanghai" > /etc/timezone
ADD canal.adapter-1.1.5.tar.gz /opt/canal/adapter/

WORKDIR /opt/canal/adapter

ENTRYPOINT ["sh", "-c", "sh bin/startup.sh && tail -F logs/adapter/adapter.log"]

这里不从官方直接下载 1.1.5 的包。而是自己打一个,因为官方的 1.1.5 的包有问题,直到写这篇文章的时候还没解决。参照https://github.com/alibaba/canal/issues/3144自己打一个
docker-compose.yml

1
2
3
4
5
6
version: '3'
services:
adapter:
build: .
volumes:
- "./conf:/opt/canal/adapter/conf"

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: docker.for.mac.host.internal:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:

srcDataSources:
defaultDS:
url: jdbc:mysql://docker.for.mac.host.internal:3306/test?useUnicode=true
username: canal
password: canal
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7
hosts: docker.for.mac.host.internal:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest
cluster.name: elasticsearch

启动 canal-adapter

1
docker-compose up -d

数据同步

创建需要同步的表

1
2
3
4
5
6
7
8
CREATE TABLE `product` (
`id` bigint NOT NULL AUTO_INCREMENT,
`title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL,
`pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB;

使用 kibana 创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT product
{
"mappings": {
"properties": {
"title": {
"type": "text"
},
"sub_title": {
"type": "text"
},
"pic": {
"type": "text"
},
"price": {
"type": "double"
}
}
}
}

修改es7/product.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: product
_id: _id
sql: "SELECT
p.id AS _id,
p.title,
p.sub_title,
p.price,
p.pic
FROM
product p"
etlCondition: "1=1"
commitBatch: 3000

重启 canal-adapter
使用 mysql 插入一条数据

1
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 1, 'iphone12', 'iphone', 6999.00, NULL );

查询 es

1
GET product/_search

查询结果,可以看到已经同步成功了