Parsely/pykafka

Messages lost when broker closes connection

Open

#814 opened on Jun 7, 2018

Labels: bug, help wanted

Description

Already-queued messages are lost when a SocketDisconnectError is detected.

This happens because the Producer._update method first stops every OwnedBroker, which sets self.running to False on each of them. Producer._update then calls Producer._setup_owned_brokers to reestablish the broker connections.

Producer._setup_owned_brokers (which also makes its own OwnedBroker.stop call) calls OwnedBroker.flush to collect all currently queued messages.

That attempt never returns any messages: OwnedBroker.flush checks whether self.running is False and, if so, returns an empty list. self.running is always False at this point, because that OwnedBroker has already been stopped.
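
The ordering problem described above can be reproduced with a small stand-alone model. This is only a sketch: ToyOwnedBroker, reconnect_stop_first, and reconnect_collect_first are hypothetical names invented for illustration, and the toy flush takes no arguments, unlike the real pykafka method. It models just the self.running check and the stop-then-flush order, and the "collect first" variant is one possible ordering, not a proposed patch.

```python
from collections import deque


class ToyOwnedBroker:
    """Hypothetical stand-in for pykafka's OwnedBroker; not the real class."""

    def __init__(self):
        self.queue = deque()
        self.running = True

    def enqueue(self, message):
        self.queue.append(message)

    def stop(self):
        self.running = False

    def flush(self):
        # Models the check described above: a stopped broker returns
        # an empty list even though messages are still queued.
        if not self.running:
            return []
        batch = list(self.queue)
        self.queue.clear()
        return batch


def reconnect_stop_first(broker):
    """Order described in this report: stop, then try to collect."""
    broker.stop()
    return broker.flush()


def reconnect_collect_first(broker):
    """An alternative order for comparison: collect, then stop."""
    pending = broker.flush()
    broker.stop()
    return pending


buggy = ToyOwnedBroker()
safe = ToyOwnedBroker()
for i in range(3):
    buggy.enqueue(f"msg-{i}")
    safe.enqueue(f"msg-{i}")

lost = reconnect_stop_first(buggy)
recovered = reconnect_collect_first(safe)

print(lost)              # []
print(len(buggy.queue))  # 3 (still queued, but never handed back)
print(recovered)         # ['msg-0', 'msg-1', 'msg-2']
```

In the stop-first case the three messages remain in the old broker's queue and are never returned to the caller, which matches the loss described here.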

I discovered this in a long-running test: in the course of queuing 3,200,000 messages, a couple of dozen messages went missing.
