How to fix the kafka error "Response Heartbeat The group is rebalancing, so a rejoin is needed", which causes eachMessage to be invoked repeatedly
datehoer 4/28/2022 kafkanodejs
[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Connection] Response Heartbeat(key: 12, version: 3) {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","broker":"kafka-0.kafka-headless.dev.svc.cluster.local:9092","clientId":"reviews-ts-service-client","error":"The group is rebalancing, so a rejoin is needed","correlationId":1241,"size":10} +2857ms
[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Runner] The group is rebalancing, re-joining {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","groupId":"reviews-consumer-ts-customer-client","memberId":"reviews-ts-service-client-453b2860-fdab-4c01-aa98-e015667b8d3b","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":330} +1m
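This error is really annoying. Programs that only write to a database usually never hit it, but a process that requests web pages runs into it easily: the heartbeat times out, the consumer is restarted, and eachMessage is executed again. The previous eachMessage call is still running, though, so eachMessage invocations keep piling up until the process hangs.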
https://github.com/nestjs/nest/issues/7270
https://github.com/tulios/kafkajs/issues/1097
These are two issues where people ran into this problem. The fix given in the first one, under "HOW TO RESOLVE THIS ISSUE?", is:
sessionTimeout: 60000,
heartbeatInterval: 40000,
maxWaitTimeInMs: 43000,
sessionTimeout: it should be greater than the processing time of the method.
heartbeatInterval: someone said it should be 2/3 of sessionTimeout.
maxWaitTimeInMs: it must be greater than heartbeatInterval.
This issue was resolved by above configuration.
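In other words, increase the session timeout. There is no need to go through the second issue; the answer is simply to increase both the session timeout and the heartbeat interval. Raise the two far enough and the problem should no longer occur.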
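As a minimal sketch, the values quoted from the issue could be collected into a kafkajs consumer options object like this. `buildConsumerOptions` is a hypothetical helper and not part of kafkajs, and the group ID is simply taken from the log above. The only rule it checks is the one stated in the kafkajs options table, that heartbeatInterval must be lower than sessionTimeout.

```javascript
// Hypothetical helper (not part of kafkajs): builds consumer options using the
// values quoted from the GitHub issue and checks the one documented constraint.
function buildConsumerOptions(groupId, overrides = {}) {
  const options = {
    groupId,
    sessionTimeout: 60000,    // should exceed the processing time of one message
    heartbeatInterval: 40000, // 2/3 of sessionTimeout, as suggested in the issue
    maxWaitTimeInMs: 43000,   // value from the issue
    ...overrides,
  };
  // The kafkajs docs state that heartbeatInterval must be lower than sessionTimeout.
  if (options.heartbeatInterval >= options.sessionTimeout) {
    throw new RangeError('heartbeatInterval must be lower than sessionTimeout');
  }
  return options;
}

// Group ID taken from the log above.
const consumerOptions = buildConsumerOptions('reviews-consumer-ts-customer-client');

// With kafkajs installed, the object would be passed to the consumer factory:
// const consumer = kafka.consumer(consumerOptions);
```

Keeping the numbers in one place makes it easier to adjust them together when the processing time of eachMessage changes.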
The logs are now pleasant to read, without the pile of errors there used to be.
Reference:
# Options
kafka.consumer({
groupId: <String>,
partitionAssigners: <Array>,
sessionTimeout: <Number>,
rebalanceTimeout: <Number>,
heartbeatInterval: <Number>,
metadataMaxAge: <Number>,
allowAutoTopicCreation: <Boolean>,
maxBytesPerPartition: <Number>,
minBytes: <Number>,
maxBytes: <Number>,
maxWaitTimeInMs: <Number>,
retry: <Object>,
})
option | description | default |
---|---|---|
partitionAssigners | List of partition assigners | [PartitionAssigners.roundRobin] |
sessionTimeout | Timeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance | 30000 |
rebalanceTimeout | The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group | 60000 |
heartbeatInterval | The expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. The value must be set lower than session timeout | 3000 |
metadataMaxAge | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions | 300000 (5 minutes) |
allowAutoTopicCreation | Allow topic creation when querying metadata for non-existent topics | true |
maxBytesPerPartition | The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition | 1048576 (1MB) |
minBytes | Minimum amount of data the server should return for a fetch request, otherwise wait up to maxWaitTimeInMs for more data to accumulate. | 1 |
maxBytes | Maximum amount of bytes to accumulate in the response. Supported by Kafka >= 0.10.1.0 | 10485760 (10MB) |
maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by minBytes | 5000 |
retry | See retry for more information | { retries: 5 } |
readUncommitted | Configures the consumer isolation level. If false (default), the consumer will not return any transactional messages which were not committed. | false |
maxInFlightRequests | Max number of requests that may be in progress at any time. If falsey then no limit. | null (no limit) |
rackId | Configure the "rack" in which the consumer resides to enable follower fetching | null (fetch from the leader always) |
After running with this for a day it still did not really help, so I found another approach: lower the retry count, which defaults to 5.
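Our program has not stopped, it just spends a long time waiting. When no heartbeat is sent within the heartbeat interval, the consumer restarts, but the previous run is not shut down. So we simply do not let it restart. I think the best approach would be to shut down the previous eachMessage when the group rebalances, which would avoid this problem altogether.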
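A minimal sketch of lowering the retry count through the consumer's `retry` option listed in the table above. `withLowRetries` is a hypothetical helper, not part of kafkajs, and the value 1 is only an illustrative assumption, since the exact number used is not stated here.

```javascript
// Hypothetical helper (not part of kafkajs): returns a copy of the consumer
// options with a lowered retry count. The kafkajs default is { retries: 5 }.
function withLowRetries(baseOptions, retries = 1) {
  if (!Number.isInteger(retries) || retries < 0) {
    throw new RangeError('retries must be a non-negative integer');
  }
  return {
    ...baseOptions,
    // Preserve any other retry settings that were already present.
    retry: { ...(baseOptions.retry || {}), retries },
  };
}

const lowRetryOptions = withLowRetries({
  groupId: 'reviews-consumer-ts-customer-client', // group ID from the log above
  sessionTimeout: 60000,
  heartbeatInterval: 40000,
});

// With kafkajs installed, the object would be passed to the consumer factory:
// const consumer = kafka.consumer(lowRetryOptions);
```

The helper copies the options instead of mutating them, so the original configuration stays available for comparison while tuning.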