I’ve been trying to retrieve events by their position using Microsoft.Azure.EventHubs.
I’ve been told that there is a way to calculate an event’s position using its offset or sequence number, so every time I add an event to an EventDataBatch, I cache an ID along with the event’s offset and sequence number in Redis.
Then, whenever I want to retrieve a specific event, I look up its ID in Redis, retrieve its offset and sequence number, and could potentially read it from the Event Hub stream.
The problem is that the offset and sequence number are negative longs, and I don’t understand how to use them as an index.
Do you know how this could be done?
Here are both my publisher and retriever classes:
```csharp
public class EventHubPublisher
{
    public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
    {
        var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
        var eventBatch = await producerClient.CreateBatchAsync();
        var data = new EventData(Encoding.UTF8.GetBytes(_message));
        eventBatch.TryAdd(data);
        addDatatoRedis(data.SequenceNumber, data.Offset, onboardingid);
        await producerClient.SendAsync(eventBatch);
    }

    private static void addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
    {
        try
        {
            var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
            var redis = redisConnection.GetDatabase();
            var value = new
            {
                sequence_number = sequenceNumber.ToString(),
                offset = offset.ToString(),
            };
            redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
        }
        catch (Exception)
        {
            throw;
        }
    }
}
```
```csharp
public class EventHubRetriever
{
    public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
    {
        try
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");
            var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
            var eventPosition = EventPosition.FromOffset(existentEvent["offset"].ToString());
            var lastEnqueuedOffset = Convert.ToInt32(partitionRuntimeInformation.LastEnqueuedOffset);
            var offset = existentEvent["offset"];
            // var offsetPosition = lastEnqueuedOffset + offset;
            // var receiver = EventHubClient.Create();
        }
        catch (System.Exception)
        {
            throw;
        }
    }
}
```
Answers
Well, I'm not sure whether what I did is the right approach, but it seems to be working. When I publish the message, I get the LastEnqueuedSequenceNumber from GetPartitionRuntimeInformationAsync, add 1, and add the result as a property on the EventData. Since I'm also caching the event in Redis, I can retrieve that sequence number later.
Then, when retrieving the event from the Event Hub, I take the sequence number from the cached event and read the event at that position in the partition.
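A minimal sketch of that approach with the legacy `Microsoft.Azure.EventHubs` package, assuming all events of interest are sent to one known partition. The predicted number is only correct if nothing else is published to that partition between reading the runtime information and the send, so this sketch also tags the event with the onboarding id and checks it on read rather than trusting the number blindly. That id check is an addition of the sketch; `SequenceNumberLookup`, `IdPropertyName`, `PredictNextSequenceNumber`, `PublishAsync`, and `ReadAtAsync` are hypothetical names, not library APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class SequenceNumberLookup
{
    // Hypothetical application property used to confirm the event's identity on read.
    public const string IdPropertyName = "onboarding-id";

    // The guess described above: the next event should get the last sequence number + 1.
    public static long PredictNextSequenceNumber(long lastEnqueuedSequenceNumber) =>
        lastEnqueuedSequenceNumber + 1;

    // Publishes to one partition and returns the predicted sequence number to cache in Redis.
    public static async Task<long> PublishAsync(EventHubClient client, string partitionId, string message, string id)
    {
        EventHubPartitionRuntimeInformation info = await client.GetPartitionRuntimeInformationAsync(partitionId);
        long predicted = PredictNextSequenceNumber(info.LastEnqueuedSequenceNumber);

        var eventData = new EventData(Encoding.UTF8.GetBytes(message));
        eventData.Properties[IdPropertyName] = id;

        PartitionSender sender = client.CreatePartitionSender(partitionId);
        try
        {
            await sender.SendAsync(eventData);
        }
        finally
        {
            await sender.CloseAsync();
        }

        return predicted;
    }

    // Reads the single event at the cached sequence number. Returns null if nothing is
    // there or the event belongs to another publisher (the prediction was wrong).
    public static async Task<EventData> ReadAtAsync(EventHubClient client, string partitionId, long sequenceNumber, string id)
    {
        PartitionReceiver receiver = client.CreateReceiver(
            PartitionReceiver.DefaultConsumerGroupName,
            partitionId,
            EventPosition.FromSequenceNumber(sequenceNumber, true)); // true = include this sequence number

        try
        {
            IEnumerable<EventData> events = await receiver.ReceiveAsync(1, TimeSpan.FromSeconds(10));
            EventData candidate = events?.FirstOrDefault();

            bool matches = candidate != null
                && candidate.Properties.TryGetValue(IdPropertyName, out object value)
                && (value as string) == id;

            return matches ? candidate : null;
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
```

The client can be created with something like `EventHubClient.CreateFromConnectionString(new EventHubsConnectionStringBuilder(connectionString) { EntityPath = eventHubName }.ToString())` and closed with `CloseAsync()` when finished.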
The `Offset` and `SequenceNumber` are broker-owned fields that aren’t populated when you create your `EventData` instance. The Event Hubs service assigns an offset and sequence number only after the event has been accepted and assigned to a partition. It is not possible to predict what those values will be when publishing; those members are only valid when the event is consumed. The values that you’re storing in Redis are `long.MinValue`, corresponding to uninitialized data. (This is detailed in the remarks section of the docs.)

Unfortunately, the random-access scenario that you’re looking to implement isn’t one that fits well with Event Hubs. The closest that occurs to me would be to assign a unique identifier to an application property when creating the event, and then store that identifier and its approximate publishing time (with a buffer to account for time drift) in Redis. When you want to retrieve that event, use the publish time as your starting point and then read forward until you find the event.
For example, something like:
(DISCLAIMER: I’m working from memory and unable to compile/test. Please forgive any syntax errors)
I’m illustrating using `Azure.Messaging.EventHubs`, which is the current-generation Event Hubs client library. The concept would be the same if you need to continue using the legacy version, as your question does.
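A minimal sketch of the identifier-plus-publish-time approach with `Azure.Messaging.EventHubs`, under a few assumptions not stated above: the event is sent to an explicit partition (so the partition ID can be cached in Redis next to the identifier and publish time), the identifier lives in a hypothetical application property named `onboarding-id`, and the drift buffer is an arbitrary five minutes. `EventLookup`, `StartingPoint`, `PublishAsync`, and `FindAsync` are illustrative names, not library APIs.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;

public static class EventLookup
{
    // Hypothetical application property that carries the caller's identifier.
    public const string IdPropertyName = "onboarding-id";

    // Assumed allowance for clock drift between this machine and the service.
    public static readonly TimeSpan ClockDriftBuffer = TimeSpan.FromMinutes(5);

    // Where to start reading: the recorded publish time minus the drift buffer.
    public static DateTimeOffset StartingPoint(DateTimeOffset approximatePublishTime) =>
        approximatePublishTime - ClockDriftBuffer;

    // Sends one event to an explicit partition and returns the approximate publish
    // time, which the caller stores in Redis alongside the id and partition ID.
    public static async Task<DateTimeOffset> PublishAsync(
        EventHubProducerClient producer, string partitionId, string message, string id)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(message));
        eventData.Properties[IdPropertyName] = id;

        using EventDataBatch batch = await producer.CreateBatchAsync(
            new CreateBatchOptions { PartitionId = partitionId });

        if (!batch.TryAdd(eventData))
        {
            throw new InvalidOperationException("The event is too large to fit in a batch.");
        }

        DateTimeOffset publishTime = DateTimeOffset.UtcNow;
        await producer.SendAsync(batch);
        return publishTime;
    }

    // Reads forward from the starting point until the event with the given id is found.
    // Returns null if the search window passes, or the partition stays idle for
    // MaximumWaitTime, without a match.
    public static async Task<EventData> FindAsync(
        EventHubConsumerClient consumer,
        string partitionId,
        string id,
        DateTimeOffset approximatePublishTime,
        CancellationToken cancellationToken = default)
    {
        EventPosition startingPosition = EventPosition.FromEnqueuedTime(StartingPoint(approximatePublishTime));
        DateTimeOffset stopAfter = approximatePublishTime + ClockDriftBuffer;
        var options = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(10) };

        await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(
            partitionId, startingPosition, options, cancellationToken))
        {
            EventData data = partitionEvent.Data;

            // Data is null when nothing arrived within MaximumWaitTime.
            if (data == null || data.EnqueuedTime > stopAfter)
            {
                return null;
            }

            if (data.Properties.TryGetValue(IdPropertyName, out object value) && (value as string) == id)
            {
                return data;
            }
        }

        return null;
    }
}
```

Because the `EventData` returned by `FindAsync` was read from the service, its `SequenceNumber` and `Offset` hold real values rather than `long.MinValue`. Both clients implement `IAsyncDisposable`, so dispose them when finished (for example with `await using`).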