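Maxwell is an open-source MySQL change data capture (CDC) tool written in Java and released by Zendesk, a US company. It monitors data changes in a MySQL database (insert, update, and delete) in real time and sends the changed data as JSON to streaming platforms such as Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other destinations.
Maxwell official website: https://maxwells-daemon.io/
Maxwell on GitHub: https://github.com/zendesk/maxwell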
mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)
The SQL statement above is converted into the following JSON:
{
"database": "wangtingdb",
"table": "test",
"type": "update",
"ts": 1676444034,
"xid": 2569,
"commit": true,
"data": {
"id": 1,
"name": "wang111"
},
"old": {
"name": "wang"
}
}
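Field descriptions:
database # database that the changed data belongs to
table # table that the changed data belongs to
type # type of change (insert, update, delete)
ts # time of the change as a Unix timestamp in seconds (1676443644 -> 2023-02-15 14:47:24, UTC+8)
xid # transaction ID
commit # transaction commit flag, can be used to reassemble transactions
data # row data after the change (for delete, the deleted row)
old # for update, the values before the change, containing only the changed columns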
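Because ts is a plain Unix timestamp in seconds, it often has to be converted before it is readable. A minimal sketch, assuming GNU date; `ts_to_beijing` is a hypothetical helper name, not part of Maxwell:

```shell
# ts_to_beijing: hypothetical helper, not part of Maxwell.
# Converts a Maxwell "ts" value (Unix epoch seconds) into a UTC+8
# date-time string. Assumes GNU date (-d @SECONDS); adding a fixed
# 8-hour offset and printing in UTC avoids depending on tzdata.
ts_to_beijing() {
  local ts=$1
  date -u -d "@$((ts + 8 * 3600))" '+%Y-%m-%d %H:%M:%S'
}
```

For example, `ts_to_beijing 1676443644` prints `2023-02-15 14:47:24`. On hosts with time zone data installed, `TZ=Asia/Shanghai date -d @1676443644` gives the same result.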
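Common use cases for Maxwell include data synchronization for data-warehouse ETL, cache maintenance, collecting table-level DML metrics, incremental feeds into search engines, data partition migration, and binlog-based rollback when switching databases.
Maxwell works on a simple principle: it poses as a MySQL slave and, following the MySQL master-slave replication protocol, synchronizes data from the master.
It reads the MySQL binary log (binlog) in real time, extracts the changed data, and sends it as JSON to Kafka or another streaming platform (Kafka is not the only output).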
MySQL binary log (binlog) files on disk:
[wangting@hdt-dmcp-ops05 mysql]$ pwd
/var/lib/mysql
[wangting@hdt-dmcp-ops05 mysql]$ sudo ls -l | grep mysql-bin
-rw-r----- 1 mysql mysql 3040 Feb 15 15:05 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 Feb 15 13:59 mysql-bin.index
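The mysql-bin.index file lists the binlog files in the order they were created, so its last entry is the file MySQL is currently writing. A small sketch for reading it; `current_binlog` is a hypothetical helper name:

```shell
# current_binlog: hypothetical helper, not part of MySQL or Maxwell.
# Reads a binlog index file (such as mysql-bin.index) on stdin and prints
# the name of the last listed file, stripping a leading directory such
# as "./". Blank lines are ignored.
current_binlog() {
  grep -v '^[[:space:]]*$' | tail -n 1 | sed 's|^.*/||'
}
```

Usage: `sudo cat /var/lib/mysql/mysql-bin.index | current_binlog`. `sudo` is needed because, as the listing shows, the index is readable only by the mysql user and group. Inside MySQL, `show master status;` reports the current file together with its position.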
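MySQL master-slave replication
MySQL master-slave replication builds a database environment that is identical to the master database; this copy is called the slave database.
Use cases of master-slave replication:
How MySQL master-slave replication works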
[Note]:
Maxwell 1.30.0 and later no longer support JDK 1.8; the last release that supports JDK 1.8 is 1.29.2.
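Before downloading, it helps to check which Java the host runs. The sketch below only encodes the rule from the note (Java 8 means Maxwell 1.29.2); the helper names are hypothetical, and for anything newer than Java 8 you should still confirm the exact JDK requirement in the Maxwell release notes:

```shell
# java_major_version: hypothetical helper. Reads the first line of
# `java -version` output on stdin and prints the Java major version,
# handling both "1.8.0_212" (legacy scheme) and "11.0.18" (newer scheme).
java_major_version() {
  awk -F'"' 'NR == 1 { split($2, v, "."); print (v[1] == "1" ? v[2] : v[1]) }'
}

# maxwell_release_for_java: hypothetical helper that encodes only the
# note above: major version 8 (JDK 1.8) or lower stays on Maxwell 1.29.2.
maxwell_release_for_java() {
  if [ "$1" -le 8 ]; then
    echo "1.29.2"
  else
    echo "1.30.0 or later"
  fi
}
```

Usage: `maxwell_release_for_java "$(java -version 2>&1 | java_major_version)"`. The `2>&1` is needed because `java -version` writes to stderr.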
# Download the release package
[wangting@hdt-dmcp-ops05 software]$ wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
# Extract it and rename the directory
[wangting@hdt-dmcp-ops05 software]$ tar -xf maxwell-1.29.2.tar.gz -C /opt/module/
[wangting@hdt-dmcp-ops05 software]$ mv /opt/module/maxwell-1.29.2 /opt/module/maxwell
# Directory layout
[wangting@hdt-dmcp-ops05 maxwell]$ ll
total 84
drwxrwxr-x 2 wangting wangting 4096 Feb 15 13:54 bin
-rw-r--r-- 1 wangting wangting 25133 Jan 25 2021 config.md
-rw-r--r-- 1 wangting wangting 11970 Jan 25 2021 config.properties.example
-rw-r--r-- 1 wangting wangting 10259 Apr 23 2020 kinesis-producer-library.properties.example
drwxr-xr-x 3 wangting wangting 12288 Jan 27 2021 lib
-rw-r--r-- 1 wangting wangting 548 Apr 23 2020 LICENSE
-rw-r--r-- 1 wangting wangting 470 Jan 25 2021 log4j2.xml
-rw-r--r-- 1 wangting wangting 3328 Jan 27 2021 quickstart.md
-rw-r--r-- 1 wangting wangting 1429 Jan 27 2021 README.md
Enable the binlog in the MySQL configuration file:
[wangting@hdt-dmcp-ops05 maxwell]$ sudo vim /etc/my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=wangtingdb
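server-id=1
- ID of this MySQL server (must be unique among the servers in the replication topology)
log-bin=mysql-bin
- Enables the binlog; the value is used as the prefix of the binlog file names
binlog_format=row
- Binlog format; Maxwell requires row
binlog-do-db=wangtingdb
- Database whose changes are written to the binlog; change it to match your environment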
Restart MySQL so that the new configuration takes effect:
[wangting@hdt-dmcp-ops05 maxwell]$ sudo systemctl restart mysqld
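After the restart, the server itself can confirm that the binlog is on and in row format. A sketch that checks the output of a `show variables` query; `binlog_ready_for_maxwell` is a hypothetical helper, and it assumes the tab-separated output the mysql client prints when its output is piped:

```shell
# binlog_ready_for_maxwell: hypothetical helper. Reads "name<TAB>value"
# rows on stdin, as printed by
#   mysql -N -e "show variables where Variable_name in ('log_bin','binlog_format')"
# and exits with status 0 only if log_bin is ON and binlog_format is ROW.
binlog_ready_for_maxwell() {
  awk -F'\t' '
    $1 == "log_bin"       && toupper($2) == "ON"  { bin = 1 }
    $1 == "binlog_format" && toupper($2) == "ROW" { row = 1 }
    END { exit !(bin && row) }
  '
}
```

Usage: `mysql -uroot -p -N -e "show variables where Variable_name in ('log_bin','binlog_format')" | binlog_ready_for_maxwell && echo "binlog is ready for Maxwell"`.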
Create the database and the user that Maxwell needs:
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -h172.20.12.179 -p
# Create the maxwell database, where Maxwell stores its metadata
mysql> create database maxwell character set utf8mb4;
Query OK, 1 row affected (0.00 sec)
# Create the Maxwell user
mysql> create user 'maxwell'@'%' identified by 'maxwell';
Query OK, 0 rows affected (0.00 sec)
# Grant the required privileges
mysql> grant all on maxwell.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> grant select, replication client, replication slave on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.01 sec)
# Reload the privilege tables
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql> quit;
Bye
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -umaxwell -pmaxwell -e "show databases;"
+--------------------+
| Database |
+--------------------+
| information_schema |
| wangtingdb |
| maxwell |
| mysql |
| performance_schema |
| sys |
+--------------------+
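Listing the databases only proves that the login works. To also confirm the replication privileges granted above, one option is to inspect `show grants`. A sketch; `has_replication_grants` is a hypothetical helper, and it only recognizes the privileges when they are named explicitly (a `GRANT ALL PRIVILEGES ON *.*` line would not be recognized):

```shell
# has_replication_grants: hypothetical helper. Reads SHOW GRANTS output on
# stdin and succeeds only if the grants on *.* name SELECT,
# REPLICATION SLAVE and REPLICATION CLIENT explicitly.
has_replication_grants() {
  local grants priv
  grants=$(grep 'ON \*\.\*')
  for priv in 'SELECT' 'REPLICATION SLAVE' 'REPLICATION CLIENT'; do
    printf '%s\n' "$grants" | grep -q "$priv" || return 1
  done
}
```

Usage: `mysql -umaxwell -pmaxwell -N -e "show grants" | has_replication_grants && echo "grants OK"`.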
Edit the Maxwell configuration file:
[wangting@hdt-dmcp-ops05 maxwell]$ mv config.properties.example config.properties
[wangting@hdt-dmcp-ops05 maxwell]$ vim config.properties
# Destination for Maxwell's output; supported values: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# Target Kafka cluster
kafka.bootstrap.servers=hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
# Target Kafka topic: either static, e.g. maxwell, or dynamic, e.g. %{database}_%{table}
kafka_topic=maxwell
# MySQL connection settings
host=hdt-dmcp-ops05
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
Note: if Maxwell sends its data to a Kafka cluster, start the Kafka cluster first.
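With a dynamic kafka_topic, each table's changes go to their own topic. To preview the topic name a given table would map to, here is a small sketch. `resolve_topic` is a hypothetical helper that mimics only the `%{database}` and `%{table}` placeholders from the comment above; it is not Maxwell's own code:

```shell
# resolve_topic: hypothetical helper. Expands the %{database} and %{table}
# placeholders of a kafka_topic template for one database/table pair.
# Arguments: template, database name, table name.
resolve_topic() {
  printf '%s\n' "$1" | sed -e "s/%{database}/$2/g" -e "s/%{table}/$3/g"
}
```

For example, `resolve_topic '%{database}_%{table}' wangtingdb test` prints `wangtingdb_test`. Because a dynamic topic creates one topic per table, the Kafka brokers must either allow automatic topic creation (`auto.create.topics.enable`) or have those topics created in advance.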
# Start Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
# Stop Maxwell (match the main class so that unrelated processes whose command line contains "maxwell" are not killed)
[wangting@hdt-dmcp-ops05 maxwell]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
Create a start/stop script to make this more convenient:
[wangting@hdt-dmcp-ops05 bin]$ vim mymaxwell
#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell
# Exit status = number of running Maxwell processes (0 means not running)
status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}
start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "Starting Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell is already running"
    fi
}
stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "Stopping Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell is not running"
    fi
}
case $1 in
    start )
        start_maxwell
        ;;
    stop )
        stop_maxwell
        ;;
    restart )
        stop_maxwell
        sleep 1
        start_maxwell
        ;;
    * )
        echo "Usage: $0 {start|stop|restart}"
        exit 1
        ;;
esac
Make the script executable and try it:
[wangting@hdt-dmcp-ops05 bin]$ chmod +x mymaxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell stop
Stopping Maxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell start
Starting Maxwell
Create a test table in MySQL and insert two rows:
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -p123456
mysql> use wangtingdb;
Database changed
mysql> create table test(id int,name varchar(20));
Query OK, 0 rows affected (0.02 sec)
mysql> insert into test value(1,"wang");
Query OK, 1 row affected (0.01 sec)
mysql> insert into test value(2,"wang2");
Query OK, 1 row affected (0.01 sec)
mysql>
Check the Maxwell log:
[wangting@hdt-dmcp-ops05 maxwell]$ tail -f logs/MaxwellDaemon.out
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
14:39:07,612 INFO AppInfoParser - Kafka version : 1.0.0
14:39:07,612 INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
14:39:07,626 INFO Maxwell - Maxwell v1.29.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0]
14:39:07,756 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0])
14:39:07,809 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:977
14:39:07,819 INFO BinaryLogClient - Connected to hdt-dmcp-ops05:3306 at mysql-bin.000001/977 (sid:6379, cid:36)
14:39:07,819 INFO BinlogConnectorReplicator - Binlog connected.
14:44:48,333 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000001:1042], lastHeartbeat=0] after applying "create table test(id int,name varchar(20))" to wangtingdb, new schema id is 2
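The log records where in the binlog Maxwell starts reading (mysql-bin.000001:977 above). A sketch for extracting that position from the log file; `binlog_start_position` is a hypothetical helper, and its pattern assumes the exact message text shown in this log:

```shell
# binlog_start_position: hypothetical helper. Scans Maxwell log lines on
# stdin and prints the "file:offset" from the most recent
# "Setting initial binlog pos to:" message (nothing if there is none).
binlog_start_position() {
  sed -n 's/.*Setting initial binlog pos to: *\([^ ]*\).*/\1/p' | tail -n 1
}
```

Usage: `binlog_start_position < /opt/module/maxwell/logs/MaxwellDaemon.out` prints `mysql-bin.000001:977` for the log above. Maxwell also persists its current position in the positions table of the maxwell database.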
Consume the maxwell topic from Kafka:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
The same messages, pretty-printed for readability:
{
"database": "wangtingdb",
"table": "test",
"type": "insert",
"ts": 1676443636,
"xid": 1687,
"commit": true,
"data": {
"id": 1,
"name": "wang"
}
}
{
"database": "wangtingdb",
"table": "test",
"type": "insert",
"ts": 1676443644,
"xid": 1709,
"commit": true,
"data": {
"id": 2,
"name": "wang2"
}
}
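With a single topic shared by every table, it is often useful to narrow the consumer output to one table and one change type. A sketch; `filter_changes` is a hypothetical helper that relies on the compact `"key":"value"` layout seen in the raw messages above. A column value containing the same text would also match, so a real JSON parser such as jq is more robust when it is available:

```shell
# filter_changes: hypothetical helper. Reads one Maxwell JSON message per
# line on stdin (for example, kafka-console-consumer.sh output) and keeps
# only the messages for table $1 with change type $2.
filter_changes() {
  grep -F "\"table\":\"$1\"" | grep -F "\"type\":\"$2\""
}
```

Usage: `kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092 | filter_changes test update`.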
Update a row in MySQL:
mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Check the messages in Kafka:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
# New message produced by the update:
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# Pretty-printed:
{
"database": "wangtingdb",
"table": "test",
"type": "update",
"ts": 1676444034,
"xid": 2569,
"commit": true,
"data": {
"id": 1,
"name": "wang111"
},
"old": {
"name": "wang"
}
}
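The old object of an update message lists only the columns the UPDATE actually changed, so its keys are a quick way to see what changed. A sketch; `changed_columns` is a hypothetical helper that assumes compact JSON and a flat old object whose values contain no `}` or `,`. Use a real JSON parser (for example jq) for anything else:

```shell
# changed_columns: hypothetical helper. Prints, one per line, the column
# names found in the "old" object of a Maxwell update message on stdin.
changed_columns() {
  grep -o '"old":{[^}]*}' \
    | sed -e 's/^"old":{//' -e 's/}$//' \
    | tr ',' '\n' \
    | sed 's/^"\([^"]*\)":.*/\1/'
}
```

For the update message above, `changed_columns` prints `name`.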
Delete a row in MySQL:
mysql> delete from test where id = 2;
Query OK, 1 row affected (0.00 sec)
Check the messages in Kafka:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# New message produced by the delete:
{"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}
# Pretty-printed:
{
"database": "wangtingdb",
"table": "test",
"type": "delete",
"ts": 1676444127,
"xid": 2777,
"commit": true,
"data": {
"id": 2,
"name": "wang2"
}
}
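Using the maxwell-bootstrap command
Maxwell provides the maxwell-bootstrap command for a full synchronization of historical data (a running Maxwell instance is still required).
First, add some rows to the test table: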
mysql> select * from test;
+------+---------+
| id | name |
+------+---------+
| 1 | wang111 |
+------+---------+
1 row in set (0.00 sec)
mysql> insert into test value(2,"wang222");
Query OK, 1 row affected (0.00 sec)
mysql> insert into test value(3,"wang333");
Query OK, 1 row affected (0.00 sec)
mysql> insert into test value(4,"wang444");
Query OK, 1 row affected (0.01 sec)
mysql> select * from test;
+------+---------+
| id | name |
+------+---------+
| 1 | wang111 |
| 2 | wang222 |
| 3 | wang333 |
| 4 | wang444 |
+------+---------+
4 rows in set (0.00 sec)
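Start a Kafka consumer without --from-beginning, so that only new messages are shown:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
# No messages yet; the consumer is waiting for new ones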
On the Maxwell host, run the bootstrap:
# Check that the Maxwell service is running
[wangting@hdt-dmcp-ops05 bin]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep
wangting 22431 1 0 15:06 pts/2 00:00:08 /opt/module/java/bin/java -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp :/opt/module/maxwell/bin/../lib/*:/opt/module/maxwell/bin/../lib/kafka-clients/kafka-clients-1.0.0.jar com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/config.properties --daemon
# Bootstrap the wangtingdb.test table
[wangting@hdt-dmcp-ops05 bin]$ cd /opt/module/maxwell/
[wangting@hdt-dmcp-ops05 maxwell]$ bin/maxwell-bootstrap --database wangtingdb --table test --config config.properties
connecting to jdbc:mysql://hdt-dmcp-ops05:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
Messages received by the Kafka consumer:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}
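[Note]:
- Although the table has only 4 rows, 6 messages were produced.
- The first message (type bootstrap-start) and the last one (type bootstrap-complete) mark the start and end of the bootstrap and carry no data; only the bootstrap-insert messages in between contain row data.
- All records emitted by one bootstrap run share the same ts, the bootstrap start time: 1676444947 -> 2023-02-15 15:09:07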
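Because the start and complete markers carry no data, a quick completeness check compares the number of bootstrap-insert messages with the table's row count. A sketch; `bootstrap_row_count` is a hypothetical helper that, like the output above, assumes compact JSON with one message per line:

```shell
# bootstrap_row_count: hypothetical helper. Counts the data-bearing
# messages (type "bootstrap-insert") in a stream of Maxwell messages on
# stdin; bootstrap-start and bootstrap-complete markers are not counted.
bootstrap_row_count() {
  grep -c '"type":"bootstrap-insert"'
}
```

For the six messages above it prints 4, which matches `select count(*) from test;` in MySQL.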