skip to Main Content

I’ve been trying to retrieve events by their position using Microsoft.Azure.EventHubs.

I’ve been told that there is a way to calculate an event's position using its offset or sequenceNumber, so every time I add an event to an EventBatch, I cache an id together with the event’s offset and sequenceNumber in Redis.

Then, whenever I want to retrieve a specific event, I search for its id in Redis, retrieve its offset and sequenceNumber, and could potentially retrieve it from the Event Hub stream.

The problem is that the offset and sequence number are both negative longs, and I can’t understand how to use them as an index.

Does anyone know how this could be done?

Here are both my publisher and retriever classes:

    /// <summary>
    /// Publishes a single message to an Event Hub and caches bookkeeping data
    /// about the event in Redis.
    /// </summary>
    public class EventHubPublisher
    {
        /// <summary>
        /// Sends <paramref name="_message"/> as one UTF-8 encoded event and stores the
        /// event's <c>SequenceNumber</c> and <c>Offset</c> in Redis under
        /// <paramref name="onboardingid"/>.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub to publish to.</param>
        /// <param name="_message">Text payload of the event.</param>
        /// <param name="onboardingid">Redis key under which the bookkeeping data is stored.</param>
        /// <exception cref="InvalidOperationException">
        /// The event could not be added to the batch.
        /// </exception>
        public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
        {
            // The producer owns a connection and the batch owns a buffer; release both when done.
            await using var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
            using var eventBatch = await producerClient.CreateBatchAsync();

            var data = new EventData(Encoding.UTF8.GetBytes(_message));
            if (!eventBatch.TryAdd(data))
            {
                // Previously the result was ignored, which would silently send an empty batch.
                throw new InvalidOperationException("The event could not be added to the batch.");
            }

            // NOTE(review): SequenceNumber and Offset are read here BEFORE SendAsync, i.e. before
            // the service has accepted the event, so they are presumably not the values the
            // service will assign - confirm before relying on them.
            await addDatatoRedis(data.SequenceNumber, data.Offset, onboardingid);
            await producerClient.SendAsync(eventBatch);
        }

        /// <summary>
        /// Stores the sequence number and offset, serialized as JSON, under the key
        /// <paramref name="onboardingid"/>.
        /// </summary>
        private static async Task addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
        {
            // A multiplexer is created per call (as before), so it is disposed per call too.
            using var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
            var redis = redisConnection.GetDatabase();
            var value = new
            {
                sequence_number = sequenceNumber.ToString(),
                offset = offset.ToString(),
            };

            // Await the write so that failures surface to the caller instead of being lost.
            await redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
        }
    }
public class EventHubRetriever
    {
        /// <summary>
        /// Work-in-progress lookup of a previously published event. It currently only
        /// gathers runtime information and builds an <c>EventPosition</c> from the cached
        /// offset; no event is read or returned yet.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub to read from.</param>
        /// <param name="existentEvent">Cached event data; its "offset" entry is read.</param>
        public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            try
            {
                var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");
                var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
                var eventPosition = EventPosition.FromOffset(existentEvent["offset"].ToString());

                // Offsets are handled as long by the publisher, so a 32-bit conversion could overflow.
                var lastEnqueuedOffset = Convert.ToInt64(
                    partitionRuntimeInformation.LastEnqueuedOffset,
                    System.Globalization.CultureInfo.InvariantCulture);

                var offset = existentEvent["offset"];

                // var offsetPosition = lastEnqueuedOffset + offset;

                // var receiver = EventHubClient.Create();
            }
            finally
            {
                // The client owns a connection; close it even when a call above fails.
                await client.CloseAsync();
            }
        }
    }

2

Answers


  1. Chosen as BEST ANSWER

    Well, I'm not sure if what I did was the right approach, but it seems to be working. When I publish the message, I get LastEnqueuedSequenceNumber from GetPartitionRuntimeInformationAsync, add 1, and attach the result to the EventData as a property. Since I'm also adding the event to a Redis cache, I'm able to retrieve its sequenceNumber later.

        /// <summary>
        /// Publishes a message to a specific Event Hub partition and caches the sequence
        /// number it is expected to receive, keyed by the onboarding id, in Redis.
        /// </summary>
        public class EventHubPublisher
        {
            /// <summary>
            /// Sends <paramref name="_message"/> to <paramref name="partition"/> and caches its
            /// expected sequence number under <paramref name="onboardingid"/>.
            /// </summary>
            /// <param name="_connectionString">Event Hubs namespace connection string.</param>
            /// <param name="_eventHubName">Name of the Event Hub; also stored as "topic" in Redis.</param>
            /// <param name="_message">Text payload of the event.</param>
            /// <param name="onboardingid">Identifier attached to the event and used as the Redis key.</param>
            /// <param name="partition">Id of the partition whose sequence numbering is used.</param>
            public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid, string partition)
            {
                var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
                try
                {
                    // The expected sequence number is the partition's last enqueued one plus 1.
                    // NOTE(review): this is a prediction; another publisher writing to the same
                    // partition in between would make it wrong - confirm this is acceptable.
                    var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync(partition);
                    var sequenceNumber = partitionRuntimeInformation.LastEnqueuedSequenceNumber + 1;

                    var data = new EventData(Encoding.UTF8.GetBytes(_message));

                    data.Properties.Add("Id", onboardingid);
                    data.Properties.Add("Message", _message);
                    data.Properties.Add("SequenceNumber", sequenceNumber);

                    // The prediction is only meaningful for `partition`, so send to that partition
                    // explicitly instead of letting the service choose one.
                    var sender = client.CreatePartitionSender(partition);
                    try
                    {
                        await sender.SendAsync(data);
                    }
                    finally
                    {
                        await sender.CloseAsync();
                    }

                    // NOTE(review): data.Body.Offset is an int taken from the event body, not a
                    // value assigned by the service; it is kept so the cached payload keeps its shape.
                    await addDatatoRedis(onboardingid, sequenceNumber, data.Body.Offset, _eventHubName);
                }
                finally
                {
                    // The client owns a connection; close it even when publishing fails.
                    await client.CloseAsync();
                }
            }

            /// <summary>
            /// Stores offset, id, sequence number and topic, serialized as JSON, under the key
            /// <paramref name="onboardingid"/>.
            /// </summary>
            private static async Task addDatatoRedis(string onboardingid, long sequenceNumber, int offset, string topic)
            {
                // A multiplexer is created per call (as before), so it is disposed per call too.
                using var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
                var redis = redisConnection.GetDatabase();
                var value = new
                {
                    offset = offset,
                    id = onboardingid,
                    sequenceNumber = sequenceNumber,
                    topic = topic
                };

                // Await the write so that failures surface to the caller instead of being lost.
                await redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
            }
        }
    
    

    Then, when retrieving the event from EventHub, I'm able to get the sequenceNumber from the cached event and get the event on the eventhub by its index.

        /// <summary>
        /// Retrieves a previously published event from an Event Hub using the sequence
        /// number that was cached for it.
        /// </summary>
        public class EventHubRetriever
        {
            /// <summary>
            /// Reads the event described by <paramref name="existentEvent"/>.
            /// </summary>
            /// <param name="_connectionString">Event Hubs namespace connection string.</param>
            /// <param name="_eventHubName">Name of the Event Hub to read from.</param>
            /// <param name="existentEvent">
            /// Cached event data. "sequenceNumber" is required; "id", when present, is used to
            /// pick the matching event among the candidates.
            /// </param>
            /// <returns>The retrieved event.</returns>
            /// <exception cref="EventRetrieveException">Any failure while reading the event.</exception>
            public static async Task<EventData> GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
            {
                try
                {
                    var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
                    try
                    {
                        var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
                        var partitionsIds = eventHubRunTimeInformation.PartitionIds;

                        var sequenceNumber = (long)existentEvent["sequenceNumber"];
                        var eventId = existentEvent.Value<string>("id");

                        return await GetEventByPosition(client, sequenceNumber, partitionsIds, eventId);
                    }
                    finally
                    {
                        // The client owns a connection; close it even when the lookup fails.
                        await client.CloseAsync();
                    }
                }
                catch (Exception exc)
                {
                    throw new EventRetrieveException(exc, "An error ocurred while retrieving data from Event Hub.");
                }
            }

            /// <summary>
            /// Reads one event at <paramref name="sequenceNumber"/> from every partition and
            /// returns the one whose "Id" property equals <paramref name="eventId"/>. When
            /// <paramref name="eventId"/> is null the first event received is returned.
            /// </summary>
            /// <exception cref="InvalidOperationException">No matching event was received.</exception>
            private static async Task<EventData> GetEventByPosition(EventHubClient client, long sequenceNumber, string[] partitionsIds, string eventId)
            {
                var eventPosition = EventPosition.FromSequenceNumber(sequenceNumber, true);

                // CreateReceiver takes a consumer group name first, not the event hub name.
                var partitionReceivers = partitionsIds
                    .Select(id => client.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, id, eventPosition))
                    .ToList();

                try
                {
                    // Read all partitions concurrently without blocking on .Result.
                    var batches = await Task.WhenAll(partitionReceivers.Select(receiver => receiver.ReceiveAsync(1)));

                    // A receive can yield no batch at all; skip those partitions.
                    var events = batches
                        .Where(batch => batch != null)
                        .SelectMany(batch => batch)
                        .ToList();

                    // Each partition has its own sequence numbering, so several partitions may
                    // return an event for the same number; use the id to pick the right one.
                    var match = eventId == null
                        ? events.FirstOrDefault()
                        : events.FirstOrDefault(candidate =>
                            candidate.Properties.TryGetValue("Id", out var id)
                            && string.Equals(id?.ToString(), eventId, StringComparison.Ordinal));

                    if (match == null)
                    {
                        throw new InvalidOperationException(
                            $"No event with sequence number {sequenceNumber} was found.");
                    }

                    return match;
                }
                finally
                {
                    await Task.WhenAll(partitionReceivers.Select(receiver => receiver.CloseAsync()));
                }
            }
        }
    
    

  2. The Offset and SequenceNumber are broker-owned fields that aren’t populated when you create your EventData instance. The Event Hubs service will assign an offset and sequence number only after the event has been accepted and assigned to a partition. It is not possible to predict what those values are when publishing; those members are only valid when the event is consumed.

    The values that you’re storing in Redis are long.MinValue, corresponding to uninitialized data. (this is detailed in the remarks section of the docs)

    Unfortunately, the random-access scenario that you’re looking to implement isn’t one that fits well with Event Hubs. The closest that occurs to me would be to assign a unique identifier to an application property when creating the event and then store that and its approximate publishing time (with a buffer to account for time drift) in Redis. When you want to retrieve that event, use the publish time as your starting point and then read forward until you find the event.

    For example, something like:

    (DISCLAIMER: I’m working from memory and unable to compile/test. Please forgive any syntax errors)

        /// <summary>
        /// Publishes events tagged with a unique "Id" property and records, for each one,
        /// the id and an approximate publish time so it can be located again later.
        /// </summary>
        public class EventHubPublisher : IAsyncDisposable
        {
            // NOTE: You want the client instance to be static; by creating
            //       each time that you wish to send, you're paying the cost
            //       of establishing a connection each time.
            //
            // NOTE(review): _connectionString, _eventHubName and _message are not declared in
            //       this snippet; they are assumed to be defined elsewhere in the class.
            //
            private static readonly EventHubProducerClient ProducerClient =
                new EventHubProducerClient(
                    _connectionString,
                    _eventHubName);

            // NOTE: You should query the Event Hub for partition identifiers; it is
            //       not safe to assume "0" as the first partition.  This is used here
            //       for illustration only.
            //
            private static readonly SendEventOptions FirstPartitionOptions =
                new SendEventOptions { PartitionId = "0" };

            // NOTE: There is no benefit to using the batch overload, as you
            //        are publishing only a single event at a time.  It's worth
            //        mentioning that this isn't an efficient pattern for publishing;
            //        if you can batch events, you'll see better throughput.
            //
            /// <summary>
            /// Publishes the message as a single event carrying a freshly generated "Id"
            /// property, then records that id together with a padded publish time.
            /// </summary>
            public async Task SendMessage()
            {
                var eventId = Guid.NewGuid().ToString();

                var data = new EventData(Encoding.UTF8.GetBytes(_message));
                data.Properties.Add("Id", eventId);

                // Allow for up to 5 minutes of clock skew.  This may need to be tuned
                // depending on your environment.  The time is captured before sending so
                // that the duration of the send itself cannot push it later.
                var publishTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(5));

                await ProducerClient.SendAsync(new[] { data }, FirstPartitionOptions);

                addDatatoRedis(publishTime, eventId);
            }

            /// <summary>Persists the padded publish time and the event id.</summary>
            private void addDatatoRedis(DateTimeOffset enqueueTime, string id)
            {
                // ... Save to Redis
            }

            /// <summary>Closes the shared producer client.</summary>
            public virtual async ValueTask DisposeAsync()
            {
                await ProducerClient.CloseAsync().ConfigureAwait(false);
                GC.SuppressFinalize(this);
            }
        }
    

    I’m illustrating using Azure.Messaging.EventHubs, which is the current generation Event Hubs client library. The concept would be the same if you need to continue using the legacy version as your question does.

        /// <summary>
        /// Locates a previously published event by reading forward from its recorded
        /// publish time until an event with the matching "Id" property is seen.
        /// </summary>
        public class EventHubRetriever : IAsyncDisposable
        {
            // NOTE: You want the client instance to be static; by creating
            //       each time that you wish to read, you're paying the cost
            //       of establishing a connection each time.
            //
            // NOTE(review): _connectionString and _eventHubName are not declared in this
            //       snippet; they are assumed to be defined elsewhere in the class.
            //
            private static readonly EventHubConsumerClient ConsumerClient =
                new EventHubConsumerClient(
                    EventHubConsumerClient.DefaultConsumerGroupName,
                    _connectionString,
                    _eventHubName);

            // A bounded wait makes the read loop yield an empty entry when no event
            // arrives in time, which is how the end of the stream is detected below.
            private static readonly ReadEventOptions ReadOptions =
                new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(1) };

            /// <summary>
            /// Searches the first partition for the event described by
            /// <paramref name="persistedEvent"/>.
            /// </summary>
            /// <param name="persistedEvent">
            /// Stored event data; its "publishTime" and "id" entries are read.
            /// </param>
            public async Task GetEvent(JObject persistedEvent)
            {
                var firstPartition = (await ConsumerClient.GetPartitionIdsAsync()).First();
                var enqueueTime = persistedEvent.Value<DateTimeOffset>("publishTime");
                var eventId = persistedEvent.Value<string>("id");

                await foreach (PartitionEvent partitionEvent in
                    ConsumerClient.ReadEventsFromPartitionAsync(
                        firstPartition,
                        EventPosition.FromEnqueuedTime(enqueueTime),
                        ReadOptions))
                {
                    if (partitionEvent.Data == null)
                    {
                        // We're at the end of the event stream and didn't find
                        // your event.
                        break;
                    }

                    // Compare as strings; comparing the boxed property value with == would
                    // be a reference comparison and never match.
                    if (partitionEvent.Data.Properties.TryGetValue("Id", out var id)
                        && string.Equals(id?.ToString(), eventId, StringComparison.Ordinal))
                    {
                        // This is your event.  Do what you want and break
                        // the loop.
                        break;
                    }
                }
            }

            /// <summary>Closes the shared consumer client.</summary>
            public virtual async ValueTask DisposeAsync()
            {
                await ConsumerClient.CloseAsync().ConfigureAwait(false);
                GC.SuppressFinalize(this);
            }
        }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search