Debezium安装部署

2021-08-11

实时同步MySQL到Kafka的环境搭建

Debezium是用于捕获变更数据的开源分布式平台。可以响应数据库的所有插入,更新和删除操作。

本文主要讲在kafka confluent的基础上如何使用debezium插件获取mysql binlog数据事件完成实时数据流,debezium是以插件的方式配合confluent使用。

Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能. 大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储. 而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline。具体官网文档www.confluent.io/.

虽然kafka confluent提供了JDBC Connector使用JDBC的方式去获取数据源,这种方式kafka connector追踪每个表中检索到的组继续记录,可以在下一次迭代或者崩溃的情况下寻找到正确的位置,这里存在几种实现模式,具体可以参考官网说明JDBC Source Connector

这里推荐使用debezium,这种方式基于MySQL binlog的特性

一、安装zookeeper和kafka

其他准备:

1-Kafka : kafka_2.11-2.0.0.tgz 2-Confluent : confluent-oss-5.0.0-2.11.tar.gz

下载地址: www.confluent.io/download/

3-Debezium : debezium-connector-mysql-0.8.1.Final-plugin.tar.gz

文档说明: https://debezium.io/documentation/reference/1.6/connectors/mysql.html

下载地址: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.8.1.Final/debezium-connector-mysql-0.8.1.Final-plugin.tar.gz

二、安装Kafka Confluent

confluent的安装部署相对比较简单,confluent为我们提供了Confluent Platform,我们即可以快速启动整个confluent平台,也可以单独启动想要的组件。接下来我们详细介绍如何操作。

1)解压

tar -zxvf confluent-5.0.0-2.11.tar.gz后可以看到文件列表

 bin 		//Driver scripts for starting and stopping services
 etc 		//Configuration files
 lib 		//Systemd services
 logs 	//Log files
 share 	//Jars and licenses
 src 		//Source files that require a platform-dependent build

首先看看如何快速启动confluent platform全家桶:

ZooKeeper,Kafka,Schema Registry,Control Center,Kafka Connect,Kafka REST Proxy,KSQL

2)快速启动platform

confluent platform分两个版本Confluent EnterpriseConfluent Open Source

Confluent Enterprise拥有更多的组件,这里测试选择Confluent Enterprise启动,因为它里面包含了Control Center方便我们测试,直观的从浏览器上看到数据信息。

启动

(特别说明我们的命令执行目录都是在confluent目录下,如我的目录/Users/mo/runtime/confluent-5.0.0.2)

./bin/confluent start

看到如下信息,说明我们的confluent platform中的多个组件都启动成功。

  Starting zookeeper
  zookeeper is [UP]
  Starting kafka
  kafka is [UP]
  Starting schema-registry
  schema-registry is [UP]
  Starting kafka-rest
  kafka-rest is [UP]
  Starting connect
  connect is [UP]
  Starting ksql-server
  ksql-server is [UP]
  Starting control-center
  control-center is [UP]

访问测试: 通过使用http://localhost:9021来访问Control Center,如图。

3) 自定义启动

这里我们使用两台机器模拟集群192.168.226.184,192.168.226.27,’192.168.226.189’分别编排为host1,host2,host3。修改三台机器对应的hosts文件。分别添加如下配置

0.0.0.0 localhost  host1
192.168.226.184 localhost  host2
192.168.226.189  localhost host3

192.168.226.27  localhost host1
0.0.0.0   localhost  host2
192.168.226.189  localhost  host3

192.168.226.27  localhost  host1
192.168.226.189  localhost   host2
0.0.0.0  localhost  host3

分别为每台机器创建myid文件,每个myid保存要给唯一的数字即可,我这里三个host分别指定为1,2,3。

sudo mkdir /var/lib/zookeeper
sudo vi /var/lib/zookeeper/myid

每台机器分别指定如下配置

zookeeper配置和启动

  • vi etc/kafka/zookeeper.properties添加如下配置
  tickTime=2000
  dataDir=/var/lib/zookeeper/
  clientPort=2181
  initLimit=5
  syncLimit=2
  server.1=host1:2888:3888
  server.2=host2:2888:3888
  server.3=host3:2888:3888
  autopurge.snapRetainCount=3
  autopurge.purgeInterval=24

启动

 ./bin/zookeeper-server-start etc/kafka/zookeeper.properties

kafka配置和启动

修改配置vi etc/kafka/server.properties

zookeeper.connect=host1:2181,host2:2181,host3:2181

设置broker.id=0,这里我们可以使用broker.id.generation.enable=true自动生成替代

#broker.id=0
broker.id.generation.enable=true
advertised.listeners=PLAINTEXT://本机IP:9092

启动

./bin/kafka-server-start  etc/kafka/server.properties

Schema Registry配置和启动(可选)

配置vi etc/schema-registry/schema-registry.properties

kafkastore.connection.url=host1:2181,host2:2181,host3:2181

启动

./bin/schema-registry-start etc/schema-registry/schema-registry.properties

kafka connect配置和启动

这里我们不使用官方模式的avro序列化方式,所有不启动组件schema-registry

配置

cp etc/schema-registry/connect-avro-distributed.properties  etc/schema-registry/connect-distributed.properties

修改vi etc/schema-registry/connect-distributed.properties

bootstrap.servers=host1:9092,host2:9092,host3:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://192.168.226.184:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://192.168.226.184:8081

启动

./bin/connect-distributed etc/schema-registry/connect-distributed.properties

Control Center配置和启动

配置vi etc/confluent-control-center/control-center-dev.properties

bootstrap.servers=host1:9092,host2:9092,host3:9092
zookeeper.connect=host1:2181,host2:2181:host3:2181
#confluent.controlcenter.schema.registry.url=http://host1:8081,http://host2:8081,http://host3:8081
confluent.controlcenter.connect.cluster=http://192.168.222.54:8083

启动

./bin/control-center-start etc/confluent-control-center/control-center-dev.properties 

到此为止kafka connect集群搭建成功。

安装debezium插件

debezium-connector-mysql的压缩包解压,放到Confluent的解压后的插件目录(share/java)中: 解压命令 : tar -xzf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz

再次启动confluent即可

注意:

使用debezium之前必须先开启mysql得binlog

不同的jar包放在插件目录不同的文件夹下,可以防止jar包冲突;

每台Kafka-connect-worker机器上的Confluent插件目录下,都要有插件文件夹(因为connector提交到一个分布式的worker集群后,不一定在哪台worker上调度运行)

由于需要用Avro格式的kafka消息和分布式的kafka connect,因此需要修改Confluent的schema-registry下的配置:目录 : confluent-5.0.0/etc/schema-registry

需要配置的是connect-avro-distributed.propertiesschema-registry.properties

schema-registry.properties修改配置 :

# 0.0.0.0 代表所有的网卡地址
# 默认8081端口,但8081端口被占用了
listeners = http://0.0.0.0:18081
# zookeeper集群信息
kafkastore.connection.url = ip1:2182, ip2:2182, ip3:2182

connect-avro-distributed.properties修改配置 :

# 其实就是配置了kafka集群
bootstrap.servers = ip1:9092, ip2:9092, ip3:9092

三、编写mysql-connector的配置信息

1)创建一个目录

用于存放配置信息(connector配置信息只要放在一台机器上就行了) : 目录 : /etc 创建文件夹命令 : mkdir kafka-connect-debezium

2)编写test.json

存放connector的配置信息 :

# debezium-mysql-connector配置如下:
{ 
	"name" : "debezium-mysql-source-3308",
	"config":
	{
	     "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
	     "database.hostname" : "mysql的IP地址",
	     "database.port" : "mysql的端口号",
	     "database.user" : "mysql的用户名",
	     "database.password" : "mysql用户对应的密码",
	     "database.server.id" : "184000",
	     "database.server.name" : "mysql服务的逻辑名,例如fullfillment",
	     "database.history.kafka.bootstrap.servers" : "ip1:9092,ip2:9092,ip3:9092",
	     "database.history.kafka.topic" : "dbhistory.fullfillment" ,
	     "include.schema.changes" : "true" ,
	     "mode" : "incrementing",
	     "incrementing.column.name" : "id",
	     "database.history.skip.unparseable.ddl" : "true"
	}
}

详情见debezium-mysql官网 : https://debezium.io/docs/connectors/mysql/

另外confluent提供了restful api可快速创建kafka connect。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/  -d '
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.server.id": "1",
    "database.server.name": "dbserver1",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "include.schema.changes": "true"
  }
}

这里的脚本其实是一行,我为了方便查看展开了json。

也可以通过curl -H "Accept:application/json" localhost:8083/查看已创建成功的connect

四、验证

debezium会读取MySQL binlog产生数据改变事件,将事件发送到kafka队列,最简单的验证办法就是监听这些队列(这些队列按照表名区分)

1)观察数据库的inventory.customers表,监听dbserver1.inventory.customers队列。

2)将customers表id为1004的email字段内容update

3)应用消费者会立马收到一条消费消息。