Canal Tutorial: A Complete Guide to Alibaba's MySQL Incremental Subscription and Consumption
Table of Contents
- Introduction to Canal
- How Canal Works
- Environment Setup
- MySQL Configuration
- Canal Installation and Configuration
- Canal Client Development
- Practical Application Cases
- Integrating Canal with Spring Boot
- Canal High-Availability Deployment
- Performance Tuning and Monitoring
- Common Problems and Solutions
- Summary and Next Steps
1. Introduction to Canal
Canal (pronounced [kə'næl], meaning a waterway or channel) is an open-source component from Alibaba for incremental subscription and consumption of MySQL's incremental log (binlog). Canal speaks the MySQL slave replication protocol: it presents itself to the MySQL master as a replica and sends a binlog dump request; the master then streams binlog events to Canal, which parses them into structured objects for downstream applications to consume.
Key features:
- ✅ Real time: near-real-time data synchronization based on the MySQL binlog
- ✅ Reliable: supports resuming from a checkpoint, so no data is lost
- ✅ High performance: a single node can sustain several thousand QPS of data synchronization
- ✅ Easy to use: simple configuration and a friendly API
- ✅ Extensible: supports multiple downstream sinks (Kafka, RocketMQ, direct client consumption, etc.)
Typical use cases:
- Database mirroring: real-time synchronization between primary and replica databases
- Real-time database backup: continuous incremental backups
- Index building and maintenance: keeping heterogeneous or inverted indexes up to date
- Business cache refresh: updating caches (such as Redis) as soon as database data changes
- Incremental processing with business logic: applying custom logic to the change stream in real time
- Data warehouse ETL: streaming changes into a data warehouse
- Cross-datacenter synchronization: replicating data across machine rooms
Supported MySQL versions: 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
Main components:
Canal Server: the server side; connects to MySQL and parses the binlog
Canal Client: the client side; consumes the data parsed by Canal Server
Canal Admin: the management console for administering Canal Server instances
2. How Canal Works
The basic flow of MySQL master-slave replication:
- Master: writes data changes to its binary log (binlog)
- Slave: fetches binary log events from the master, writes them to its relay log, and replays them
Canal mimics the MySQL slave protocol; the workflow looks like this:
MySQL Master
  ↓ (binlog)
Canal Server (acts as a slave)
  ↓ (parses the binlog)
Canal Client (consumes the data)
  ↓
Downstream applications (business processing)
Step by step:
- Connect to the MySQL master: Canal Server connects to the master as a MySQL slave
- Send a dump request: it sends a binlog dump request to the master
- Receive the binlog: the master pushes binlog events to Canal Server
- Parse the binlog: Canal Server parses the binlog into structured data
- Store events: parsed events are stored in the EventStore
- Client consumption: Canal Client fetches events from the EventStore and processes them
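The last two steps are governed by the get/ack/rollback contract that the client code later in this tutorial relies on: fetching advances a "get" cursor, acknowledging confirms consumption, and rolling back rewinds so unacknowledged events are redelivered. A minimal in-memory sketch of that cursor behavior (an illustration only, not Canal's actual EventStore):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of Canal's getWithoutAck / ack / rollback cursor semantics:
// "get" advances on fetch, "ack" confirms, rollback rewinds get to ack.
public class CursorStoreSketch {
    private final List<String> events = new ArrayList<>();
    private int getCursor = 0;  // next event to hand out
    private int ackCursor = 0;  // last confirmed position

    public void put(String event) { events.add(event); }

    // Like connector.getWithoutAck(batchSize): hands out events without confirming them.
    public List<String> getWithoutAck(int batchSize) {
        int end = Math.min(getCursor + batchSize, events.size());
        List<String> batch = new ArrayList<>(events.subList(getCursor, end));
        getCursor = end;
        return batch;
    }

    // Like connector.ack(batchId): marks everything fetched so far as consumed.
    public void ack() { ackCursor = getCursor; }

    // Like connector.rollback(): unconfirmed events will be delivered again.
    public void rollback() { getCursor = ackCursor; }

    public static void main(String[] args) {
        CursorStoreSketch store = new CursorStoreSketch();
        store.put("e1"); store.put("e2"); store.put("e3");
        System.out.println(store.getWithoutAck(2)); // [e1, e2]
        store.rollback();                            // processing failed, rewind
        System.out.println(store.getWithoutAck(2)); // [e1, e2] again
        store.ack();                                 // confirmed
        System.out.println(store.getWithoutAck(2)); // [e3]
    }
}
```

This is why a client that crashes between getWithoutAck and ack sees the same batch again after reconnecting: nothing is lost, but processing must tolerate redelivery.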
MySQL supports three binlog formats:
- STATEMENT: logs the SQL statements themselves
- ROW: logs the change to each affected row (recommended for Canal)
- MIXED: a mix of the two
Canal relies on the ROW format because:
- it provides the complete before and after values of each changed row
- it is not affected by how the SQL statement was written, so it is more accurate
- it supports all data types
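Because a ROW-format UPDATE event carries both the before image and the after image of the row, a consumer can compute exactly which columns changed. A minimal sketch of that diff, using hypothetical column maps rather than Canal's Column objects:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: given the before and after images of a ROW-format UPDATE,
// return only the columns whose values actually changed.
public class RowDiffSketch {
    public static Map<String, String> changedColumns(Map<String, String> before,
                                                     Map<String, String> after) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            // A column counts as changed when its after-value differs from its before-value.
            if (!e.getValue().equals(before.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "1"); before.put("age", "25"); before.put("email", "a@x.com");
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "1"); after.put("age", "26"); after.put("email", "a@x.com");
        System.out.println(changedColumns(before, after)); // {age=26}
    }
}
```

With STATEMENT format this diff would be impossible without re-executing the SQL, which is precisely why Canal requires ROW.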
3. Environment Setup
- Operating system: Linux, macOS, or Windows
- Java: JDK 1.8 or later
- MySQL: version 5.1.x or later
- Memory: at least 2 GB of free memory recommended
Check the Java version:
java -version
If Java is missing or the version is too old, install JDK 1.8+.
Install JDK on Linux:
# Ubuntu/Debian
sudo apt update
sudo apt install openjdk-8-jdk
# CentOS/RHEL
sudo yum install java-1.8.0-openjdk-devel
Install JDK on macOS:
# Using Homebrew
brew install openjdk@8
Install MySQL on Linux:
# Ubuntu/Debian
sudo apt update
sudo apt install mysql-server
# CentOS/RHEL
sudo yum install mysql-server
Install MySQL on macOS:
# Using Homebrew
brew install mysql
brew services start mysql
Verify the MySQL installation:
mysql --version
mysql -u root -p
4. MySQL Configuration
Locate the MySQL configuration file:
# Linux
/etc/mysql/my.cnf
# or
/etc/my.cnf
# macOS (Homebrew)
/usr/local/etc/my.cnf
Edit the configuration file and add or modify the following:
[mysqld]
# Enable the binlog
log-bin=mysql-bin
# Binlog format; must be ROW for Canal
binlog-format=ROW
# server-id; must be unique among the master and all replicas
server-id=1
# Binlog retention in days; 0 means never expire
expire_logs_days=7
# Maximum size of a single binlog file
max_binlog_size=100M
Restart the MySQL service:
# Linux (systemd)
sudo systemctl restart mysql
# Linux (service)
sudo service mysql restart
# macOS
brew services restart mysql
Check that the binlog is enabled:
SHOW VARIABLES LIKE 'log_bin';
-- should show: log_bin | ON
Check the binlog format:
SHOW VARIABLES LIKE 'binlog_format';
-- should show: binlog_format | ROW
List the binlog files:
SHOW BINARY LOGS;
Create the Canal user and grant privileges:
-- Create the canal user
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- Grant the required privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Reload the privilege tables
FLUSH PRIVILEGES;
-- Verify the user
SELECT user, host FROM mysql.user WHERE user = 'canal';
What the privileges are for:
- SELECT: read table structures
- REPLICATION SLAVE: read the binlog
- REPLICATION CLIENT: inspect binlog status
Create a test database and table:
-- Create the database
CREATE DATABASE IF NOT EXISTS test_db DEFAULT CHARACTER SET utf8mb4;
USE test_db;
-- Create the user table
CREATE TABLE IF NOT EXISTS user (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL COMMENT 'username',
    email VARCHAR(100) COMMENT 'email address',
    age INT COMMENT 'age',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT 'created at',
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'updated at',
    INDEX idx_username (username)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='user table';
-- Insert test data
INSERT INTO user (username, email, age) VALUES
('alice', 'alice@example.com', 25),
('bob', 'bob@example.com', 30),
('charlie', 'charlie@example.com', 28);
5. Canal Installation and Configuration
Option 1: download from GitHub
Visit the Canal releases page: https://github.com/alibaba/canal/releases
Download the latest canal.deployer package:
# Download (substitute the version number as needed)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
# or with curl
curl -L -o canal.deployer-1.1.7.tar.gz https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
Option 2: build with Maven
git clone https://github.com/alibaba/canal.git
cd canal
mvn clean install -DskipTests
5.2 Unpacking and Directory Layout
# Unpack
tar -zxvf canal.deployer-1.1.7.tar.gz
# Enter the directory
cd canal.deployer-1.1.7
# Inspect the layout
ls -la
Directory layout:
canal.deployer-1.1.7/
├── bin/                         # startup scripts
│   ├── startup.sh               # Linux startup script
│   └── startup.bat              # Windows startup script
├── conf/                        # configuration files
│   ├── canal.properties         # main Canal configuration
│   └── example/                 # instance configuration directory
│       └── instance.properties  # per-instance configuration
├── lib/                         # dependency jars
└── logs/                        # log files
5.3 Configuring canal.properties
Edit the conf/canal.properties file:
# How Canal Server delivers data: tcp (clients connect directly), kafka, rocketmq, etc.
canal.serverMode = tcp
# IP and port that Canal Server binds to
canal.ip = 0.0.0.0
canal.port = 11111
# Metrics port
canal.metrics.pull.port = 11112
# Global instance configuration mode
canal.instance.global.mode = spring
canal.instance.global.lazy = false
# Where instance state (consume positions) is kept -- pick exactly one:
# file-based storage, survives restarts (common standalone default)
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# in-memory storage, positions are lost on restart
# canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
# Zookeeper-backed storage, for HA clusters
# canal.instance.global.spring.xml = classpath:spring/default-instance.xml
5.4 Configuring an Instance (instance.properties)
Edit the conf/example/instance.properties file:
# Address of the MySQL master
canal.instance.master.address = 127.0.0.1:3306
# Credentials of the canal user created earlier
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Connection character set
canal.instance.connectionCharset = UTF-8
# Databases and tables to subscribe to (.*\\..* means everything)
canal.instance.filter.regex = .*\\..*
# Tables to exclude (optional)
# canal.instance.filter.black.regex = mysql\\..*,information_schema\\..*
# Enable tsdb (a small local store Canal uses to track table-structure history)
canal.instance.tsdb.enable = true
# tsdb Spring configuration
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
# Enable GTID mode (MySQL 5.6+)
canal.instance.gtidon = false
# Skip tables whose structure cannot be parsed instead of aborting
canal.instance.filter.table.error = true
# Filter transaction BEGIN/END entries out of the stream
canal.instance.filter.transaction.entry = true
# Filter DDL statements out of the stream (optional)
# canal.instance.filter.query.ddl = true
# Aliyun RDS credentials (leave empty for self-hosted MySQL)
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =
canal.instance.rds.instanceId =
5.5 Starting Canal Server
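A typo in these key/value pairs often surfaces only at startup, so it can help to sanity-check the file first. A minimal sketch using java.util.Properties (the required-key list below mirrors the settings above and is an assumption; adjust it to your setup):

```java
import java.io.StringReader;
import java.util.List;
import java.util.Properties;

// Sketch: load an instance.properties file and report any required keys
// that are missing or blank, before starting Canal Server.
public class InstanceConfigCheck {
    // Hypothetical minimum set of keys for a TCP-mode instance.
    static final List<String> REQUIRED = List.of(
            "canal.instance.master.address",
            "canal.instance.dbUsername",
            "canal.instance.dbPassword",
            "canal.instance.filter.regex");

    public static List<String> missingKeys(Properties props) {
        return REQUIRED.stream()
                .filter(k -> props.getProperty(k, "").isBlank())
                .toList();
    }

    public static void main(String[] args) throws Exception {
        // In practice you would load conf/example/instance.properties from disk;
        // an inline fragment keeps the sketch self-contained.
        Properties props = new Properties();
        props.load(new StringReader(
                "canal.instance.master.address = 127.0.0.1:3306\n" +
                "canal.instance.dbUsername = canal\n"));
        System.out.println(missingKeys(props));
        // [canal.instance.dbPassword, canal.instance.filter.regex]
    }
}
```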
Start on Linux/macOS:
# Make the script executable
chmod +x bin/startup.sh
# Start
sh bin/startup.sh
# or run it directly
./bin/startup.sh
Start on Windows:
bin\startup.bat
Check the startup logs:
# Server log
tail -f logs/canal/canal.log
# Instance log
tail -f logs/example/example.log
Check the process:
ps aux | grep canal
Check the port:
netstat -an | grep 11111
# or
lsof -i :11111
Confirm from the logs:
The log should contain lines similar to:
2024-01-15 10:00:00.000 [main] INFO c.a.otter.canal.deployer.CanalController - ## start the canal server.
2024-01-15 10:00:01.000 [main] INFO c.a.o.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-01-15 10:00:02.000 [main] INFO c.a.o.canal.instance.core.AbstractCanalInstance - start successful....
Stop Canal Server:
# Linux/macOS
sh bin/stop.sh
# Windows
bin\stop.bat
6. Canal Client Development
Create a Maven project and add the Canal client dependencies:
<dependencies>
    <!-- Canal client -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.7</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <!-- JSON handling -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.12</version>
    </dependency>
</dependencies>
Create a simple Canal client:
package com.example.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClient {

    private static final Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class);

    public static void main(String[] args) {
        // Create the Canal connector
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", // instance name, matching the conf/example directory
                "",        // username (empty)
                ""         // password (empty)
        );
        try {
            // Connect to Canal Server
            connector.connect();
            logger.info("Connected to Canal Server");
            // Subscribe to databases and tables (.*\\..* means all tables in all databases)
            connector.subscribe(".*\\..*");
            logger.info("Subscribed to all tables");
            // Roll back to the last unacknowledged position
            connector.rollback();
            logger.info("Rolled back to unacknowledged position");
            // Fetch data continuously
            while (true) {
                // Fetch up to 100 entries without auto-acknowledging
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data; sleep for one second
                    Thread.sleep(1000);
                } else {
                    // Process the data
                    printEntry(message.getEntries());
                    // Acknowledge successful consumption
                    connector.ack(batchId);
                }
            }
        } catch (Exception e) {
            logger.error("Error occurred", e);
        } finally {
            // Disconnect
            connector.disconnect();
            logger.info("Disconnected from Canal Server");
        }
    }

    /**
     * Print entry information.
     */
    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("Parse row change error", e);
                }
                EventType eventType = rowChange.getEventType();
                String tableName = entry.getHeader().getTableName();
                String databaseName = entry.getHeader().getSchemaName();
                logger.info("Database: {}, Table: {}, EventType: {}",
                        databaseName, tableName, eventType);
                // Handle each affected row
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList(), "DELETE");
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList(), "INSERT");
                    } else if (eventType == EventType.UPDATE) {
                        printColumn(rowData.getBeforeColumnsList(), "UPDATE-BEFORE");
                        printColumn(rowData.getAfterColumnsList(), "UPDATE-AFTER");
                    }
                }
            }
        }
    }

    /**
     * Print column information.
     */
    private static void printColumn(List<Column> columns, String eventType) {
        StringBuilder sb = new StringBuilder();
        sb.append(eventType).append(": ");
        for (Column column : columns) {
            sb.append(column.getName())
              .append("=")
              .append(column.getValue())
              .append(", ");
        }
        logger.info(sb.toString());
    }
}
6.3 An Advanced Client Example
A more complete client with error handling and business hooks:
package com.example.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AdvancedCanalClient {

    private static final Logger logger = LoggerFactory.getLogger(AdvancedCanalClient.class);
    private static final int BATCH_SIZE = 1000;
    private static final long TIMEOUT = 100;

    private CanalConnector connector;
    private volatile boolean running = false;

    public AdvancedCanalClient(String host, int port, String destination) {
        this.connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(host, port),
                destination,
                "",
                ""
        );
    }

    public void start() {
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            running = true;
            logger.info("Canal client started");
            while (running) {
                Message message = connector.getWithoutAck(BATCH_SIZE, TIMEOUT, TimeUnit.MILLISECONDS);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    continue;
                }
                try {
                    processMessage(message);
                    connector.ack(batchId);
                } catch (Exception e) {
                    logger.error("Process message error, rollback batchId: {}", batchId, e);
                    connector.rollback(batchId);
                }
            }
        } catch (Exception e) {
            logger.error("Canal client error", e);
        } finally {
            stop();
        }
    }

    private void processMessage(Message message) {
        for (Entry entry : message.getEntries()) {
            if (entry.getEntryType() == EntryType.ROWDATA) {
                processRowChange(entry);
            } else if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                logger.debug("Transaction begin");
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                logger.debug("Transaction end");
            }
        }
    }

    private void processRowChange(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            String tableName = entry.getHeader().getTableName();
            String databaseName = entry.getHeader().getSchemaName();
            // Only handle specific operations on a specific table
            if (!"test_db".equals(databaseName) || !"user".equals(tableName)) {
                return;
            }
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    handleInsert(rowData.getAfterColumnsList(), databaseName, tableName);
                } else if (eventType == EventType.UPDATE) {
                    handleUpdate(rowData.getBeforeColumnsList(),
                            rowData.getAfterColumnsList(),
                            databaseName, tableName);
                } else if (eventType == EventType.DELETE) {
                    handleDelete(rowData.getBeforeColumnsList(), databaseName, tableName);
                }
            }
        } catch (Exception e) {
            logger.error("Process row change error", e);
        }
    }

    private void handleInsert(List<Column> columns, String database, String table) {
        logger.info("INSERT into {}.{}", database, table);
        // Business processing: refresh caches, sync to other systems, etc.
        // TODO: implement business logic
    }

    private void handleUpdate(List<Column> beforeColumns,
                              List<Column> afterColumns,
                              String database, String table) {
        logger.info("UPDATE {}.{}", database, table);
        // Business processing
        // TODO: implement business logic
    }

    private void handleDelete(List<Column> columns, String database, String table) {
        logger.info("DELETE from {}.{}", database, table);
        // Business processing
        // TODO: implement business logic
    }

    public void stop() {
        running = false;
        if (connector != null) {
            connector.disconnect();
        }
        logger.info("Canal client stopped");
    }

    public static void main(String[] args) {
        AdvancedCanalClient client = new AdvancedCanalClient("127.0.0.1", 11111, "example");
        // Shutdown hook for a clean exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down...");
            client.stop();
        }));
        client.start();
    }
}
Subscription filtering can happen either in the subscribe() call or in code:
// Subscribe only to specific tables in a specific database
connector.subscribe("test_db\\.user,test_db\\.order");
// Or filter in code
if ("test_db".equals(databaseName) && "user".equals(tableName)) {
    // handle rows from the user table
}
7. Practical Application Cases
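Both the subscribe() argument and canal.instance.filter.regex are comma-separated regular expressions matched against schema.table names. Canal uses its own filter engine internally, but the matching idea can be pictured with plain java.util.regex:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Sketch (not Canal's actual matcher): a comma-separated list of regexes,
// each matched against the full "schema.table" string.
public class FilterSketch {
    private final Pattern[] patterns;

    public FilterSketch(String filter) {
        patterns = Arrays.stream(filter.split(","))
                .map(Pattern::compile)
                .toArray(Pattern[]::new);
    }

    public boolean accepts(String schema, String table) {
        String target = schema + "." + table;
        // Accept when any pattern matches the whole schema.table string.
        return Arrays.stream(patterns).anyMatch(p -> p.matcher(target).matches());
    }

    public static void main(String[] args) {
        FilterSketch f = new FilterSketch("test_db\\.user,test_db\\.order");
        System.out.println(f.accepts("test_db", "user"));    // true
        System.out.println(f.accepts("test_db", "product")); // false
        System.out.println(new FilterSketch(".*\\..*").accepts("any_db", "any_table")); // true
    }
}
```

Note the escaped dot: test_db\\.user matches exactly test_db.user, while test_db.user (unescaped) would also match, say, test_dbXuser, since a bare dot matches any character.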
Case 1: Refreshing a Redis cache in real time:
package com.example.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.List;

public class CanalToRedis {

    private JedisPool jedisPool;
    private CanalConnector connector;

    public CanalToRedis() {
        // Initialize the Redis connection pool
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        this.jedisPool = new JedisPool(config, "127.0.0.1", 6379);
        // Initialize the Canal connection
        this.connector = CanalConnectors.newSingleConnector(
                new java.net.InetSocketAddress("127.0.0.1", 11111),
                "example", "", ""
        );
    }

    public void start() {
        try {
            connector.connect();
            connector.subscribe("test_db\\.user");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                processMessage(message);
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private void processMessage(Message message) {
        try (Jedis jedis = jedisPool.getResource()) {
            for (Entry entry : message.getEntries()) {
                if (entry.getEntryType() == EntryType.ROWDATA) {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    EventType eventType = rowChange.getEventType();
                    for (RowData rowData : rowChange.getRowDatasList()) {
                        if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                            // Sync to Redis
                            syncToRedis(jedis, rowData.getAfterColumnsList());
                        } else if (eventType == EventType.DELETE) {
                            // Delete from Redis
                            deleteFromRedis(jedis, rowData.getBeforeColumnsList());
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void syncToRedis(Jedis jedis, List<Column> columns) {
        String id = null;
        String username = null;
        for (Column column : columns) {
            if ("id".equals(column.getName())) {
                id = column.getValue();
            } else if ("username".equals(column.getName())) {
                username = column.getValue();
            }
        }
        if (id != null) {
            // Store in Redis under the key user:{id}
            String key = "user:" + id;
            jedis.hset(key, "id", id);
            if (username != null) {
                jedis.hset(key, "username", username);
            }
            jedis.expire(key, 3600); // expire after one hour
        }
    }

    private void deleteFromRedis(Jedis jedis, List<Column> columns) {
        String id = null;
        for (Column column : columns) {
            if ("id".equals(column.getName())) {
                id = column.getValue();
                break;
            }
        }
        if (id != null) {
            jedis.del("user:" + id);
        }
    }
}
Case 2: Syncing changes into Elasticsearch in real time:
package com.example.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CanalToElasticsearch {

    private RestHighLevelClient esClient;
    private CanalConnector connector;

    public CanalToElasticsearch() {
        // Initialize the Elasticsearch client
        this.esClient = new RestHighLevelClient(
                RestClient.builder(
                        new org.apache.http.HttpHost("localhost", 9200, "http")
                )
        );
        // Initialize the Canal connection
        this.connector = CanalConnectors.newSingleConnector(
                new java.net.InetSocketAddress("127.0.0.1", 11111),
                "example", "", ""
        );
    }

    public void start() {
        try {
            connector.connect();
            connector.subscribe("test_db\\.user");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                processMessage(message);
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                esClient.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            connector.disconnect();
        }
    }

    private void processMessage(Message message) {
        try {
            for (Entry entry : message.getEntries()) {
                if (entry.getEntryType() == EntryType.ROWDATA) {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    EventType eventType = rowChange.getEventType();
                    for (RowData rowData : rowChange.getRowDatasList()) {
                        if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                            syncToES(rowData.getAfterColumnsList());
                        } else if (eventType == EventType.DELETE) {
                            deleteFromES(rowData.getBeforeColumnsList());
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void syncToES(List<Column> columns) {
        Map<String, Object> doc = new HashMap<>();
        String id = null;
        for (Column column : columns) {
            doc.put(column.getName(), column.getValue());
            if ("id".equals(column.getName())) {
                id = column.getValue();
            }
        }
        if (id != null) {
            IndexRequest request = new IndexRequest("user_index")
                    .id(id)
                    .source(JSON.toJSONString(doc), XContentType.JSON);
            try {
                esClient.index(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void deleteFromES(List<Column> columns) {
        String id = null;
        for (Column column : columns) {
            if ("id".equals(column.getName())) {
                id = column.getValue();
                break;
            }
        }
        if (id != null) {
            DeleteRequest request = new DeleteRequest("user_index", id);
            try {
                esClient.delete(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
8. Integrating Canal with Spring Boot
Add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.7</version>
    </dependency>
</dependencies>
Define the connector as a Spring bean:
package com.example.canal.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

@Configuration
public class CanalConfig {

    @Value("${canal.host:127.0.0.1}")
    private String canalHost;

    @Value("${canal.port:11111}")
    private int canalPort;

    @Value("${canal.destination:example}")
    private String destination;

    @Bean
    public CanalConnector canalConnector() {
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalHost, canalPort),
                destination,
                "",
                ""
        );
    }
}
A service that consumes Canal data on a background thread:
package com.example.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Service
public class CanalService {

    private static final Logger logger = LoggerFactory.getLogger(CanalService.class);

    @Autowired
    private CanalConnector canalConnector;

    private volatile boolean running = false;
    private Thread workerThread;

    @PostConstruct
    public void start() {
        running = true;
        workerThread = new Thread(this::process);
        workerThread.setDaemon(true);
        workerThread.start();
        logger.info("Canal service started");
    }

    @PreDestroy
    public void stop() {
        running = false;
        if (workerThread != null) {
            workerThread.interrupt();
        }
        logger.info("Canal service stopped");
    }

    private void process() {
        try {
            canalConnector.connect();
            canalConnector.subscribe(".*\\..*");
            canalConnector.rollback();
            while (running) {
                Message message = canalConnector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                try {
                    handleMessage(message);
                    canalConnector.ack(batchId);
                } catch (Exception e) {
                    logger.error("Process message error", e);
                    canalConnector.rollback(batchId);
                }
            }
        } catch (Exception e) {
            logger.error("Canal process error", e);
        } finally {
            canalConnector.disconnect();
        }
    }

    private void handleMessage(Message message) {
        for (Entry entry : message.getEntries()) {
            if (entry.getEntryType() == EntryType.ROWDATA) {
                // Handle the data change
                // TODO: implement business logic
            }
        }
    }
}
9. Canal High-Availability Deployment
Modify canal.properties on every node:
# Zookeeper addresses used for HA coordination
canal.zkServers = 127.0.0.1:2181
# Use the Zookeeper-backed instance configuration so consume positions are shared
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
Make sure Zookeeper is installed and running:
# Start Zookeeper
zkServer.sh start
Deploy Canal Server instances on multiple machines with the same configuration to achieve high availability: the nodes coordinate through Zookeeper so that only one server runs a given instance at a time, and a standby takes over if the active node fails.
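A minimal sketch of the canal.properties fragment each node in such a cluster might share (Zookeeper addresses below are assumptions; every node keeps the same destination names):

```properties
# identical on every node in the cluster
canal.zkServers = zk1:2181,zk2:2181,zk3:2181
# Zookeeper-backed instance config so consume positions are shared across nodes
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
```

Clients can then discover the currently active node through the same Zookeeper ensemble (for example via CanalConnectors.newClusterConnector) instead of hard-coding a single server address.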
10. Performance Tuning and Monitoring
- Tune batchSize: adjust the batch size to the data volume
- Process asynchronously: avoid blocking the fetch loop
- Tune connection pools: size the pools sensibly for the workload
- Filter out unneeded tables: reduce the amount of data processed
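The advice to process asynchronously above usually means decoupling the fetch loop from the handlers. A minimal sketch using a bounded hand-off queue (the Batch record is a stand-in for Canal's Message; in a real client you would ack a batch only after the worker finishes it):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: the fetch loop enqueues batches and a worker thread processes them,
// so a slow handler back-pressures the fetcher instead of stalling it per event.
public class AsyncHandoffSketch {

    record Batch(long id, int size) {}

    // Feed `batches` batches of `perBatch` events through a bounded queue
    // to a single worker thread; returns the number of events processed.
    public static int processAll(int batches, int perBatch) throws InterruptedException {
        BlockingQueue<Batch> queue = new ArrayBlockingQueue<>(4); // bounded = back-pressure
        AtomicInteger processed = new AtomicInteger();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Batch b = queue.take();
                    if (b.id() < 0) break;          // poison pill = shutdown
                    processed.addAndGet(b.size());  // stand-in for real handling
                }
            } catch (InterruptedException ignored) { }
        });
        worker.start();

        for (long id = 1; id <= batches; id++) {
            queue.put(new Batch(id, perBatch)); // blocks when the worker falls behind
        }
        queue.put(new Batch(-1, 0)); // signal shutdown
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(10, 100)); // 1000
    }
}
```

The bounded queue gives natural back-pressure: when handlers fall behind, put() blocks the fetcher instead of letting memory grow, and a single worker preserves event order.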
Key metrics to monitor:
- number of Canal Server connections
- number of messages processed
- processing latency
- error rate
11. Common Problems and Solutions
Problem: Canal cannot connect to MySQL
Solutions:
- check that the MySQL binlog is enabled
- check that the binlog format is ROW
- check the canal user's privileges
- check network connectivity
Problem: the Canal client receives no data
Solutions:
- check that Canal Server is running
- check that the subscribed tables are correct
- check whether any data has actually changed
- inspect the Canal logs
Problem: synchronization lags behind
Solutions:
- check network latency
- adjust batchSize
- optimize the processing logic
- check server load
12. Summary and Next Steps
Through this tutorial you have learned:
- ✅ Canal's basic concepts and how it works
- ✅ how to configure the MySQL binlog
- ✅ how to install and configure Canal Server
- ✅ how to develop Canal clients
- ✅ practical application scenarios
Where to go next:
- Canal Admin: learn to use the Canal management console
- Canal with Kafka: consume Canal data through Kafka
- Canal high availability: learn about cluster deployment
- Performance tuning: dig deeper into optimization techniques
Resources:
- GitHub repository: https://github.com/alibaba/canal
- Official documentation: https://github.com/alibaba/canal/wiki
- Community support: GitHub Issues
Closing Remarks
Canal is a powerful data-synchronization tool with broad applications in real-time data processing, cache refreshing, and similar scenarios. With the material in this tutorial you should now have a solid grasp of the basics.
Remember to:
- practice: deepen your understanding through real projects
- watch performance: keep an eye on optimization
- keep learning: follow Canal's ongoing development
Happy learning and happy coding! 🚀
This tutorial was written by the Java突击队 learning community; feedback is welcome.