dotnet/orleans

EventHub pulling agent checkpointer should detect out of bounds offset and recover

Open

#7256, created on September 2, 2021

(9 comments) (0 reactions) (1 assignee) C# (10,777 stars) (2,123 forks) batch import
help wanted

Description

Checkpoint offsets can become invalid when the pulling agents are offline and partition data in the event hub is expunged or expires. When this happens, the affected pulling agents are suspended and repeatedly throw exceptions until their checkpoint data in the storage account table is cleared.

I would suggest that the pulling agents reset their position, following the pattern used by EventProcessorOptions.InitialOffsetProvider.

(The original issue included a screenshot of an example here; the image is not available.)

This would give optional per-partition recovery control and maximum flexibility while reusing available classes in the event hub package.
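To make the suggestion concrete, here is a minimal sketch of what per-partition recovery could look like. It is not how Orleans currently behaves. `RecoveryPosition`, `OffsetRecovery`, and `resetProvider` are hypothetical names invented for illustration; a real change would more likely return `EventPosition` values from the event hub package. Detecting the condition by matching the service error message quoted in this issue is also an assumption and would be brittle if the wording changes.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical recovery choices (illustration only).
public enum RecoveryPosition { Start, End }

public static class OffsetRecovery
{
    // Matches the wording of the service error quoted in this issue.
    // Assumption: the wording is stable enough to match against.
    private static readonly Regex InvalidOffset = new Regex(
        @"The supplied offset '(?<supplied>\d+)' is invalid\. " +
        @"The last offset in the system is '(?<last>\d+)'",
        RegexOptions.Compiled);

    // Returns true when the exception looks like an out-of-bounds
    // checkpoint offset, and extracts both offsets from the message.
    public static bool TryParseInvalidOffset(Exception ex, out long supplied, out long last)
    {
        supplied = 0;
        last = 0;
        if (!(ex is ArgumentException)) return false;

        Match m = InvalidOffset.Match(ex.Message);
        return m.Success
            && long.TryParse(m.Groups["supplied"].Value, out supplied)
            && long.TryParse(m.Groups["last"].Value, out last);
    }

    // Asks the per-partition provider where to resume.
    // Returns null when the failure is not an offset problem,
    // so the caller can rethrow or retry as it does today.
    public static RecoveryPosition? Resolve(
        Exception ex, string partition, Func<string, RecoveryPosition> resetProvider)
    {
        return TryParseInvalidOffset(ex, out _, out _)
            ? resetProvider(partition)
            : (RecoveryPosition?)null;
    }
}
```

A receiver could call `OffsetRecovery.Resolve` from its catch block and, on a non-null result, discard the stored checkpoint and reopen the partition at the returned position. Passing a delegate keyed by partition mirrors the shape of `InitialOffsetProvider`, which is what gives the optional per-partition control described above.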

Example issue:

The supplied offset '10402431925616' is invalid. The last offset in the system is '489637839544' TrackingId:d1526603-0359-4a2a-a397-e1a14fb32774_B14, SystemTracker:180-UsageMessagingComponentManager, Timestamp:2021-08-31T21:16:42 Reference:24b9ac31-3526-42e9-831e-f4e16f99210b, TrackingId:3cc193a5-6919-475c-81a6-babc8b8796b0_B14, SystemTracker:eh-digitaltwin-dev-01:eventhub:ayla-in~21503|test_oisin, Timestamp:2021-08-31T21:16:43 TrackingId:673510f56c824f93ac79c22d4df41d65_G19, SystemTracker:gateway5, Timestamp:2021-08-31T21:16:43

System.ArgumentException:
   at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.EventHubs.Amqp.AmqpPartitionReceiver+<OnReceiveAsync>d__13.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.EventHubs.PartitionReceiver+<ReceiveAsync>d__30.MoveNext (Microsoft.Azure.EventHubs, Version=4.3.2.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.ServiceBus.Providers.EventHubAdapterReceiver+<GetQueueMessagesAsync>d__22.MoveNext (Orleans.Streaming.EventHubs, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.Streams.PersistentStreamPullingAgent+<ReadFromQueue>d__37.MoveNext (Orleans.Runtime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Orleans.Internal.AsyncExecutorWithRetries+<ExecuteWithRetriesHelper>d__4`1.MoveNext (Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null)
