实时同步MySQL到Kafka的环境搭建
Debezium是用于捕获变更数据的开源分布式平台。可以响应数据库的所有插入,更新和删除操作。
本文主要讲在kafka confluent的基础上如何使用debezium插件获取mysql binlog数据事件完成实时数据流,debezium是以插件的方式配合confluent使用。
Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能. 大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储. 而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline。具体官网文档www.confluent.io/.
虽然kafka confluent提供了JDBC Connector
使用JDBC的方式去获取数据源,这种方式kafka connector追踪每个表中检索到的组继续记录,可以在下一次迭代或者崩溃的情况下寻找到正确的位置,这里存在几种实现模式,具体可以参考官网说明JDBC Source Connector。
这里推荐使用debezium,这种方式基于MySQL binlog的特性
一、安装zookeeper和kafka
略
其他准备:
1-Kafka
: kafka_2.11-2.0.0.tgz
2-Confluent
: confluent-oss-5.0.0-2.11.tar.gz
下载地址: www.confluent.io/download/
3-Debezium
: debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
文档说明: https://debezium.io/documentation/reference/1.6/connectors/mysql.html
下载地址: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.8.1.Final/debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
二、安装Kafka Confluent
confluent的安装部署相对比较简单,confluent为我们提供了Confluent Platform,我们即可以快速启动整个confluent平台,也可以单独启动想要的组件。接下来我们详细介绍如何操作。
1)解压
tar -zxvf confluent-5.0.0-2.11.tar.gz后可以看到文件列表
bin //Driver scripts for starting and stopping services
etc //Configuration files
lib //Systemd services
logs //Log files
share //Jars and licenses
src //Source files that require a platform-dependent build
首先看看如何快速启动confluent platform全家桶:
ZooKeeper
,Kafka
,Schema Registry
,Control Center
,Kafka Connect
,Kafka REST Proxy
,KSQL
。
2)快速启动platform
confluent platform分两个版本Confluent Enterprise
和Confluent Open Source
。
Confluent Enterprise
拥有更多的组件,这里测试选择Confluent Enterprise
启动,因为它里面包含了Control Center
方便我们测试,直观的从浏览器上看到数据信息。
启动
(特别说明我们的命令执行目录都是在confluent目录下,如我的目录/Users/mo/runtime/confluent-5.0.0.2
)
./bin/confluent start
看到如下信息,说明我们的confluent platform中的多个组件都启动成功。
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
访问测试: 通过使用http://localhost:9021
来访问Control Center
,如图。
3) 自定义启动
这里我们使用两台机器模拟集群192.168.226.184
,192.168.226.27
,’192.168.226.189’分别编排为host1
,host2
,host3
。修改三台机器对应的hosts文件。分别添加如下配置
0.0.0.0 localhost host1
192.168.226.184 localhost host2
192.168.226.189 localhost host3
192.168.226.27 localhost host1
0.0.0.0 localhost host2
192.168.226.189 localhost host3
192.168.226.27 localhost host1
192.168.226.189 localhost host2
0.0.0.0 localhost host3
分别为每台机器创建myid
文件,每个myid保存要给唯一的数字即可,我这里三个host分别指定为1,2,3。
sudo mkdir /var/lib/zookeeper
sudo vi /var/lib/zookeeper/myid
每台机器分别指定如下配置
zookeeper配置和启动
vi etc/kafka/zookeeper.properties
添加如下配置
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
启动
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
kafka配置和启动
修改配置vi etc/kafka/server.properties
zookeeper.connect=host1:2181,host2:2181,host3:2181
设置broker.id=0
,这里我们可以使用broker.id.generation.enable=true
自动生成替代
#broker.id=0
broker.id.generation.enable=true
advertised.listeners=PLAINTEXT://本机IP:9092
启动
./bin/kafka-server-start etc/kafka/server.properties
Schema Registry配置和启动(可选)
配置vi etc/schema-registry/schema-registry.properties
kafkastore.connection.url=host1:2181,host2:2181,host3:2181
启动
./bin/schema-registry-start etc/schema-registry/schema-registry.properties
kafka connect配置和启动
这里我们不使用官方模式的avro
序列化方式,所有不启动组件schema-registry
。
配置
cp etc/schema-registry/connect-avro-distributed.properties etc/schema-registry/connect-distributed.properties
修改vi etc/schema-registry/connect-distributed.properties
bootstrap.servers=host1:9092,host2:9092,host3:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://192.168.226.184:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://192.168.226.184:8081
启动
./bin/connect-distributed etc/schema-registry/connect-distributed.properties
Control Center配置和启动
配置vi etc/confluent-control-center/control-center-dev.properties
bootstrap.servers=host1:9092,host2:9092,host3:9092
zookeeper.connect=host1:2181,host2:2181:host3:2181
#confluent.controlcenter.schema.registry.url=http://host1:8081,http://host2:8081,http://host3:8081
confluent.controlcenter.connect.cluster=http://192.168.222.54:8083
启动
./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
到此为止kafka connect集群搭建成功。
安装debezium插件
把debezium-connector-mysql的压缩包解压,放到Confluent的解压后的插件目录(share/java)中: 解压命令 : tar -xzf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
再次启动confluent即可
注意:
使用debezium之前必须先开启mysql得binlog;
不同的jar包放在插件目录不同的文件夹下,可以防止jar包冲突;
每台Kafka-connect-worker机器上的Confluent插件目录下,都要有插件文件夹(因为connector提交到一个分布式的worker集群后,不一定在哪台worker上调度运行)
由于需要用Avro格式的kafka消息和分布式的kafka connect,因此需要修改Confluent的schema-registry下的配置:目录 : confluent-5.0.0/etc/schema-registry;
需要配置的是connect-avro-distributed.properties 和 schema-registry.properties
schema-registry.properties修改配置 :
# 0.0.0.0 代表所有的网卡地址
# 默认8081端口,但8081端口被占用了
listeners = http://0.0.0.0:18081
# zookeeper集群信息
kafkastore.connection.url = ip1:2182, ip2:2182, ip3:2182
connect-avro-distributed.properties修改配置 :
# 其实就是配置了kafka集群
bootstrap.servers = ip1:9092, ip2:9092, ip3:9092
三、编写mysql-connector的配置信息
1)创建一个目录
用于存放配置信息(connector配置信息只要放在一台机器上就行了) :
目录 : /etc
创建文件夹命令 : mkdir kafka-connect-debezium
2)编写test.json
存放connector的配置信息 :
# debezium-mysql-connector配置如下:
{
"name" : "debezium-mysql-source-3308",
"config":
{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.hostname" : "mysql的IP地址",
"database.port" : "mysql的端口号",
"database.user" : "mysql的用户名",
"database.password" : "mysql用户对应的密码",
"database.server.id" : "184000",
"database.server.name" : "mysql服务的逻辑名,例如fullfillment",
"database.history.kafka.bootstrap.servers" : "ip1:9092,ip2:9092,ip3:9092",
"database.history.kafka.topic" : "dbhistory.fullfillment" ,
"include.schema.changes" : "true" ,
"mode" : "incrementing",
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true"
}
}
详情见debezium-mysql官网 : https://debezium.io/docs/connectors/mysql/
另外confluent提供了restful api可快速创建kafka connect。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"database.server.id": "1",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"include.schema.changes": "true"
}
}
这里的脚本其实是一行,我为了方便查看展开了json。
也可以通过curl -H "Accept:application/json" localhost:8083/
查看已创建成功的connect
四、验证
debezium会读取MySQL binlog产生数据改变事件,将事件发送到kafka队列,最简单的验证办法就是监听这些队列(这些队列按照表名区分)
1)观察数据库的inventory.customers
表,监听dbserver1.inventory.customers
队列。
2)将customers表id为1004的email字段内容update
3)应用消费者会立马收到一条消费消息。