Parsely/pykafka

Messages lost when broker closes connection

Open

#814 created on June 7, 2018

(5 comments) (5 reactions) (0 assignees) Python (1,114 stars) (231 forks) batch import
bug, help wanted

Description

Already queued messages are lost when a SocketDisconnectError is detected here.

This is occurring because the Producer._update method first closes all OwnedBrokers here, which results in the OwnedBroker setting self.running to False here. Then, Producer._update calls Producer._setup_owned_brokers to reestablish the broker connection.

Producer._setup_owned_brokers (which also does its own OwnedBroker.stop call) calls OwnedBroker.flush to collect all currently queued messages here.

The attempt to collect all currently queued messages will never return any messages, because OwnedBroker.flush checks to see if self.running is False and if so, returns an empty list here. self.running will be False, because that OwnedBroker was stopped.
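
To make the ordering problem concrete, here is a minimal sketch of the sequence described above. It is a toy model, not pykafka's code: `ToyOwnedBroker`, `update_stop_then_flush`, `update_drain_then_stop`, and `make_broker` are hypothetical names, and the real `OwnedBroker.flush` takes arguments that are omitted here. The second function shows one possible reordering (drain before stopping), offered only as an illustration and not necessarily how this should be fixed in the library.

```python
from collections import deque


class ToyOwnedBroker:
    """Simplified stand-in for pykafka's OwnedBroker (hypothetical, not the real class)."""

    def __init__(self):
        self.running = True
        self.queue = deque()

    def enqueue(self, msg):
        self.queue.append(msg)

    def stop(self):
        self.running = False

    def flush(self):
        # Mirrors the behavior described in this report:
        # a stopped broker returns an empty list, even if messages are queued.
        if not self.running:
            return []
        batch = list(self.queue)
        self.queue.clear()
        return batch


def update_stop_then_flush(brokers):
    """Order described in the report: stop every broker, then try to collect."""
    for broker in brokers:
        broker.stop()
    recovered = []
    for broker in brokers:
        recovered.extend(broker.flush())
    return recovered


def update_drain_then_stop(brokers):
    """Assumed alternative: collect queued messages before stopping each broker."""
    recovered = []
    for broker in brokers:
        recovered.extend(broker.flush())
        broker.stop()
    return recovered


def make_broker(messages):
    broker = ToyOwnedBroker()
    for msg in messages:
        broker.enqueue(msg)
    return broker


lost = update_stop_then_flush([make_broker(["a", "b"])])
kept = update_drain_then_stop([make_broker(["a", "b"])])
print(lost)  # []
print(kept)  # ['a', 'b']
```

In the first ordering the two queued messages are still sitting in the stopped broker's queue, but nothing ever reads them again, which matches the silent loss reported here.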

I discovered this in a long-running test: in the course of queuing 3,200,000 messages, a couple of dozen messages went missing.
