ps:Kafka消息队列不能消费某些分区问题 – 飞书云文档 (feishu.cn)
一、kafka 部分分区不能正常消费问题
发现topic 12、13分区 有超过1M的消息体,所以开放平台不能正常消费到12、13分区消息.
或消费到某一个offset不能正常消费也会卡住,可以重置offset点位(当然这种仅用于 对消息体不敏感缺些也可以接收的场景)
修改调整:
在订阅时设置消费者接收消息体大小,重点设置:fetch.message.max.bytes 参数2M
public static final String FETCH_MESSAGE_MAX_BYTES_NAME = “fetch.message.max.bytes”;
properties.put(KafkaConsumer.FETCH_MESSAGE_MAX_BYTES_NAME, 2*1024)
1.1 broker 配置
message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。 log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。 replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
1.2 Consumer 配置
fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。 可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。 垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
二、特殊参数说明
GitHub Copilot: `max.partition.fetch.bytes`和`fetch.max.bytes`都是Kafka消费者的配置参数,但它们的含义略有不同。
`max.partition.fetch.bytes`参数指定了单个分区从Kafka服务器拉取的最大字节数。如果分区中的消息大小超过了这个值,消费者将只拉取`max.partition.fetch.bytes`大小的消息。这个参数的默认值是1MB。
`fetch.max.bytes`参数指定了单次拉取消息的最大字节数。如果拉取的消息大小超过了这个值,消费者将只拉取`fetch.max.bytes`大小的消息。这个参数的默认值是50MB。
因此,`max.partition.fetch.bytes`参数是针对单个分区的,而`fetch.max.bytes`参数是针对整个消费者的。如果您的消息大小超过了`max.partition.fetch.bytes`的值,那么您需要将`fetch.max.bytes`设置为更大的值,以确保消费者可以拉取到完整的消息。
以下是一个示例,展示了如何在Kafka的配置文件中设置`max.partition.fetch.bytes`和`fetch.max.bytes`参数的值:
“`properties
# 设置max.partition.fetch.bytes参数的值为2MB
max.partition.fetch.bytes=2097152
# 设置fetch.max.bytes参数的值为100MB
fetch.max.bytes=104857600
在这个示例中,我们将`max.partition.fetch.bytes`参数的值设置为`2097152`字节(即2MB),将`fetch.max.bytes`参数的值设置为`104857600`字节(即
100MB)。您可以根据自己的需求调整这些值。
三、消费重试说明
3.1 RocketMQ发送及消费重试机制
发送重试机制:
RocketMQ支持发送失败内部重试,默认是2次,在异步、SendOneWay模式下不支持重试。
消费重试机制:
RocketMQ在消费失败时,支持消费端重试继续消费消息,默认支持16次重试,每次重试的时间间隔增加,只有在Cluster模式支持重试、广播模式不支持重试。
在重试期间,消息的key和ID不会发生改变,应用程序可以以此做好幂等性控制。
在重试期间,还可以继续消费其它新消息
3.2 kafka 消费异常重试次数
我们作为Kafka者在使用Kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存
默认重试次数在哪里看
Kafka3.0 版本默认失败重试消费次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到 org.springframework.kafka.listener.SeekUtils
不同版本关注客户端参数:如果配置了max-attempts为10,