本文最后更新于 2025-03-24,文章超过7天没更新,应该是已完结了~

1 配置Canal+MQ数据同步环境

1.1 配置Mysql主从同步

根据Canal的工作原理,首先需要开启MySQL主从同步。

1.在MySQL中需要创建一个用户,并授权

进入mysql容器:

docker exec -it mysql /bin/bash

-- 使用命令登录:

mysql -u root -p

-- 创建用户 用户名:canal 密码:canal

create user 'canal'@'%' identified WITH mysql_native_password by 'canal';

-- 授权 .表示所有库

GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • SELECT: 允许用户查询(读取)数据库中的数据。

  • REPLICATION SLAVE: 允许用户作为 MySQL 复制从库,用于同步主库的数据。

  • REPLICATION CLIENT: 允许用户连接到主库并获取关于主库状态的信息。

在MySQL配置文件my.cnf设置如下信息,开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式

ROW 模式表示以行为单位记录每个被修改的行的变更

修改如下:

vi /usr/mysql/conf/my.cnf

[mysqld]
#打开binlog
log-bin=mysql-bin
#选择ROW(行)模式
binlog-format=ROW
#配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

expire_logs_days=3
max_binlog_size = 100m
max_binlog_cache_size = 512m

说明:在学习阶段为了保证足够的服务器存储空间,binlog日志最大保存100m,mysql会定时清理binlog

2、重启MySQL,查看配置信息

  • 使用命令查看是否打开binlog模式:

SHOW VARIABLES LIKE 'log_bin';

ON表示开启binlog模式。

show variables like 'binlog_format';

binlog_format 的值为 row 时,表示 MySQL 服务器当前配置为使用行级别的二进制日志记录,这对于数据库复制和数据同步来说更为安全,因为它记录了对数据行的确切更改。

  • 查看binlog日志文件列表:

SHOW BINARY LOGS;

  • 查看当前正在写入的binlog文件:

SHOW MASTER STATUS;

1.2 安装Canal(使用下发虚拟机无需安装

获取canal镜像

docker pull canal/canal-server:latest

创建/data/soft/canal目录:

mkdir -p /data/soft/canal

在/data/soft/canal下创建 canal.properties,内容如下,注意修改mq的配置信息:

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# TCP监听的IP地址,默认绑定所有可用IP。可以根据需求指定。

# register ip to zookeeper
canal.register.ip =
# 注册到Zookeeper的IP地址。如果为空,使用canal.ip的值。

canal.port = 11111
# Canal TCP服务监听的端口,默认是11111。

canal.metrics.pull.port = 11112
# 用于Metrics监控的端口。 启用 Metrics 端口(默认 11112),通过工具监控 Canal 状态。

# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# Canal实例的用户和密码(可选配置),用于对客户端进行身份验证。

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
# canal.admin.port = 11110
# 用于Admin管理页面的端口号。

canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# Canal Admin的用户名和密码。

# admin auto register
#canal.admin.register.auto = true
# 是否自动注册到Canal Admin。

#canal.admin.register.cluster =
#canal.admin.register.name =
# 注册到Admin的集群和实例名称(可选配置)。

canal.zkServers =
# Zookeeper的地址,用于存储位置信息和HA(如果启用)。

canal.zookeeper.flush.period = 1000
# Canal向Zookeeper刷新信息的周期,单位为毫秒。

canal.withoutNetty = false
# 是否禁用Netty网络模块(默认false)。

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
# Canal服务器的运行模式,可以是tcp、kafka、rocketMQ或rabbitMQ。

# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
# 数据存储目录(例如binlog位置信息等)。

canal.file.flush.period = 1000
# 向文件中刷新的时间间隔,单位为毫秒。

## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
# 内存中RingBuffer的大小,必须是2的幂,控制缓冲区的最大记录数。

## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
# 每条记录的内存单位大小,默认1KB。

## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
# 控制RingBuffer的读取模式,可以是MEMSIZE或ITEMSIZE。

canal.instance.memory.rawEntry = true
# 是否使用原始格式存储binlog条目(true表示存储RowData格式)。

## detecing config
canal.instance.detecting.enable = false
# 是否启用连接检测。

#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
# 检测连接时执行的SQL语句。

canal.instance.detecting.interval.time = 3
# 检测连接的时间间隔,单位为秒。

canal.instance.detecting.retry.threshold = 3
# 检测失败的重试次数。

canal.instance.detecting.heartbeatHaEnable = false
# 是否启用心跳检测用于高可用。

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# 支持的最大事务大小,超过此大小的事务将被分割为多个。

# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# MySQL主从切换后的回退时间间隔,单位为秒。

# network config
canal.instance.network.receiveBufferSize = 16384
# 网络接收缓冲区大小,单位为字节。

canal.instance.network.sendBufferSize = 16384
# 网络发送缓冲区大小,单位为字节。

canal.instance.network.soTimeout = 30
# 网络超时时间,单位为秒。

# binlog filter config
canal.instance.filter.druid.ddl = true
# 是否过滤掉Druid解析的DDL语句。

canal.instance.filter.query.dcl = false
# 是否过滤DCL语句(如GRANT、REVOKE等)。

# 这个配置一定要修改
canal.instance.filter.query.dml = true
# 是否允许同步DML语句(INSERT、UPDATE、DELETE等)。

canal.instance.filter.query.ddl = false
# 是否允许同步DDL语句。

canal.instance.filter.table.error = false
# 是否忽略表解析错误。

canal.instance.filter.rows = false
# 是否过滤掉行级别数据。

canal.instance.filter.transaction.entry = false
# 是否过滤事务边界数据。

canal.instance.filter.dml.insert = false
# 是否过滤INSERT操作。

canal.instance.filter.dml.update = false
# 是否过滤UPDATE操作。

canal.instance.filter.dml.delete = false
# 是否过滤DELETE操作。

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
# 支持的binlog格式(ROW、STATEMENT、MIXED)。

canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# 支持的binlog row image类型。

# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# 是否隔离DDL操作。

# parallel parser config
canal.instance.parser.parallel = true
# 是否启用并行解析。

## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
# 并行解析的线程数,建议不超过可用处理器的数量。

## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# 并行解析的缓冲区大小,必须是2的幂。

# table meta tsdb info
canal.instance.tsdb.enable = true
# 是否启用表结构元数据存储。

canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
# 表结构元数据的存储路径。

canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
# 表结构元数据的数据库连接URL。

canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# 表结构元数据的数据库用户名和密码。

# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# 表结构快照的转储间隔,默认24小时。

# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# 表结构快照的过期时间,默认15天(360小时)。

#################################################
#########               destinations            #############
#################################################
canal.destinations = xzb-canal
# 目标实例的名称,多个实例用逗号分隔。

# conf root dir
canal.conf.dir = ../conf
# 配置文件的根目录。

# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
# 是否自动扫描实例目录并启停实例。

canal.auto.scan.interval = 5
# 自动扫描的时间间隔,单位为秒。

# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
# 是否跳过未找到的binlog位置并重置到最新位置(生产环境建议保持false)。

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
# 表结构元数据的Spring XML配置文件路径(H2数据库)。

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# 表结构元数据的Spring XML配置文件路径(MySQL数据库)。

canal.instance.global.mode = spring
# 全局配置模式,默认为Spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true #如果 canal.mq.flatMessage 设置为 true,消费端的消息解析必须以 JSON 格式进行。如果消费端不支持 JSON 格式解析,可以设置为 false。
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host = 192.168.101.68
rabbitmq.virtual.host = /xzb
rabbitmq.exchange = exchange.canal-jzo2o
rabbitmq.username = xzb
rabbitmq.password = xzb
rabbitmq.deliveryMode = 2

创建instance.properties,内容如下:

canal.instance.master.journal.name 用于指定主库正在写入的 binlog 文件的名称。

如果不配置 canal.instance.master.journal.name,Canal 会尝试自动检测 MySQL 主库的 binlog 文件,并从最新位置开始进行复制。

#################################################
## MySQL serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1000  # Canal实例的唯一标识符,避免多个实例间冲突。千万不能和mysql配置的slaveId冲突

# enable gtid use true/false
canal.instance.gtidon=false  # 是否启用GTID模式,同步基于GTID的Binlog。

# position info
canal.instance.master.address=192.168.101.68:3306  # MySQL主库的地址和端口。
canal.instance.master.journal.name=mysql-bin.000001  # 从指定的Binlog文件开始读取。
canal.instance.master.position=0  # Binlog读取的初始位置。
canal.instance.master.timestamp=  # 使用时间戳定位读取位置,留空则忽略。
canal.instance.master.gtid=  # 使用GTID定位读取位置,留空则忽略。

# rds oss binlog
canal.instance.rds.accesskey=  # RDS服务的访问密钥(如果使用RDS OSS Binlog)。
canal.instance.rds.secretkey=  # RDS服务的访问密钥(如果使用RDS OSS Binlog)。
canal.instance.rds.instanceId=  # RDS实例ID。

# table meta tsdb info
canal.instance.tsdb.enable=true  # 是否启用表结构元数据的时间序列数据库(TSDB)。
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb  # TSDB数据库的连接URL。
#canal.instance.tsdb.dbUsername=canal  # TSDB数据库的用户名。
#canal.instance.tsdb.dbPassword=canal  # TSDB数据库的密码。

#canal.instance.standby.address =  # 备用MySQL主库地址。
#canal.instance.standby.journal.name =  # 备用MySQL主库的Binlog文件名。
#canal.instance.standby.position =  # 备用MySQL主库的Binlog位置。
#canal.instance.standby.timestamp =  # 备用MySQL主库的时间戳位置。
#canal.instance.standby.gtid =  # 备用MySQL主库的GTID位置。

# username/password
canal.instance.dbUsername=canal  # Canal连接MySQL的用户名。
canal.instance.dbPassword=canal  # Canal连接MySQL的密码。
canal.instance.connectionCharset=UTF-8  # Canal连接MySQL使用的字符集。
# enable druid Decrypt database password
canal.instance.enableDruid=false  # 是否启用Druid解密数据库密码。
#canal.instance.pwdPublicKey=  # 如果启用Druid解密,这里填写加密用的公钥。

# table regex
# canal.instance.filter.regex=test01\\..*,test02\\..*  # 需要同步的表的正则匹配规则。
#canal.instance.filter.regex=test01\\..*,test02\\.t1
#canal.instance.filter.regex=jzo2o-foundations\\.serve_sync,jzo2o-orders-0\\.orders_seize,jzo2o-orders-0\\.orders_dispatch,jzo2o-orders-0\\.serve_provider_sync,jzo2o-customer\\.serve_provider_sync
canal.instance.filter.regex=jzo2o-orders-1\\.orders_dispatch,jzo2o-orders-1\\.orders_seize,jzo2o-foundations\\.serve_sync,jzo2o-customer\\.serve_provider_sync,jzo2o-orders-1\\.serve_provider_sync,jzo2o-orders-1\\.history_orders_sync,jzo2o-orders-1\\.history_orders_serve_sync,jzo2o-market\\.activity  # 需要同步的表的精确匹配规则,多个表用逗号分隔。
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*  # 排除同步的表的正则规则。
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch  # 只同步指定的字段。
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch  # 排除同步的字段。

# mq config
#canal.mq.topic=topic_test01  # 指定MQ的Topic名称。
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*  # 动态生成Topic,按库或表的正则分组。
#canal.mq.dynamicTopic=topic_test01:test01\\..*,topic_test02:test02\\..*
#canal.mq.dynamicTopic=canal-mq-jzo2o-orders-dispatch:jzo2o-orders-0\\.orders_dispatch,canal-mq-jzo2o-orders-seize:jzo2o-orders-0\\.orders_seize,canal-mq-jzo2o-foundations:jzo2o-foundations\\.serve_sync,canal-mq-jzo2o-customer-provider:jzo2o-customer\\.serve_provider_sync,canal-mq-jzo2o-orders-provider:jzo2o-orders-0\\.serve_provider_sync
canal.mq.dynamicTopic=canal-mq-jzo2o-orders-dispatch:jzo2o-orders-1\\.orders_dispatch,canal-mq-jzo2o-orders-seize:jzo2o-orders-1\\.orders_seize,canal-mq-jzo2o-foundations:jzo2o-foundations\\.serve_sync,canal-mq-jzo2o-customer-provider:jzo2o-customer\\.serve_provider_sync,canal-mq-jzo2o-orders-provider:jzo2o-orders-1\\.serve_provider_sync,canal-mq-jzo2o-orders-serve-history:jzo2o-orders-1\\.history_orders_serve_sync,canal-mq-jzo2o-orders-history:jzo2o-orders-1\\.history_orders_sync,canal-mq-jzo2o-market-resource:jzo2o-market\\.activity  # 根据不同的表动态生成对应的MQ Topic。
canal.mq.partition=0  # MQ的分区ID,默认0。
# hash partition config
#canal.mq.partitionsNum=3  # MQ的分区数量。
#canal.mq.partitionHash=test.table:id^name,.*\\..*  # 使用字段生成哈希值确定分区。
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6  # 动态设置每个Topic的分区数量。
#################################################
  • canal.instance.filter.regex 需要监听的mysql库和表

  • 全库: .*\\..*

  • 指定库下的所有表: canal\\..*

  • 指定库下的指定表: canal\\.canal,test\\.test

  • 库名\\.表名:转义需要用\\,使用逗号分隔多个库

这里配置监听 jzo2o-foundations数据库下serve_sync表,如下:

canal.instance.filter.regex=jzo2o-foundations\\.serve_sync

3、在Canal配置MQ的topic

这里使用动态topic,格式为:topic:schema.table,topic:schema.table,topic:schema.table

配置如下:

canal.mq.dynamicTopic=canal-mq-jzo2o-foundations:jzo2o-foundations\\.serve_sync

上边的配置表示:对jzo2o-foundations数据库的serve_sync表的修改消息发到topic为canal-mq-jzo2o-foundations关联的队列

TSDB(Table Schema Database) 是 Canal 中用于管理和存储表结构元数据(table schema metadata)的组件。它的主要作用是保存和维护目标 MySQL 数据库中表结构的历史快照和当前状态,以便 Canal 在解析 Binlog 时可以正确地映射字段。

为什么需要 TSDB?

  1. Binlog 的特性
    MySQL 的 Binlog 文件只记录了数据的变更操作(如 INSERTUPDATEDELETE),但不包含表结构的信息。
    例如:
    UPDATE users SET name='Alice' WHERE id=1;

在 Binlog 中只会存储字段的位置(如 field[1]=Alice),并不会包含字段的名称 name 或数据类型。

  1. 表结构可能发生变化
    当表结构发生变更(如添加字段、修改字段类型),Canal 需要一个机制来匹配当前的表结构,否则会导致数据解析错误。

  2. TSDB 的作用
    TSDB 用来存储表结构的历史快照,确保 Canal 在解析不同时间点的 Binlog 时,能够使用正确的表结构信息。

TSDB 的配置与作用

  1. canal.instance.tsdb.enable=true

  • 启用 TSDB 功能,Canal 将保存表结构快照。

  1. canal.instance.tsdb.url

  • TSDB 的存储位置(默认使用 H2 内存数据库)。可以设置为外部数据库(如 MySQL)。

  1. canal.instance.tsdb.snapshot.interval=24

  • 每隔多少小时生成一次表结构快照(单位:小时)。

  1. canal.instance.tsdb.snapshot.expire=360

  • 快照保留的时间,超过这个时间的快照会被清理(单位:小时)。

总结
TSDB 是 Canal 用来管理表结构的元数据存储机制。它的作用是保证 Canal 在解析 Binlog 时使用正确的表结构。如果不启用 TSDB,Canal 可能会在表结构变更后解析失败或数据错误。

创建日志目录:

mkdir -p /data/soft/canal/logs /data/soft/canal/conf

启动容器:

docker run --name canal -p 11111:11111 -d \
    -v /data/soft/canal/instance.properties:/home/admin/canal-server/conf/xzb-canal/instance.properties \
    -v /data/soft/canal/canal.properties:/home/admin/canal-server/conf/canal.properties \
    -v /data/soft/canal/logs:/home/admin/canal-server/logs/xzb-canal \
    -v /data/soft/canal/conf:/home/admin/canal-server/conf/xzb-canal \
    canal/canal-server:latest

1.3 安装RabbitMQ(使用下发虚拟机无需安装

  1. 拉取镜像(如果未拉取过镜像)

docker pull registry.cn-hangzhou.aliyuncs.com/itheima/rabbitmq:3.9.17-management-delayed

此镜像包含了 RabbitMQ 的管理插件以及对延迟队列的支持。

  1. 创建文件夹和文件

mkdir -p /data/soft/rabbitmq/config /data/soft/rabbitmq/data /data/soft/rabbitmq/plugins

  1. 启动容器

docker run \
--privileged \
 -e RABBITMQ_DEFAULT_USER=czri \
 -e RABBITMQ_DEFAULT_PASS=czri1234 \
 --restart=always \
 --name rabbitmq \
 --hostname rabbitmq \
 -v /data/soft/rabbitmq/config:/etc/rabbitmq \
 -v /data/soft/rabbitmq/data:/var/lib/rabbitmq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 registry.cn-hangzhou.aliyuncs.com/itheima/rabbitmq:3.9.17-management-delayed
  1. 启动rabbitmq管理端

进入rabbitmq容器:docker exec -it rabbitmq /bin/bash

运行下边的命令:

# 启动rabbitmq管理端
rabbitmq-plugins enable rabbitmq_management
# 启动延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

5、进入rabbitmq管理界面

http://192.168.101.68:15672/

账号:czri

密码:czri1234

6、创建虚拟主机 /xzb

7、创建账号和密码

xzb/xzb

设置权限可以访问/ /xzb

设置成功:

1.4 配置Canal+RabbitMQ

下边通过配置Canal与RabbitMQ,保证Canal收到binlog消息将数据发送至MQ。

最终我们要实现的是:

修改jzo2o-foundations数据库下的serve_sync表的数据后通过canal将修改信息发送到MQ。

1、在Canal中配置RabbitMQ的连接信息

修改/data/soft/canal/canal.properties

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

##################################################
#########                     RabbitMQ             #############
##################################################
rabbitmq.host = 192.168.101.68
rabbitmq.virtual.host = /xzb
rabbitmq.exchange = exchange.canal-jzo2o
rabbitmq.username = xzb
rabbitmq.password = xzb
rabbitmq.deliveryMode = 2

本项目用于数据同步的MQ交换机:exchange.canal-jzo2o

虚拟主机地址:/xzb

账号和密码:xzb/xzb

rabbitmq.deliveryMode = 2 设置消息持久化

2、设置需要监听的mysql库和表

修改/data/soft/canal/instance.properties

  • canal.instance.filter.regex 需要监听的mysql库和表

  • 全库: .*\\..*

  • 指定库下的所有表: canal\\..*

  • 指定库下的指定表: canal\\.canal,test\\.test

  • 库名\\.表名:转义需要用\\,使用逗号分隔多个库

这里配置监听 jzo2o-foundations数据库下serve_sync表,如下:

canal.instance.filter.regex=jzo2o-foundations\\.serve_sync

3、在Canal配置MQ的topic

这里使用动态topic,格式为:topic:schema.table,topic:schema.table,topic:schema.table

配置如下:

canal.mq.dynamicTopic=canal-mq-jzo2o-foundations:jzo2o-foundations\\.serve_sync

上边的配置表示:对jzo2o-foundations数据库的serve_sync表的修改消息发到topic为canal-mq-jzo2o-foundations关联的队列

4、进入rabbitMQ配置交换机和队列

创建exchange.canal-jzo2o交换机:

创建队列:canal-mq-jzo2o-foundations

type 字段是指队列的类型,不同类型的队列有不同的消息存储方式和行为。RabbitMQ 中常见的队列类型如下:

  1. classic
    描述:classic 队列是 RabbitMQ 的默认队列类型,它是最常见的一种类型,消息会存储在内存和磁盘上,保证消息的可靠性。
    特点:
    适用于大多数应用场景,提供了可靠的消息存储和高效的性能。
    支持持久化,消息可以在队列消费后保留,也可以在重启后恢复。
    支持优先级队列。

  2. quorum
    描述:quorum 队列是一种基于 Raft 协议的高可用队列,旨在提供更高的可靠性,特别是当 RabbitMQ 节点发生故障时,quorum 队列能够更好地保证消息不会丢失。
    特点:
    强一致性:使用 Raft 协议确保多节点间的强一致性,适合高可靠性场景。
    高可用性:消息会被多个节点存储,并且支持节点间的数据复制。
    恢复能力:即使某个节点失效,也能从其他节点恢复数据。
    没有消息优先级:quorum 队列不支持消息优先级。
    性能问题:由于强一致性,quorum 队列的性能比 classic 队列稍低,尤其是在高并发场景下。

  3. lazy
    描述:lazy 队列会将消息存储在磁盘上,减少内存的占用,适用于存储大量消息的场景。消息在队列中存储时不会立即加载到内存中,只有在消费时才会加载。
    特点:
    低内存使用:消息优先存储在磁盘上,内存占用非常低。
    性能较低:由于需要频繁从磁盘加载消息,因此相较于 classic 队列,lazy 队列在性能上可能有所下降。
    适用场景:适合于存储大量消息且对内存占用有严格要求的场景,常用于数据存储密集型的应用。

  4. stream
    描述:stream 队列是 RabbitMQ 在较新的版本中引入的队列类型,专为流式数据处理设计。它支持高效的顺序消息存储,并能处理大量数据流。
    特点:
    高效流式存储:专为流式数据处理设计,适用于高吞吐量的消息队列应用。
    与 classic 队列的区别:stream 队列可以通过读取数据流的方式高效地处理消息,而不是依赖传统的队列机制。
    内存和磁盘结合存储:消息会根据需要存储在内存或磁盘中,以优化性能。

RabbitMQ 队列的配置参数

  1. Message TTL (消息TTL)
    描述:设置消息在队列中的最大存活时间(TTL)。超出该时间后,消息会自动过期并被删除。
    类型:整数,单位是毫秒。
    用途:用于限制消息在队列中的存活时间,防止老旧消息堆积,占用队列空间。

  2. Auto expire (自动过期)
    描述:设置队列本身的过期时间。如果队列在指定时间内没有被使用,或者没有消费者连接,则队列会自动删除。
    类型:整数,单位是毫秒。
    用途:对于临时队列或短期使用的队列,可以设置自动过期,避免浪费系统资源。

  3. Overflow behaviour (溢出行为)
    描述:当队列达到最大长度时,RabbitMQ 处理消息的策略。常见的溢出行为有:
    drop-head:丢弃最旧的消息。
    drop-tail:丢弃最新的消息。
    reject-publish:拒绝新消息的发布(直接丢弃)。
    reject-publish-dlx:将被拒绝的消息发送到死信交换机(DLX)。
    用途:当队列存储空间满时,决定应该如何丢弃消息或进行处理。

  4. Single active consumer (单活消费者)
    描述:确保队列中只能有一个活跃消费者。如果设置为 true,RabbitMQ 会保证每次只有一个消费者从队列中消费消息。这通常用于防止多个消费者竞争同一消息的情况。
    用途:适用于处理需要独占消费的任务场景,例如处理分配任务或消息的顺序性要求。

  5. Dead letter exchange (死信交换机)
    描述:设置死信交换机(DLX)。当队列中的消息因为过期、拒绝或无法消费时,这些消息将被发送到死信交换机。死信交换机通常用于保存错误的、需要特殊处理的消息。
    用途:可以将无法处理的消息转发到指定的死信交换机,避免丢失数据,并允许后续处理或人工干预。

  6. Dead letter routing key (死信路由键)
    描述:设置死信消息的路由键。当消息被发送到死信交换机时,x-dlx 会指定消息的路由键。这是将死信消息路由到特定队列的关键。
    用途:确保死信消息在死信交换机中按照特定的路由规则进行转发,便于后续处理或记录。

  7. Max length (最大消息长度)
    描述:设置队列能够存储的最大消息数量。当队列中消息数量达到该值时,RabbitMQ 会根据设定的溢出行为丢弃最旧的消息。
    类型:整数,表示最大消息数。
    用途:可以防止队列无限制堆积消息,避免内存溢出或磁盘空间被占满。

  8. Max length bytes (最大字节数)
    描述:设置队列能够存储的最大字节数。当队列中的消息总字节数达到该值时,RabbitMQ 会根据溢出行为丢弃最旧的消息。
    类型:整数,单位是字节。
    用途:用于控制队列的大小,防止某些队列存储过大的数据,导致系统资源被耗尽。

  9. Maximum priority (最大优先级)
    描述:设置队列的最大优先级。启用优先级队列后,可以为每个消息指定一个优先级值,优先级较高的消息会优先被消费者消费。
    类型:整数,表示最大优先级(通常从 0 到该值的范围)。
    用途:适用于需要根据消息重要性顺序处理的场景。例如,优先处理紧急任务、重要数据等。

  10. Lazy mode (懒队列模式)
    描述:启用懒队列模式,消息会尽量存储在磁盘中,减少内存占用。消费者请求时才会从磁盘加载消息到内存中。
    用途:适用于对内存要求较高的环境,或者消息消费不频繁但需要存储大量消息的场景。通过将消息存储到磁盘,减少了内存占用,但可能增加消息消费的延迟。

  11. Master locator (主队列定位器)
    描述:设置队列在集群中的主节点选择策略。常见的选项有:
    min-masters:选择最少主节点的策略。
    client-local:根据消费者的本地节点选择主队列。
    random:随机选择一个主队列。
    用途:用于分布式环境中队列的主节点选择,影响消息的存储和消费性能。

绑定交换机:

绑定成功:

1.5 测试数据同步

重启canal

修改jzo2o-foundations数据库的serve_sync表的数据,稍等片刻查看canal-mq-jzo2o-foundations队列,如果队列中有的消息说明同步成功,如下 图:

如果没有同步到 MQ参考常见问题中“数据不同步”进行解决。

我们可以查询队列中的消息内容发现它一条type为"UPDATE"的消息,如下所示:

{
  "data" : [
    {
      "city_code" : "010",
      "detail_img" : "https://yjy-xzbjzfw-oss.oss-cn-hangzhou.aliyuncs.com/be1449d6-1c2d-4cca-9f8a-4b562b79998d.jpg",
      "hot_time_stamp" : "1692256062300",
      "id" : "1686352662791016449",
      "is_hot" : "1",
      "price" : "5.0",
      "serve_item_icon" : "https://yjy-xzbjzfw-oss.oss-cn-hangzhou.aliyuncs.com/8179d29c-6b85-4c08-aa13-08429a91d86a.png",
      "serve_item_id" : "1678727478181957634",
      "serve_item_img" : "https://yjy-xzbjzfw-oss.oss-cn-hangzhou.aliyuncs.com/9b87ab7c-9592-4090-9299-5bcf97409fb9.png",
      "serve_item_name" : "日常维修ab",
      "serve_item_sort_num" : "6",
      "serve_type_icon" : "https://yjy-xzbjzfw-oss.oss-cn-hangzhou.aliyuncs.com/c8725882-1fa7-49a6-94ab-cac2530b3b7b.png",
      "serve_type_id" : "1678654490336124929",
      "serve_type_img" : "https://yjy-xzbjzfw-oss.oss-cn-hangzhou.aliyuncs.com/00ba6d8a-fd7e-4691-8415-8ada95004b33.png",
      "serve_type_name" : "日常维修12",
      "serve_type_sort_num" : "2",
      "unit" : "1"
    }
  ],
  "database" : "jzo2o-foundations",
  "es" : 1697443035000.0,
  "id" : 1,
  "isDdl" : false,
  "mysqlType" : {
    "city_code" : "varchar(20)",
    "detail_img" : "varchar(255)",
    "hot_time_stamp" : "bigint",
    "id" : "bigint",
    "is_hot" : "int",
    "price" : "decimal(10,2)",
    "serve_item_icon" : "varchar(255)",
    "serve_item_id" : "bigint",
    "serve_item_img" : "varchar(255)",
    "serve_item_name" : "varchar(100)",
    "serve_item_sort_num" : "int",
    "serve_type_icon" : "varchar(255)",
    "serve_type_id" : "bigint",
    "serve_type_img" : "varchar(255)",
    "serve_type_name" : "varchar(255)",
    "serve_type_sort_num" : "int",
    "unit" : "int"
  },
  "old" : [
    {
      "serve_item_name" : "日常维修a"
    }
  ],
  "pkNames" : [ "id" ],
  "sql" : "",
  "sqlType" : {
    "city_code" : 12,
    "detail_img" : 12,
    "hot_time_stamp" : -5,
    "id" : -5,
    "is_hot" : 4,
    "price" : 3,
    "serve_item_icon" : 12,
    "serve_item_id" : -5,
    "serve_item_img" : 12,
    "serve_item_name" : 12,
    "serve_item_sort_num" : 4,
    "serve_type_icon" : 12,
    "serve_type_id" : -5,
    "serve_type_img" : 12,
    "serve_type_name" : 12,
    "serve_type_sort_num" : 4,
    "unit" : 4
  },
  "table" : "serve_sync",
  "ts" : 1697443782457.0,
  "type" : "UPDATE"
}

数据不同步

当发现修改了数据库后修改的数据并没有发送到MQ,通过查看Canal的日志发现下边的错误。

进入Canal目录,查看日志:

cd /data/soft/canal/logs
tail -f logs/xzb-canal.log

Canal报错如下:

2023-09-22 08:34:40.802 [destination = xzb-canal , address = /192.168.101.68:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000055,position=486221,serverId=1,gtid=,timestamp=1695341830000] cost : 13ms , the next step is binlog dump
2023-09-22 08:34:40.811 [destination = xzb-canal , address = /192.168.101.68:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
        at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.5.jar:na]
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:238) [canal.parse-1.1.5.jar:na]
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:262) [canal.parse-1.1.5.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]

找到关键的位置:Could not find first log file name in binary log index file

根据日志分析是Canal找不到mysql-bin.000055 的486221位置,原因是mysql-bin.000055文件不存在,这是由于为了节省磁盘空间将binlog日志清理了。

解决方法:

把canal复位从最开始开始同步的位置。

1)首先重置mysql的bin log:

连接mysql执行:reset master

执行后所有的binlog删除,从000001号开始

通过show master status;查看 ,结果显示 mysql-bin.000001

2)先停止canal

docker stop canal

3)删除meta.dat

rm -rf /data/soft/canal/conf/meta.dat

4) 再启动canal

docker start canal

MQ同步消息无法消费

这里以Es和MySQL之间的同步举例:

当出现ES和MySQL数据不同步时可能会出现MQ的同步消息无法被消费,比如:从MySQL删除一条记录通过同步程序将ES中对应的记录进行删除,此时由于ES中没有该记录导致删除ES中的记录失败。出现此问题的原因是因为测试数据混乱导致,可以手动将MQ中的消息删除。

进入MQ的管理控制台,进入要清理消息的队列,通过purge功能清理消息:

2 配置Elasticsearch数据同步环境

Elasticsearch7.17.7

拉取镜像

docker pull elasticsearch:7.17.7

创建文件夹:

mkdir -p /data/soft/es7.17.7/xzb

在/data/soft/es7.17.7/xzb下创建data目录并且修改权限为777

mkdir data
chmod 777 data

将课程资料下的"ES安装"目录中的 es.zip上传到/data/soft/es7.17.7/xzb下,并进行解压

unzip es.zip

解压成功如下图:

创建容器

docker run -d \
--name elasticsearch7.17.7 \
--restart always \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-v /data/soft/es7.17.7/xzb/data:/usr/share/elasticsearch/data \
-v /data/soft/es7.17.7/xzb/plugins:/usr/share/elasticsearch/plugins \
-v /data/soft/es7.17.7/xzb/config:/usr/share/elasticsearch/config \
elasticsearch:7.17.7

访问http://192.168.101.68:9200/,如下图说明启动成功:

kibana7.17.7

拉取镜像

docker pull kibana:7.17.7

创建容器:

注意修改es的地址

docker run --name kibana7.17.7 \
-e ELASTICSEARCH_HOSTS=http://192.168.101.68:9200 \
-p 5601:5601 \
-d kibana:7.17.7

下边启动容器,先保证Elasticsearch启动成功。

启动kibana容器成功,在浏览器输入地址访问:http://192.168.101.68:5601,进入DevTools,如下图:

执行:GET /_cat/indices?v 查询索引信息

创建项目所需要的索引结构

启动ES和kibana:

如果没有安装参考本文档 安装elasticsearch7.17.7 和 kibana7.17.7。

安装完成后进行启动:

docker start elasticsearch7.17.7 
docker start kibana7.17.7

本项目共需创建下边三个索引结构:

首先通过下边的命令查询索引

GET /_cat/indices?v

创建orders_seize和serve_provider_info(已经存在无法重复创建)

PUT /serve_provider_info
{
  "mappings" : {
    "properties" : {
      "acceptance_num" : {
        "type" : "integer"
      },
      "city_code" : {
        "type" : "keyword"
      },
      "evaluation_score" : {
        "type" : "double"
      },
      "id" : {
        "type" : "long"
      },
      "location" : {
        "type" : "geo_point"
      },
      "pick_up" : {
        "type" : "integer"
      },
      "serve_item_ids" : {
        "type" : "long"
      },
      "serve_provider_type" : {
        "type" : "integer"
      },
      "serve_times" : {
        "type" : "integer"
      },
      "setting_status" : {
        "type" : "long"
      },
      "settting_status" : {
        "type" : "integer"
      },
      "skills" : {
        "type" : "long"
      }
    }
  }
}

如果需要修改索引结构需要删除重新创建:

DELETE 索引名

查询索引结构

GET /索引名/_mapping

下边继续创建其它索引:

创建:orders_seize (已经存在无法重复创建)

PUT /orders_seize
{
  "mappings" : {
    "properties" : {
      "city_code" : {
        "type" : "keyword"
      },
      "id" : {
        "type" : "long"
      },
      "key_words" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "location" : {
        "type" : "geo_point"
      },
      "orders_amount" : {
        "type" : "float"
      },
      "pur_num" : {
        "type" : "integer"
      },
      "serve_address" : {
        "type" : "text",
        "index" : false
      },
      "serve_item_id" : {
        "type" : "long"
      },
      "serve_item_img" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "serve_item_name" : {
        "type" : "text",
        "index" : false
      },
      "serve_start_time" : {
        "type" : "text",
        "index" : false
      },
      "serve_time" : {
        "type" : "integer"
      },
      "serve_type_id" : {
        "type" : "long"
      },
      "serve_type_name" : {
        "type" : "text",
        "index" : false
      },
      "total_amount" : {
        "type" : "double"
      }
    }
  }
}

创建serve_aggregation索引 (已经存在无法重复创建)

PUT /serve_aggregation
{
  "mappings" : {
    "properties" : {
      "city_code" : {
        "type" : "keyword"
      },
      "detail_img" : {
        "type" : "text",
        "index" : false
      },
      "hot_time_stamp" : {
        "type" : "long"
      },
      "id" : {
        "type" : "keyword"
      },
      "is_hot" : {
        "type" : "short"
      },
      "price" : {
        "type" : "double"
      },
      "serve_item_icon" : {
        "type" : "text",
        "index" : false
      },
      "serve_item_id" : {
        "type" : "keyword"
      },
      "serve_item_img" : {
        "type" : "text",
        "index" : false
      },
      "serve_item_name" : {
        "type" : "text",
        "analyzer": "ik_max_word",
        "search_analyzer":"ik_smart"

      },
      "serve_item_sort_num" : {
        "type" : "short"
      },
      "serve_type_icon" : {
        "type" : "text",
        "index" : false
      },
      "serve_type_id" : {
        "type" : "keyword"
      },
      "serve_type_img" : {
        "type" : "text",
        "index" : false
      },
      "serve_type_name" : {
        "type" : "text",
        "analyzer": "ik_max_word",
        "search_analyzer":"ik_smart"
      },
      "serve_type_sort_num" : {
        "type" : "short"
      }
    }
  }
}