Kafka

  1. 1. Kafka 概述
    1. 1.1. 核心概念
  2. 2. 安装 Kafka
    1. 2.1. 使用 Docker 安装(推荐)
    2. 2.2. 手动安装
  3. 3. Kafka 架构
    1. 3.1. 架构图
    2. 3.2. 分区和副本
  4. 4. 基本操作
    1. 4.1. Topic 管理
    2. 4.2. 生产消息
    3. 4.3. 消费消息
    4. 4.4. Consumer Group 管理
  5. 5. 程序化使用
    1. 5.1. Java Producer 示例
    2. 5.2. Java Consumer 示例
    3. 5.3. Python Producer 示例
    4. 5.4. Python Consumer 示例
  6. 6. Kafka 配置
    1. 6.1. Broker 重要配置
    2. 6.2. Producer 重要配置
    3. 6.3. Consumer 重要配置
  7. 7. Kafka Streams
    1. 7.1. 简单示例
  8. 8. 性能优化
    1. 8.1. Producer 优化
    2. 8.2. Consumer 优化
    3. 8.3. Broker 优化
  9. 9. 监控和管理
    1. 9.1. 常用监控指标
    2. 9.2. 常见 JMX 指标
  10. 10. 常见问题和解决方案
    1. 10.1. 消息丢失
    2. 10.2. 消息重复
    3. 10.3. 消息乱序
  11. 11. 总结核心知识要点
    1. 11.1. Kafka 架构核心
    2. 11.2. 核心概念示意图
    3. 11.3. 关键配置对比
    4. 11.4. 消息可靠性保证
      1. 11.4.1. 1. 避免消息丢失
      2. 11.4.2. 2. 避免消息重复
      3. 11.4.3. 3. 保证消息顺序
    5. 11.5. 实际应用场景代码
      1. 11.5.1. 场景1: 日志收集
      2. 11.5.2. 场景2: 实时数据管道
      3. 11.5.3. 场景3: 事件溯源
    6. 11.6. 性能调优清单
    7. 11.7. 常用运维命令
    8. 11.8. 最佳实践
    9. 11.9. 核心原理
  12. 12. References

Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.

Kafka 概述

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,现由 Apache 软件基金会维护。Kafka 的核心特性包括:

  • 高吞吐量: 每秒处理数百万条消息
  • 持久化存储: 消息持久化到磁盘,支持消息回溯
  • 分布式架构: 支持集群部署,可水平扩展
  • 容错性: 数据副本机制保证高可用
  • 实时处理: 支持实时流处理

核心概念

  • Producer(生产者): 向 Kafka 发送消息的客户端
  • Consumer(消费者): 从 Kafka 读取消息的客户端
  • Broker(代理): Kafka 集群中的服务器节点
  • Topic(主题): 消息的分类,类似数据库的表
  • Partition(分区): Topic 的物理分片,实现并行处理
  • Offset(偏移量): 消息在分区中的唯一标识
  • Consumer Group(消费者组): 多个消费者协作消费同一 Topic
  • Replication(副本): 分区的数据备份,保证高可用

安装 Kafka

使用 Docker 安装(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

# 启动
docker-compose up -d

手动安装

1
2
3
4
5
6
7
8
9
10
# 下载 Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka Server
bin/kafka-server-start.sh config/server.properties

Kafka 架构

架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Broker 1│ │Broker 2│ │Broker 3│ │
│ │ │ │ │ │ │ │
│ │ Topic1 │ │ Topic1 │ │ Topic1 │ │
│ │ P0(L) │◄────►│ P0(F) │◄────►│ P0(F) │ │
│ │ P1(F) │ │ P1(L) │ │ P1(F) │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
▲ │
│ │
Producers Consumers
(发送消息) (消费消息)

L = Leader, F = Follower

分区和副本

  • Partition: 每个 Topic 分为多个 Partition,实现并行处理
  • Leader: 每个 Partition 有一个 Leader,负责读写
  • Follower: 其他副本作为 Follower,同步 Leader 数据
  • ISR (In-Sync Replicas): 与 Leader 保持同步的副本集合

基本操作

Topic 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 创建 Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2

# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看 Topic 详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic

# 删除 Topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic

# 增加分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 5

生产消息

1
2
3
4
5
6
7
8
9
10
11
# 控制台生产者
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic

# 带 key 的生产者
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--property "parse.key=true" \
--property "key.separator=:"

消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 从最新位置开始消费
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic

# 从头开始消费
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning

# 消费者组消费
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--group my-group

# 显示 key 和 value
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning \
--property print.key=true \
--property key.separator=":"

Consumer Group 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 查看消费者组列表
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

# 查看消费者组详情
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe

# 重置消费者组 offset
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute

# 删除消费者组
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--delete

程序化使用

Java Producer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", 3); // 重试次数

Producer<String, String> producer = new KafkaProducer<>(props);

try {
// 同步发送
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "value1");
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Sent to partition %d with offset %d%n",
metadata.partition(), metadata.offset());

// 异步发送
producer.send(new ProducerRecord<>("my-topic", "key2", "value2"),
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.printf("Sent to partition %d%n", metadata.partition());
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}

Java Consumer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest"); // earliest, latest, none

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}

Python Producer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from kafka import KafkaProducer
import json

# 创建生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3
)

# 发送消息
try:
# 同步发送
future = producer.send('my-topic', {'key': 'value'})
record_metadata = future.get(timeout=10)
print(f"Sent to partition {record_metadata.partition}")

# 异步发送
producer.send('my-topic', {'key': 'value2'}).add_callback(
lambda metadata: print(f"Sent to {metadata.partition}")
).add_errback(
lambda e: print(f"Error: {e}")
)
finally:
producer.flush()
producer.close()

Python Consumer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from kafka import KafkaConsumer
import json

# 创建消费者
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True
)

# 消费消息
try:
for message in consumer:
print(f"Topic: {message.topic}")
print(f"Partition: {message.partition}")
print(f"Offset: {message.offset}")
print(f"Value: {message.value}")
except KeyboardInterrupt:
pass
finally:
consumer.close()

Kafka 配置

Broker 重要配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# server.properties

# Broker ID(集群中唯一)
broker.id=1

# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your-host:9092

# Zookeeper 连接
zookeeper.connect=localhost:2181

# 日志目录
log.dirs=/var/kafka-logs

# 分区数默认值
num.partitions=3

# 副本因子默认值
default.replication.factor=2

# 最小同步副本数
min.insync.replicas=2

# 日志保留时间(7天)
log.retention.hours=168

# 日志保留大小
log.retention.bytes=1073741824

# 日志段大小
log.segment.bytes=1073741824

# 压缩类型
compression.type=producer

Producer 重要配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 必需配置
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# ACK 配置
acks=all # all(所有副本), 1(Leader), 0(不等待)

# 重试配置
retries=3
retry.backoff.ms=100

# 批处理配置
batch.size=16384 # 批次大小(字节)
linger.ms=10 # 等待时间(毫秒)

# 缓冲区配置
buffer.memory=33554432

# 压缩
compression.type=snappy # none, gzip, snappy, lz4, zstd

# 幂等性
enable.idempotence=true

Consumer 重要配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 必需配置
bootstrap.servers=localhost:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Offset 配置
enable.auto.commit=true
auto.commit.interval.ms=5000
auto.offset.reset=earliest # earliest, latest, none

# 拉取配置
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576

# 会话配置
session.timeout.ms=10000
heartbeat.interval.ms=3000

# 每次拉取最大记录数
max.poll.records=500

Kafka Streams

Kafka Streams 是一个用于构建实时流处理应用的库。

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class WordCountExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// 读取输入 Topic
KStream<String, String> textLines = builder.stream("input-topic");

// 处理流
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();

// 写入输出 Topic
wordCounts.toStream().to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

性能优化

Producer 优化

1
2
3
4
5
6
7
8
9
10
11
12
# 批处理优化
batch.size=32768
linger.ms=20

# 压缩
compression.type=lz4

# 缓冲区
buffer.memory=67108864

# 并行
max.in.flight.requests.per.connection=5

Consumer 优化

1
2
3
4
5
6
7
8
9
# 增加拉取大小
fetch.min.bytes=1024
max.partition.fetch.bytes=2097152

# 批量处理
max.poll.records=1000

# 减少网络往返
fetch.max.wait.ms=100

Broker 优化

1
2
3
4
5
6
7
8
9
10
# 使用更多的网络线程
num.network.threads=8
num.io.threads=16

# 增加 Socket 缓冲区
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# 副本拉取配置
replica.fetch.max.bytes=2097152

监控和管理

常用监控指标

1
2
3
4
5
6
7
8
9
# 查看 Topic 分区状态
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic

# 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

# JMX 监控
# 在启动脚本中添加
export JMX_PORT=9999

常见 JMX 指标

  • kafka.server:type=BrokerTopicMetrics: 消息速率、字节速率
  • kafka.network:type=RequestMetrics: 请求延迟
  • kafka.controller:type=KafkaController: Controller 状态
  • kafka.log:type=LogFlushStats: 日志刷盘统计

常见问题和解决方案

消息丢失

1
2
3
4
5
6
7
8
# Producer 配置
acks=all
retries=3
enable.idempotence=true

# Broker 配置
min.insync.replicas=2
unclean.leader.election.enable=false

消息重复

1
2
3
4
5
6
7
8
9
10
11
12
13
// 使用幂等性 Producer
props.put("enable.idempotence", true);

// 或使用事务
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}

消息乱序

1
2
3
4
5
6
# 确保顺序性
max.in.flight.requests.per.connection=1
enable.idempotence=true

# 或使用分区键确保相同 key 的消息进入同一分区
producer.send(new ProducerRecord<>("topic", key, value));

总结核心知识要点

Kafka 架构核心

  • 分布式架构: Broker 集群 + Zookeeper/KRaft 协调
  • 分区机制: Topic → Partition → Segment,实现并行和扩展
  • 副本机制: Leader-Follower 模型,ISR 保证数据一致性
  • 消费者组: 多个 Consumer 协作,每个 Partition 只被组内一个 Consumer 消费

核心概念示意图

1
2
3
4
5
6
7
8
9
Producer → [Topic: orders (3 partitions, RF=2)]
├─ Partition 0 [Leader: Broker1, Follower: Broker2]
├─ Partition 1 [Leader: Broker2, Follower: Broker3]
└─ Partition 2 [Leader: Broker3, Follower: Broker1]

Consumer Group
├─ Consumer 1 → Partition 0
├─ Consumer 2 → Partition 1
└─ Consumer 3 → Partition 2

关键配置对比

配置项 Producer Consumer 说明
acks all/1/0 - 生产者等待确认级别
retries 3 - 重试次数
batch.size 16384 - 批次大小(字节)
linger.ms 0 - 等待时间(毫秒)
auto.offset.reset - earliest/latest Offset 重置策略
enable.auto.commit - true/false 自动提交 Offset
max.poll.records - 500 单次拉取最大记录数

消息可靠性保证

1. 避免消息丢失

1
2
3
4
5
6
7
8
# Producer
acks=all
retries=Integer.MAX_VALUE
enable.idempotence=true

# Broker
min.insync.replicas=2
unclean.leader.election.enable=false

2. 避免消息重复

1
2
3
4
5
6
7
8
9
// 幂等性 Producer
Properties props = new Properties();
props.put("enable.idempotence", true);

// 事务性 Producer
producer.initTransactions();
producer.beginTransaction();
// ... send messages
producer.commitTransaction();

3. 保证消息顺序

1
2
3
4
5
6
# 方式1: 单分区
num.partitions=1

# 方式2: 相同 key 进同一分区
max.in.flight.requests.per.connection=1
enable.idempotence=true

实际应用场景代码

场景1: 日志收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Producer: 收集应用日志
public class LogCollector {
private final KafkaProducer<String, String> producer;

public void sendLog(String app, String log) {
ProducerRecord<String, String> record =
new ProducerRecord<>("app-logs", app, log);
producer.send(record);
}
}

// Consumer: 存储到 Elasticsearch
public class LogStorage {
public void process(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
elasticsearchClient.index(record.value());
}
}
}

场景2: 实时数据管道

1
2
3
4
5
6
7
8
9
10
11
12
13
// Kafka Streams: 实时聚合
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Transaction> transactions = builder.stream("transactions");

KTable<String, Double> revenue = transactions
.groupBy((key, tx) -> tx.getUserId())
.aggregate(
() -> 0.0,
(key, tx, sum) -> sum + tx.getAmount(),
Materialized.as("revenue-store")
);

revenue.toStream().to("user-revenue");

场景3: 事件溯源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 保存事件
public class EventStore {
public void saveEvent(DomainEvent event) {
ProducerRecord<String, DomainEvent> record =
new ProducerRecord<>("domain-events", event.getAggregateId(), event);
producer.send(record);
}
}

// 重建状态
public class StateRebuilder {
public AggregateState rebuild(String aggregateId) {
consumer.subscribe(Collections.singletonList("domain-events"));
AggregateState state = new AggregateState();

while (true) {
ConsumerRecords<String, DomainEvent> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, DomainEvent> record : records) {
if (record.key().equals(aggregateId)) {
state.apply(record.value());
}
}
}
}
}

性能调优清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1. Producer 吞吐量优化
batch.size=32768
linger.ms=20
compression.type=lz4
buffer.memory=67108864

# 2. Consumer 吞吐量优化
fetch.min.bytes=1024
max.partition.fetch.bytes=2097152
max.poll.records=1000

# 3. Broker 性能优化
num.network.threads=8
num.io.threads=16
log.segment.bytes=1073741824

# 4. 操作系统优化
# 增加文件描述符
ulimit -n 100000

# 调整 TCP 参数
sysctl -w net.core.rmem_max=2097152
sysctl -w net.core.wmem_max=2097152

常用运维命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 创建 Topic(3分区,2副本)
kafka-topics.sh --create --topic my-topic \
--partitions 3 --replication-factor 2 \
--bootstrap-server localhost:9092

# 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe

# 重置 Offset 到最早
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-earliest --execute

# 动态修改配置
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name my-topic \
--alter --add-config retention.ms=86400000

# 查看分区副本状态
kafka-topics.sh --describe --bootstrap-server localhost:9092 \
--topic my-topic --under-replicated-partitions

最佳实践

  • Topic 设计: 合理设置分区数(CPU 核数的 2-3 倍),副本因子至少为 2
  • 消息格式: 使用 Avro/Protobuf 等序列化格式,配合 Schema Registry
  • Offset 管理: 关键业务手动提交 Offset,确保消息处理完成后再提交
  • 监控告警: 监控 Consumer Lag、Broker 磁盘使用、网络流量
  • 容量规划: 预估消息大小和速率,规划磁盘容量和网络带宽
  • 安全配置: 启用 SASL/SSL 认证,配置 ACL 权限控制

核心原理

  • Zero-Copy: 利用 sendfile 系统调用减少数据拷贝
  • 顺序写入: 利用磁盘顺序写入特性,性能接近内存
  • 页缓存: 依赖操作系统页缓存,减少 JVM 堆内存压力
  • 批量压缩: 批量压缩消息,减少网络传输和磁盘占用
  • 消费者平衡: 通过 Group Coordinator 实现消费者动态平衡

References