Kafka入门教程 - 从零开始学习Apache Kafka消息队列
Kafka入门教程 - 从零开始学习Apache Kafka消息队列
目录
- Kafka简介
- Kafka核心概念
- Kafka安装与配置
- Kafka基本操作
- Kafka生产者
- Kafka消费者
- Kafka主题和分区
- Kafka集群部署
- Kafka与Spring Boot集成
- Kafka监控与管理
- Kafka最佳实践
- 常见问题与解决方案
- 总结与进阶
1. Kafka简介
Apache Kafka是一个开源的分布式流处理平台,由LinkedIn开发,后成为Apache基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序。
核心特点:
- ✅ 高吞吐量:支持每秒百万级消息处理
- ✅ 可扩展性:支持水平扩展,可轻松扩展到数百个节点
- ✅ 持久化:消息持久化到磁盘,支持数据保留策略
- ✅ 分布式:天然支持分布式部署和集群
- ✅ 容错性:支持数据复制和故障转移
- ✅ 实时性:低延迟的消息传递
- ✅ 多客户端支持:支持多种编程语言
典型应用场景:
- 消息队列:解耦系统,异步处理
- 日志收集:集中收集各系统日志
- 流式处理:实时数据处理和分析
- 事件溯源:记录系统状态变化
- 指标监控:收集和监控系统指标
- 活动流处理:用户行为追踪和分析
| 特性 | Kafka | RabbitMQ | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 吞吐量 | 极高 | 中等 | 高 | 中等 |
| 延迟 | 低 | 低 | 低 | 低 |
| 持久化 | 支持 | 支持 | 支持 | 支持 |
| 顺序保证 | 分区内有序 | 队列有序 | 队列有序 | 队列有序 |
| 适用场景 | 大数据、日志 | 传统消息队列 | 电商、金融 | 传统消息队列 |
- 0.8.x:基础版本
- 0.9.x:引入安全特性
- 0.10.x:引入Kafka Streams
- 1.0.x:稳定版本
- 2.0.x:性能优化
- 3.0.x:移除ZooKeeper依赖(KRaft模式)
2. Kafka核心概念
Topic(主题)
Topic是消息的类别或名称,类似于数据库中的表。生产者向Topic发送消息,消费者从Topic消费消息。
Topic: user-events
├── Message 1: User registered
├── Message 2: User logged in
└── Message 3: User purchasedPartition(分区)
Topic被分成多个Partition,每个Partition是一个有序的消息队列。Partition允许Topic在多个Broker上分布,实现并行处理。
Topic: user-events (3个分区)
├── Partition 0: [msg1, msg4, msg7]
├── Partition 1: [msg2, msg5, msg8]
└── Partition 2: [msg3, msg6, msg9]Broker(代理)
Broker是Kafka集群中的服务器节点,负责存储和转发消息。一个Kafka集群由多个Broker组成。
Kafka Cluster
├── Broker 1 (localhost:9092)
├── Broker 2 (localhost:9093)
└── Broker 3 (localhost:9094)Producer(生产者)
Producer是向Kafka Topic发送消息的客户端应用程序。
Producer → Topic: user-events
└── Message: "User registered: user123"Consumer(消费者)
Consumer是从Kafka Topic读取消息的客户端应用程序。
Topic: user-events → Consumer
└── Message: "User registered: user123"Consumer Group(消费者组)
Consumer Group是一组Consumer的集合,它们共同消费一个Topic。组内的Consumer会负载均衡地消费消息。
Consumer Group: user-processors
├── Consumer 1 → Partition 0
├── Consumer 2 → Partition 1
└── Consumer 3 → Partition 2Offset(偏移量)
Offset是消息在Partition中的位置标识,Consumer通过Offset来跟踪已消费的消息位置。
Partition 0:
Offset 0: Message 1
Offset 1: Message 2
Offset 2: Message 3
↑
Consumer当前位置: Offset 1整体架构
┌─────────┐ ┌─────────┐ ┌─────────┐
│Producer │ │Producer │ │Producer │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────────┼────────────────┘
│
┌───────▼───────┐
│ Kafka Cluster│
│ │
│ ┌──────────┐ │
│ │ Broker 1 │ │
│ └──────────┘ │
│ ┌──────────┐ │
│ │ Broker 2 │ │
│ └──────────┘ │
│ ┌──────────┐ │
│ │ Broker 3 │ │
│ └──────────┘ │
└───────┬───────┘
│
┌────────────────┼────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Consumer │ │Consumer │ │Consumer │
│Group 1 │ │Group 2 │ │Group 3 │
└─────────┘ └─────────┘ └─────────┘数据流
Producer → Topic (Partitions) → Consumer Groups → ConsumersKafka将消息持久化到磁盘,采用顺序写入的方式,性能极高。
Topic: user-events
Partition 0: segment-0.log, segment-1.log, ...
Partition 1: segment-0.log, segment-1.log, ...3. Kafka安装与配置
- Java环境:JDK 8或更高版本
- 操作系统:Linux、macOS、Windows
- 内存:至少2GB(推荐4GB+)
- 磁盘:足够的磁盘空间存储消息
访问Kafka官网下载:https://kafka.apache.org/downloads
# 下载Kafka(以2.13-3.5.0为例)
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
# 解压
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0启动ZooKeeper
Kafka依赖ZooKeeper(3.5.0之前的版本),需要先启动ZooKeeper:
# 启动ZooKeeper(Kafka自带ZooKeeper,仅用于开发测试)
bin/zookeeper-server-start.sh config/zookeeper.properties启动Kafka Broker
# 启动Kafka
bin/kafka-server-start.sh config/server.properties验证安装
# 创建Topic测试
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# 列出Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092server.properties核心配置
# Broker ID(集群中唯一)
broker.id=0
# 监听地址
listeners=PLAINTEXT://localhost:9092
# 日志目录
log.dirs=/tmp/kafka-logs
# ZooKeeper连接(如果使用ZooKeeper)
zookeeper.connect=localhost:2181
# 默认分区数
num.partitions=3
# 默认副本数
default.replication.factor=1
# 消息保留时间(小时)
log.retention.hours=168
# 日志段大小
log.segment.bytes=1073741824
# 压缩类型
compression.type=producer3.5 Docker方式安装(推荐)
使用Docker Compose
创建docker-compose.yml:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1启动:
docker-compose up -d# 检查Kafka是否运行
jps | grep Kafka
# 测试连接
telnet localhost 90924. Kafka基本操作
创建Topic
# 基本创建
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 带配置创建
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=86400000列出所有Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看Topic详情
bin/kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092输出示例:
Topic: my-topic PartitionCount: 3 ReplicationFactor: 1
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0删除Topic
bin/kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092修改Topic配置
# 修改消息保留时间
bin/kafka-configs.sh --alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=86400000 \
--bootstrap-server localhost:9092发送消息
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092然后输入消息,每行一条,按Ctrl+C退出。
发送带Key的消息
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:"输入格式:key:value
消费消息
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--from-beginning--from-beginning:从最早的消息开始消费
消费指定分区
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partition 0 \
--from-beginning消费带Key的消息
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property print.value=true \
--from-beginning消费者组操作
# 查看消费者组
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# 查看消费者组详情
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
# 重置Offset
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute5. Kafka生产者
Maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>Gradle依赖
dependencies {
implementation 'org.apache.kafka:kafka-clients:3.5.0'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'ch.qos.logback:logback-classic:1.2.12'
}import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 配置属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
producer.send(record);
}
// 关闭生产者
producer.close();
}
}核心配置
Properties props = new Properties();
// 必需配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 可选配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确认机制
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 等待时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩类型重要配置说明
acks:
0:不等待确认1:等待Leader确认all:等待所有副本确认(最安全)
retries:发送失败时的重试次数
batch.size:批次大小,提高吞吐量
linger.ms:等待时间,用于批量发送
compression.type:压缩类型(none, gzip, snappy, lz4)
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功: " + metadata.topic() +
"-" + metadata.partition() +
"-" + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功: " + metadata.topic() +
"-" + metadata.partition() +
"-" + metadata.offset());
} else {
System.err.println("消息发送失败: " + exception.getMessage());
}
}
});import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
public class ProducerWithCallback {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Topic: " + metadata.topic() +
", Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
} else {
System.err.println("发送失败: " + exception.getMessage());
}
});
}
producer.close();
}
}import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
if (key == null) {
return 0;
}
String keyStr = key.toString();
// 根据key的hash值分区
return Math.abs(keyStr.hashCode()) % cluster.partitionCountForTopic(topic);
}
@Override
public void close() {
// 清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 配置初始化
}
}使用自定义分区器:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 配置初始化
}
@Override
public byte[] serialize(String topic, User user) {
try {
return objectMapper.writeValueAsBytes(user);
} catch (Exception e) {
throw new RuntimeException("序列化失败", e);
}
}
@Override
public void close() {
// 清理资源
}
}6. Kafka消费者
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
// 配置属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 创建消费者
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅Topic
consumer.subscribe(Collections.singletonList("my-topic"));
// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("Topic: " + record.topic() +
", Partition: " + record.partition() +
", Offset: " + record.offset() +
", Key: " + record.key() +
", Value: " + record.value());
});
}
} finally {
consumer.close();
}
}
}核心配置
Properties props = new Properties();
// 必需配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 重要配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交Offset
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 自动提交间隔
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 会话超时
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取最大记录数
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 最大拉取间隔重要配置说明
auto.offset.reset:
earliest:从最早的消息开始latest:从最新的消息开始none:如果没有Offset则抛出异常
enable.auto.commit:是否自动提交Offset
max.poll.records:每次poll的最大记录数
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
processMessage(record);
}
// 手动提交Offset
consumer.commitSync();
}
} finally {
consumer.close();
}// 订阅特定分区
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
// 从指定Offset开始消费
consumer.seek(partition, 100);
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息...
}// 同一个消费者组的消费者会负载均衡地消费消息
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// ... 其他配置
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public class UserDeserializer implements Deserializer<User> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 配置初始化
}
@Override
public User deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, User.class);
} catch (Exception e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public void close() {
// 清理资源
}
}7. Kafka主题和分区
- 并行处理:不同分区可以并行处理
- 扩展性:可以增加分区数提高吞吐量
- 顺序性:分区内消息有序
- 负载均衡:消息分布到不同分区
默认分区策略
- 指定分区:直接发送到指定分区
- 指定Key:根据Key的hash值选择分区
- 轮询:没有Key时轮询分配
// 指定分区
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", 0, "key", "value");
// 指定Key(会根据Key的hash值选择分区)
ProducerRecord<String, String> record2 =
new ProducerRecord<>("my-topic", "key", "value");
// 不指定Key(轮询)
ProducerRecord<String, String> record3 =
new ProducerRecord<>("my-topic", null, "value");副本的作用
- 高可用性:Leader故障时,Follower可以成为新的Leader
- 数据冗余:数据在多个Broker上备份
副本配置
# 创建带副本的Topic
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 2- Leader:处理读写请求的分区副本
- Follower:同步Leader数据的副本
- ISR:In-Sync Replicas,与Leader同步的副本集合
8. Kafka集群部署
Kafka Cluster
├── Broker 1 (broker.id=1)
│ ├── Topic A Partition 0 (Leader)
│ └── Topic A Partition 1 (Follower)
├── Broker 2 (broker.id=2)
│ ├── Topic A Partition 0 (Follower)
│ └── Topic A Partition 1 (Leader)
└── Broker 3 (broker.id=3)
├── Topic A Partition 0 (Follower)
└── Topic A Partition 1 (Follower)server.properties配置
# Broker 1
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181
# Broker 2
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=localhost:2181
# Broker 3
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-3
zookeeper.connect=localhost:2181# 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Broker 1
bin/kafka-server-start.sh config/server-1.properties
# 启动Broker 2
bin/kafka-server-start.sh config/server-2.properties
# 启动Broker 3
bin/kafka-server-start.sh config/server-3.propertiesbin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 39. Kafka与Spring Boot集成
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliestimport org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
}import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("收到消息: " + message);
}
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consumeWithHeaders(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Topic: " + topic + ", Partition: " + partition + ", Message: " + message);
}
}10. Kafka监控与管理
Kafka Manager是一个Web管理工具,用于管理Kafka集群。
Kafka暴露JMX指标,可以使用JConsole或Prometheus监控。
- 消息吞吐量:每秒消息数
- 延迟:消息处理延迟
- Offset Lag:消费者延迟
- Broker状态:Broker健康状态
11. Kafka最佳实践
使用异步发送:提高吞吐量
合理设置批次大小:平衡延迟和吞吐量
使用压缩:减少网络传输
处理异常:实现重试和错误处理
关闭生产者:确保资源释放
合理设置消费者组:避免重复消费
手动提交Offset:确保消息处理完成
处理异常:避免消息丢失
控制拉取大小:避免内存溢出
监控消费延迟:及时发现问题
合理设置分区数:根据吞吐量需求
设置副本数:保证高可用
设置保留策略:控制存储空间
命名规范:清晰的命名规则
12. 常见问题与解决方案
原因:
- Producer未等待确认
- Consumer未提交Offset
解决方案:
- 设置
acks=all - 手动提交Offset
原因:
- Producer重试
- Consumer重复提交
解决方案:
- 实现幂等性
- 使用事务
原因:
- 消费者处理慢
- 分区数不足
解决方案:
- 增加消费者
- 增加分区数
- 优化处理逻辑
13. 总结与进阶
通过本教程,你已经掌握了:
- ✅ Kafka核心概念
- ✅ Kafka安装和配置
- ✅ 生产者和消费者开发
- ✅ Topic和分区管理
- ✅ 集群部署
- ✅ Spring Boot集成
- Kafka Streams:流处理框架
- Kafka Connect:数据集成
- Schema Registry:Schema管理
- 性能优化:调优和监控
- 安全配置:认证和授权
- 官方文档:https://kafka.apache.org/documentation/
- GitHub:https://github.com/apache/kafka
- Confluent:https://www.confluent.io/
结语
Kafka是一个功能强大的分布式消息系统,广泛应用于大数据和微服务架构中。通过本教程的学习,相信你已经掌握了Kafka的核心功能和使用方法。
记住:
- 多实践:通过实际项目练习
- 理解原理:深入理解Kafka的工作原理
- 关注性能:注意性能优化
- 持续学习:关注Kafka新特性
祝你学习愉快,编程顺利! 🚀
本教程由Java突击队学习社区编写,如有问题欢迎反馈。