EventHub pulling agent checkpointer should detect out of bounds offset and recover
#7256, created on September 2, 2021
Description
Checkpoint offsets can become invalid when the pulling agents are offline and partition data in the event hub is expunged or expires. When this happens, one or more pulling agents are suspended, and exceptions are thrown repeatedly, until their data in the storage account table is cleared.
I suggest that the pulling agents reset their position, following the pattern used by EventProcessorOptions.InitialOffsetProvider.

This would give optional per-partition recovery control and maximum flexibility while reusing available classes in the event hub package.
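To make the suggestion concrete, here is a minimal sketch of how such a recovery hook might look. It assumes the Microsoft.Azure.EventHubs package that appears in the stack trace, and reuses the `Func<string, EventPosition>` shape of `EventProcessorOptions.InitialOffsetProvider`. The class `CheckpointRecoverySketch`, its methods, and the `recoveryOffsetProvider` parameter are hypothetical names for illustration only; nothing like them exists in Orleans today. Matching on the exception message is also an assumption, since the wording is not a stable contract.

```csharp
using System;
using Microsoft.Azure.EventHubs;

// Hypothetical helper; not part of Orleans or the Event Hubs SDK.
public static class CheckpointRecoverySketch
{
    // Heuristic match on the service message quoted in this issue
    // ("The supplied offset '...' is invalid."). The wording may change.
    public static bool IsInvalidOffset(Exception ex) =>
        ex is ArgumentException
        && ex.Message.IndexOf("supplied offset", StringComparison.OrdinalIgnoreCase) >= 0
        && ex.Message.IndexOf("is invalid", StringComparison.OrdinalIgnoreCase) >= 0;

    // Chooses the position at which the receiver for this partition should be
    // recreated. Returns null when the failure is unrelated to the checkpoint,
    // so the caller can keep its existing retry behaviour.
    public static EventPosition ChooseRecoveryPosition(
        string partitionId,
        Exception failure,
        Func<string, EventPosition> recoveryOffsetProvider)
    {
        if (!IsInvalidOffset(failure))
        {
            return null;
        }

        // Assumed default when no provider is configured, or when the provider
        // returns null: restart from the oldest event still retained.
        return recoveryOffsetProvider?.Invoke(partitionId) ?? EventPosition.FromStart();
    }
}
```

A provider such as `partitionId => EventPosition.FromEnd()` would skip to new events only, while `EventPosition.FromStart()` replays everything still retained. Which default is appropriate depends on whether the application prefers duplicates or gaps after a reset.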
Example issue:
The supplied offset '10402431925616' is invalid. The last offset in the system is '489637839544' TrackingId:d1526603-0359-4a2a-a397-e1a14fb32774_B14, SystemTracker:180-UsageMessagingComponentManager, Timestamp:2021-08-31T21:16:42 Reference:24b9ac31-3526-42e9-831e-f4e16f99210b, TrackingId:3cc193a5-6919-475c-81a6-babc8b8796b0_B14, SystemTracker:eh-digitaltwin-dev-01:eventhub:ayla-in~21503|test_oisin, Timestamp:2021-08-31T21:16:43 TrackingId:673510f56c824f93ac79c22d4df41d65_G19, SystemTracker:gateway5, Timestamp:2021-08-31T21:16:43
System.ArgumentException:
at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.EventHubs.PartitionReceiver+<ReceiveAsync>d__30.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Orleans.ServiceBus.Providers.EventHubAdapterReceiver+<GetQueueMessagesAsync>d__22.MoveNext (Orleans.Streaming.EventHubs, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Orleans.Streams.PersistentStreamPullingAgent+<ReadFromQueue>d__37.MoveNext (Orleans.Runtime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Orleans.Internal.AsyncExecutorWithRetries+<ExecuteWithRetriesHelper>d__4`1.MoveNext (Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)