dotnet/orleans

EventHub pulling agent checkpointer should detect out of bounds offset and recover

Open

#7,256 opened on Sep 2, 2021

View on GitHub
 (9 comments) (0 reactions) (1 assignee)C# (10,777 stars) (2,123 forks)batch import
help wanted

Description

It's possible for checkpoint offset(s) to become invalid when the pulling agents are offline and partition data in the event hub is expunged/expired. When this happens, one or more pulling agent(s) is/are suspended - and exceptions repeatedly thrown - until their data in the storage account table is cleared.

I would suggest that the pulling agent(s) reset their position, following the pattern used by EventProcessorOptions.InitialOffsetProvider -- e.g.

image

This would give optional per-partition recovery control and maximum flexibility while reusing available classes in the event hub package.

Example issue:

The supplied offset '10402431925616' is invalid. The last offset in the system is '489637839544' TrackingId:d1526603-0359-4a2a-a397-e1a14fb32774_B14, SystemTracker:180-UsageMessagingComponentManager, Timestamp:2021-08-31T21:16:42 Reference:24b9ac31-3526-42e9-831e-f4e16f99210b, TrackingId:3cc193a5-6919-475c-81a6-babc8b8796b0_B14, SystemTracker:eh-digitaltwin-dev-01:eventhub:ayla-in~21503|test_oisin, Timestamp:2021-08-31T21:16:43 TrackingId:673510f56c824f93ac79c22d4df41d65_G19, SystemTracker:gateway5, Timestamp:2021-08-31T21:16:43

System.ArgumentException:
   at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.EventHubs.PartitionReceiver+<ReceiveAsync>d__30.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.ServiceBus.Providers.EventHubAdapterReceiver+<GetQueueMessagesAsync>d__22.MoveNext (Orleans.Streaming.EventHubs, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, 
Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.Streams.PersistentStreamPullingAgent+<ReadFromQueue>d__37.MoveNext (Orleans.Runtime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.Internal.AsyncExecutorWithRetries+<ExecuteWithRetriesHelper>d__4`1.MoveNext (Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)

Contributor guide