Event-Driven Architecture’larda Conditional Claim-Check Pattern’ı ile Event Boyut Sınırlarının Üstesinden Gelmek
Günümüz teknoloji çağında, geliştirdiğimiz uygulama çözümlerini olabildiğince ölçeklenebilir, dayanıklı ve esnek hale getirebilmek için genellikle event-driven architecture temelli yapılar üzerinde inşa ediyoruz. Event-driven architecture her ne kadar kulağa hoş gelse de beraberinde birçok karmaşıklık ve limitasyonlar da getirmektedir. Daha önceki makalelerim de dayanıklı ve esnek bir microservice altyapısı inşaa edebilmek için ne gibi best practice’leri uygulayabileceğimiz gibi konulara da değinmeye çalışmıştım.
Bunlardan bir kaçı;
- .NET Microservice’lerinde Outbox Pattern’ı ile Eventual Consistency için Atomicity Sağlama
- Event’im Nasıl Olmalı? Event-Based Sistemler Hakkında Bazı Düşünceler
- .NET Microservice’lerinde Choreography-based Saga
- Microservice Mimarisinde Resiliency Pattern’ları
Bu makalede ise, limitasyonlardan biri olan event/message boyut limitini ele almaya çalışacağım.
Event Boyutunun Önemi
Event-driven architecture bağlamında event boyutlarının önemi oldukça büyüktür. Tüm sistem event’ler üzerine kurulu olduğu için, bir event’in boyutu tüm sistemin ölçeklenebilirliğinde, performansında ve maliyeti üzerinde doğrudan etkili olabilmektedir.
Normal şartlar altında genellikle event payload boyutlarının kolay kolay çok fazla büyük olacağını düşünmüyorum. Ama bazen farklı gereksinimleri gerektiren domain’ler üzerinde çalışıyor olabiliriz.
Örneğin, e-ticaret domaininde sipariş servisi tarafından bir sipariş oluşturulduğunda fatura servisi ilgili event’i dinleyerek PDF formatında bir fatura oluşturabilir. Bu PDF faturanın ise farklı servisler tarafından kullanılması gerekebilir. Örneğin başka bir servis tarafından kullanıcıya ilgili fatura gönderilebilir veya yine başka bir servis tarafından ilgili fatura uzun süreli saklama için arşive kaydedilebilir.
Veya fat events yaklaşımını kullanıyor olabiliriz. Bu yaklaşımda, ilgili domain’in çok fazla attribute’e sahip olması nedeniyle event payload’ları oldukça büyük olabilir.
Kısacası, bu gibi senaryolarda bazen event boyutları domain’lere göre farklılık gösterebilmekte ve bu event’leri farklı domain’ler arasında paylaşabilmekteyiz. Elbette bu kararları vermeden önce, bunların genel sistem üzerindeki etkilerini de iyi anlamamız gerekmektedir. Mesela bazı durumlarda genel performansa olacağı minimal etkileri kabul edebilsek de, maliyeti arttırabileceği durumlarda farklı çözümler uygulamamız gerekebilmektedir.
Örneğin, Azure Service Bus gibi fully managed bir message broker kullanıyor olabiliriz. Azure Service Bus, standard tier’ı içerisinde bizlere 256KB’lık bir message boyut sınırlaması getirmektedir. Elbette bu sınırlamayı 100MB’a kadar premium tier’ı ile arttırabilmekteyiz. Ancak söz konusu maliyet-fayda oranını maksimize etmek olduğunda, maliyetleri çok fazla arttırmadan, best practice’lerle bu tarz problemleri nasıl çözebileceğimize odaklanmamız bizlere daha faydalı olacaktır.
Claim-Check Pattern’ı
Claim-check pattern’ı ise bahsetmiş olduğumuz senaryolar karşısında kullanabileceğimiz bir messaging pattern’ıdır.
Bu pattern, büyük boyutlu event payload’larının message broker’lar üzerinden taşınması yerine, ilgili payload verisinin external bir storage üzerinde saklanmasını ve buna referans eden bir token veya key’in (claim-check) oluşturulmasını önermektedir. İlgili event consumer’larının ise, oluşturulan referansı kullanarak ilgili veriyi storage üzerinden elde edebileceklerini söylemektedir. Bu sayede hem messaging sisteminin performansı optimize edebilmemize olanak tanırken, hem de oluşabilecek potansiyel maliyetleri de alternatif bir yöntemle kontrol altına alabilmemizi sağlamaktadır.
Implemente Edelim
Microservice’lerimiz içinde, claim-check pattern’ını koşullu bir şekilde kolayca nasıl implemente edebileceğimize bir bakalım.
Örnek senaryo olarak bir e-ticaret domain’i içerisinde çalıştığımızı düşünelim ve sistem içerisinde bir sipariş oluşturulduktan sonra büyük bir “OrderCreatedEvent
” publish edildiğini varsayalım. Bu örneğimizde, message broker olarak Azure Service Bus ‘ı, external storage olarak ise Azure Storage Account ‘u kullanacağım. Şimdi, işe servislerimiz arasında messaging işlemlerini gerçekleştirecek basit bir service bus kütüphanesi geliştirerek başlayalım.
İlk olarak “TodoEcom.ServiceBus
” adında bir .NET 9 class library projesi oluşturalım ve ardından içerisine aşağıdaki NuGet package’larını dahil edelim.
- Azure.Messaging.ServiceBus
- Azure.Storage.Blobs
- Microsoft.Extensions.Configuration.Binder
- Microsoft.Extensions.Http
- System.Configuration.ConfigurationManager
Ardından “ServiceBus
” adında bir class oluşturarak içerisinde ilk olarak “IServiceBus
” interface’ini ve Azure Service Bus message’larını customize edeceğimiz wrapper’ı tanımlayalım.
using System.Collections.Concurrent;
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Configuration;
namespace TodoEcom.ServiceBus;
public interface IServiceBus
{
Task Publish<T>(T @event, string? queueName = null, CancellationToken cancellationToken = default);
Task Receive<ServiceBusReceivedMessageWrapper<T>>(string? queueName = null, CancellationToken cancellationToken = default);
}
public class ServiceBusReceivedMessageWrapper<T>
{
private readonly ServiceBusReceivedMessage _receivedMessage;
private readonly ServiceBusReceiver _receiver;
public T Data { get; }
internal ServiceBusReceivedMessageWrapper(ServiceBusReceivedMessage receivedMessage, ServiceBusReceiver receiver, T payload)
{
_receivedMessage = receivedMessage;
_receiver = receiver;
Data = payload;
}
public async Task CompleteAsync()
{
await _receiver.CompleteMessageAsync(_receivedMessage);
}
}
Bu noktada, event’leri basit bir şekilde publish edip consume edebileceğimiz bir interface tanımlıyoruz. Bu service bus library’si içerisinde conditional bir şekilde claim-check pattern’ını implemente edeceğimiz için, Azure Service Bus client library’sinin kullanmakta olduğu “ServiceBusReceivedMessage
” class’ını encapsulate ederek kendi “ServiceBusReceivedMessageWrapper
” class’ımızı oluşturuyoruz. Bu tasarım, ilgili event’in payload’unun boyutuna göre, payload’un tutulacağı “Data
” property’sini ya Azure Storage Account üzerinden ya da doğrudan Azure Service Bus üzerinden elde ederek hazırlayabilmemizi mümkün kılıyor olacak. Böylece, bu library’yi kullanan servisler, altyapıda gerçekleşen işlemlerden bağımsız bir şekilde event’lerini kusursuz bir şekilde consume etmeye devam ediyor olabilecekler.
Ayrıca, Azure Service Bus client library’si, consume edilen event’in tamamlanması işlemini gerçekleştirebilmek için kendi message tipi olan “ServiceBusReceivedMessage
” objesine ihtiyaç duymaktadır. Bu nedenle, “ServiceBusReceivedMessage
” objesini de wrapper içerisinde encapsulate ediyor ve tamamlanma işleminin de consumer’lar tarafında tutarlı bir şekilde gerçekleştirilebilmesi için “CompleteAsync
” metodu ile dışarıya açıyor olacağız.
Şimdi yine aynı class içerisinde “IServiceBus
” interface’inin implementasyonunu öncelikle “Publish
” method’undan başlayarak parça parça ele alalım.
public class AzureServiceBus : IServiceBus
{
private readonly ServiceBusClient _serviceBusClient;
private readonly BlobContainerClient? _blobContainerClient;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senderPool;
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receiverPool;
private readonly int _maxEventPayloadSizeLimitInKB;
private readonly int _claimCheckTokenExpirationInHours;
private const string IsClaimCheckProperty = "IsClaimCheck";
private const string ClaimCheckBlobUriProperty = "ClaimCheckBlobUri";
private readonly bool _enableClaimCheck;
private readonly string? _defaultQueueName;
public AzureServiceBus(IConfiguration configuration, IHttpClientFactory httpClientFactory)
{
var serviceBusConnectionString = configuration.GetValue<string>("ServiceBus:ConnectionString") ?? throw new KeyNotFoundException("ServiceBus:ConnectionString is not found.");
_defaultQueueName = configuration.GetValue<string>("ServiceBus:DefaultQueueName");
_serviceBusClient = new ServiceBusClient(serviceBusConnectionString);
_httpClientFactory = httpClientFactory;
_senderPool = new ConcurrentDictionary<string, ServiceBusSender>();
_receiverPool = new ConcurrentDictionary<string, ServiceBusReceiver>();
_enableClaimCheck = configuration.GetValue<bool>("ServiceBus:ClaimCheck:EnableClaimCheck");
if(_enableClaimCheck)
{
var blobStorageConnectionString = configuration.GetValue<string>("ServiceBus:ClaimCheck:BlobStorage:ConnectionString") ?? throw new KeyNotFoundException("ServiceBus:BlobStorage:ConnectionString is not found.");
var claimCheckContainerName = configuration.GetValue<string>("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckContainerName") ?? throw new KeyNotFoundException("ServiceBus:BlobStorage:ClaimCheckContainerName is not found.");
_maxEventPayloadSizeLimitInKB = configuration.GetValue<int?>("ServiceBus:ClaimCheck:MaxEventPayloadSizeLimitInKB") ?? throw new KeyNotFoundException("ServiceBus:MaxEventPayloadSizeLimitInKB is not found.");
_claimCheckTokenExpirationInHours = configuration.GetValue<int?>("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckTokenExpirationInHours") ?? throw new KeyNotFoundException("ServiceBus:BlobStorage:ClaimCheckTokenExpirationInHours is not found.");
_blobContainerClient = new BlobContainerClient(blobStorageConnectionString, claimCheckContainerName);
}
}
public async Task Publish<T>(T @event, string? queueName = null, CancellationToken cancellationToken = default)
{
if(string.IsNullOrEmpty(queueName))
{
queueName = _defaultQueueName!;
}
var sender = _senderPool.GetOrAdd(queueName, _serviceBusClient.CreateSender);
var serializedEvent = JsonSerializer.SerializeToUtf8Bytes(@event);
ServiceBusMessage message;
if (_enableClaimCheck && IsEventPayloadSizeExceedsLimit(serializedEvent))
{
var blobUri = await UploadPayloadToBlobAsync(serializedEvent, queueName, @event!.GetType().Name);
message = new ServiceBusMessage
{
ApplicationProperties =
{
[IsClaimCheckProperty] = true,
[ClaimCheckBlobUriProperty] = blobUri
}
};
}
else
{
message = new ServiceBusMessage(serializedEvent);
}
await sender.SendMessageAsync(message, cancellationToken);
}
private bool IsEventPayloadSizeExceedsLimit(byte[] serializedEvent)
{
var eventSize = serializedEvent.Length;
Console.WriteLine(eventSize);
return eventSize > _maxEventPayloadSizeLimitInKB * 1024;
}
private async Task<string> UploadPayloadToBlobAsync(byte[] serializedEvent, string queueName, string eventType)
{
var blobName = $"{queueName}/{eventType}/{Guid.NewGuid()}.json";
await _blobContainerClient!.CreateIfNotExistsAsync();
var blobClient = _blobContainerClient!.GetBlobClient(blobName);
using var stream = new MemoryStream(serializedEvent);
await blobClient.UploadAsync(stream, overwrite: true);
var sasUri = blobClient.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTimeOffset.UtcNow.AddHours(_claimCheckTokenExpirationInHours));
return sasUri.ToString();
}
}
Öncelikle, ihtiyaç duyduğumuz gerekli tüm konfigürasyon ve gerekli tüm servis injection işlemlerini gerçekleştiriyoruz. Claim-check kullanımıyla ilgili kararı consumer’lara bırakabilmek için ise, bu özelliğin kontrolünü “ServiceBus:ClaimCheck:EnableClaimCheck
” parametresine bağlıyoruz. Ayrıca “ServiceBusSender
” ve “ServiceBusReceiver
” objelerini daha verimli yönetebilmek adına bir pooling mekanizması tanımlıyoruz.
“Publish
” method’u ise asıl claim-check mekanizmasını conditional olarak implemente ettiğimiz ana nokta. Bu noktada, ilgili event ilk olarak serialize edilmektedir ve daha sonra, “IsEventPayloadSizeExceedsLimit
” bloğu içerisinde conditional olarak claim-check işlemi gerçekleştirilmektedir. Eğer ilgili event’in boyutu belirlenmiş sınırın üzerinde ise, claim-check işlemi otomatik olarak Azure Blob Storage üzerinden gerçekleştirilecektir.
Bu noktada sırasıyla:
- “
UploadPayloadToBlobAsync
” methud’u içerisinde ilgili event payload’unu Azure Blob Storage üzerine upload etme işlemini ele alıyoruz. - Ardından ilgili payload’un Azure Blob Storage üzerinden tekrar elde edilebilmesi için ise bir SAS URI oluşturuyoruz.
- Daha sonra ise oluşturmuş olduğumuz bu SAS URI bilgisini, ilgili event’in metadata bilgileri içerisine “
ApplicationProperties
” dahil ediyoruz.
Şimdi ise “Receive
” method’u ile devam edelim.
public class AzureServiceBus : IServiceBus
{
// other codes...
public async Task<ServiceBusMessageReceivedMessageWrapper<T>> Receive<T>(string? queueName = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(queueName))
{
queueName = _defaultQueueName!;
}
var receiver = _receiverPool.GetOrAdd(queueName, _serviceBusClient.CreateReceiver);
var message = await receiver.ReceiveMessageAsync(cancellationToken: cancellationToken);
ServiceBusReceivedMessageWrapper<T> wrappedMessage;
if (message.ApplicationProperties.TryGetValue(IsClaimCheckProperty, out var isClaimCheck) && (bool)isClaimCheck)
{
var blobUri = message.ApplicationProperties[ClaimCheckBlobUriProperty] as string
?? throw new InvalidOperationException($"The claim-check value is not found. Message CorrelationId: {message.CorrelationId}");
byte[] payload = await DownloadPayloadFromBlobAsync((string)blobUri);
var actualEvent = JsonSerializer.Deserialize<T>(payload);
wrappedMessage = new ServiceBusReceivedMessageWrapper<T>(message, receiver, actualEvent!);
}
else
{
wrappedMessage = new ServiceBusReceivedMessageWrapper<T>(message, receiver, message.Body.ToObjectFromJson()!);
}
return wrappedMessage;
}
private async Task<byte[]> DownloadPayloadFromBlobAsync(string blobUri)
{
var httpClient = _httpClientFactory.CreateClient();
using var response = await httpClient.GetAsync(blobUri);
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsByteArrayAsync();
}
throw new Exception($"Failed to download the claim-check payload from {blobUri}. Status Code: {response.StatusCode}");
}
}
Bu noktada, ilgili event’i consume ettikten sonra metadata bilgileri içerisinde claim-check token’ının yer alıp almadığını kontrol ediyoruz. Eğer claim-check token’ı mevcutsa, metadata bilgileri içerisinden Blob SAS URI bilgisini elde ediyor ve “DownloadPayloadFromBlobAsync
” method’unu kullanarak ilgili event payload’unu elde ediyoruz. Claim-check token’ı bulunmuyor ise, ilgili event’i consume ettiğimiz gibi wrapper aracılığıyla ilgili consumer’a iletiyoruz. Böylece, daha önce de bahsettiğimiz gibi, consumer’lar altyapıda gerçekleşen claim-check işlemlerinden bağımsız bir şekilde event’leri consume etmeye devam edecektir.
Şimdi, hızlıca bir test işlemi gerçekleştirebilmek için “OrderCreatedEvent
” ‘ini publish edecek olan publisher’ı implemente edelim. İlk olarak, “TodoEcom.Contracts
” adında bir class library oluşturalım ve içerisinde publisher ve consumer arasında paylaşacak olduğumuz “OrderCreatedEvent
” ini aşağıdaki gibi tanımlayalım.
namespace TodoEcom.Contracts.Events;
public class OrderCreatedEvent
{
public int OrderId { get; set; }
public int CustomerId { get; set; }
public required string PaymentMethod { get; set; }
public DateTime CreatedAt { get; set; }
public required IReadOnlyList<ProductDTO> Products { get; set; }
// other attributes such as address, applied discounts, shipping details, transaction details and etc.
}
public class ProductDTO
{
public int ProductId { get; set; }
public required string Name { get; set; }
public int Quantity { get; set; }
public decimal Price { get; set; }
}
Bu noktada, örneği basit tutmak adına çok fazla attribute eklemedim. Ancak ana fikri iletebildiğimi düşünüyorum.
Şimdi ise “TodoEcom.OrderService.API
” adında bir .NET 9 webapi projesi oluşturalım ve “TodoEcom.Contracts
” ile “TodoEcom.ServiceBus
” library’lerini referans olarak dahil edelim. Ardından “Program.cs
” class’ını aşağıdaki gibi güncelleyelim.
using TodoEcom.Contracts.Events;
using TodoEcom.ServiceBus;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddOpenApi();
builder.Services.AddHttpClient();
builder.Services.AddSingleton<IServiceBus, AzureServiceBus>();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
}
app.UseHttpsRedirection();
app.MapPost("/orders", async (IServiceBus serviceBus) =>
{
// Perform order operations...
var orderCreatedEvent = new OrderCreatedEvent
{
OrderId = 1,
CustomerId = 123,
PaymentMethod = "CreditCard",
CreatedAt = DateTime.UtcNow,
Products = Enumerable.Range(1,20).Select(i => new ProductDTO
{
ProductId = i,
Name = $"My product {i}",
Quantity = 1,
Price = i,
}).ToList()
};
await serviceBus.Publish(orderCreatedEvent);
})
.WithName("CreateAnOrder");
app.Run();
Bu noktada, gerekli client’ların registration işlemlerini gerçekleştirdikten sonra implemente etmiş olduğumuz service bus library’sini kullanarak örnek bir “OrderCreatedEvent
” ‘i publish ediyoruz. Claim-check mekanizmasını test edebilmek için ise, publish edecek olduğumuz bu event’in belirleyecek olduğumuz payload limitini aşabilmesi adına içerisine birkaç ürün bilgisi ekliyoruz.
Şimdi “appsettings.json
” dosyasını ise aşağıdaki gibi güncelleyelim.
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"ServiceBus": {
"ConnectionString": "Endpoint=sb://mypocsbus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY",
"DefaultQueueName": "orders",
"ClaimCheck": {
"EnableClaimCheck": true,
"MaxEventPayloadSizeLimitInKB": 1,
"BlobStorage": {
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=mypocstorageac;AccountKey=YOUR_ACCOUNT_KEY;EndpointSuffix=core.windows.net",
"ClaimCheckContainerName": "claim-check",
"ClaimCheckTokenExpirationInHours": 1
}
}
}
}
Bu noktada, “EnableClaimCheck
” parametresi ile claim-check özelliğini kullanacağımızı belirtiyor, “MaxEventPayloadSizeLimitInKB
” parametresi ile ise Azure Service Bus üzerine publish etmek istediğimiz event boyutunu 1 KB ile sınırlıyoruz.
Şimdi, “TodoEcom.BillingService.Consumer
” adında bir .NET 9 worker projesi oluşturalım ve “TodoEcom.Contracts
” ile “TodoEcom.ServiceBus
” library’lerini referans olarak ekleyelim. Ardından, “OrderCreatedEvent
” ‘inin consume işlemini gerçekleştirelim.
using TodoEcom.Contracts.Events;
using TodoEcom.ServiceBus;
namespace TodoEcom.BillingService.Consumer;
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IServiceBus _serviceBus;
private const string OrdersQueueName = "orders";
public Worker(ILogger<Worker> logger, IServiceBus serviceBus)
{
_logger = logger;
_serviceBus = serviceBus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
if (_logger.IsEnabled(LogLevel.Information))
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
}
var @event = await _serviceBus.Receive<OrderCreatedEvent>(OrdersQueueName, stoppingToken);
Console.WriteLine($"Event received. OrderId: {@event.Data.OrderId} ");
// some operations...
await @event.CompleteAsync();
}
}
}
“appsettings.json
” dosyasını da aşağıdaki gibi güncelleyelim.
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"ServiceBus": {
"ConnectionString": "Endpoint=sb://mypocsbus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY"
}
}
Artık test etmeye hazırız.
Test Edelim
İlk olarak “TodoEcom.OrderService.API
” projesini çalıştıralım ve “/orders
” endpoint’ini trigger edelim.
c url -X POST http://localhost:5065/orders
Ardından, Azure Portal üzerinden “Service Bus Explorer
” ‘ı kullanarak, “orders
” queue’suna publish etmiş olduğumuz message’ın “Custom Properties
” bölümüne bir bakalım.
Bu noktada, publish ettiğimiz event payload’unun Azure Blob Storage ‘a yüklenip, ilgili Blob SAS URI bilgisinin “ClaimCheckBlobUri
” property’sine eklendiğini görebiliriz.
Ardından “TodoEcom.BillingService.Consumer
” projesini çalıştıralım ve ilgili event’i consume edelim.
Terminal log’larından da görebileceğimiz gibi, publish etmiş olduğumuz event, geliştirmiş olduğumuz service bus library’si tarafından consume edildikten sonra payload bilgisi başarıyla ilgili storage account üzerinden alınmış ve “TodoEcom.BillingService.Consumer
” projesine kesintisiz bir şekilde consume edebilmesi için iletilmiştir.
Bu makale kapsamında, conditional claim-check mekanizmasını büyük payload’ları yönetme süreçlerinde basit bir şekilde nasıl kullanabileceğimizi ele almaya çalıştık. Implemente etmiş olduğumuz service bus library’si sayesinde, ilgili consumer’lar altyapı işlemlerinden bağımsız olarak event’lerini kesintisiz bir şekilde publish ve consume etmeye devam ederken, sistemin ölçeklenebilirliğinin ve verimliliğinin minimum düzeyde etkilenebilmesini sağlamaya çalıştık.
Örnek kodlara buradan ulaşabilirsiniz: https://github.com/GokGokalp/claim-check-sample
Referanslar
https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
Eline sağlık, yine çok kıymetli bir yazı olmuş.