Günümüz teknoloji çağında neredeyse hepimiz microservice’ler hakkında konuşuyor ve uygulamalar geliştirmeye çalışıyoruz. Yüzeysel baktığımızda her şey çok net ve uygulaması kolay gibi görünsede, özellikle söz konusu distributed transaction yönetimi olduğunda işler daha karmaşık bir hal almaya başlıyor.
Çünkü business’ımızın sağlıklı bir şekilde ilerleyebilmesini sağlayabilmemiz ve nihai business outcome’ına ulaşabilmemiz için, data’nın tutarlılığını sağlayabilmemiz gerekmektedir.
Bu makale kapsamında ise distributed ortamlarda transaction işlemlerini, Choreography-based Saga pattern’ı ile nasıl gerçekleştirebileceğimizi göstermeye çalışacağım.
Choreography-based Saga
Distributed ortamlardaki transaction yönetimi konusunda saga pattern’ı bizlere “Choreography” ve “Orchestration” olmak üzere iki farklı yaklaşım sunmaktadır.
2017 yılında saga pattern’ını orchestration yaklaşımı ile nasıl implemente edebileceğimizi buradaki makalemde ele almaya çalışmıştım. Bu makale kapsamında ise herhangi bir orchestrator’a sahip olmadan saga pattern’ını loosely coupled olarak nasıl implemente edebileceğimizi göstermeye çalışacağım.
Choreography-based saga yaklaşımındaki ana fikir, her microservice’in bireysel olarak sorumluluklarını sırasıyla yerine getirmesine ve consistency’i sağlayabilmek için beraber hareket etmelerine dayanmaktadır.
Bir başka değişle her microservice kendi sorumluluğunu yerine getirdiğinde, transaction’ı distributed ve asynchronous olarak devam ettirebilmesi için bir sonraki aşamayı ilgili bir business event’i ile tetiklemesi gerekmektedir.
Senaryo
Bir e-ticaret firmasında çalıştığımızı ve ödemeleri asynchronous olarak gerçekleştirebilmek için aşağıdaki gibi basit bir flow’a sahip olduğumuzu düşünelim.
Yukarıdaki happy-path flow’a baktığımızda;
- Client sipariş işlemini “Order Service” aracılığı ile gerçekleştirir ve bu service “Pending” durumunda bir sipariş yaratır. Ardından “OrderCreatedEvent” ini publish eder.
- Stock işlemlerinden sorumlu “Stock Service” i “OrderCreatedEvent” ini dinler ve ilgili ürünlerin stock’larını reserve eder. Ardından “StockReservedEvent” ini publish eder.
- Payment işlemlerinden sorumlu “Payment Service” i “StockReservedEvent” ini dinler ve ilgili ödeme işlemlerini gerçekleştirmeye çalışır. Ödeme işlemi başarıyla gerçekleştirilirse, “PaymentCompletedEvent” ini publish eder.
- “Order Service” i transaction’ı sonlandırabilmek için ise “PaymentCompletedEvent” ini dinler ve ilgili order’ın durumunu “Pending” den “Completed” a çeker. Böylece işlem, distributed ve consistent bir şekilde microservice’ler arasında tamamlanmış olur.
Implemente Edelim
Implementasyon kısmını açıklamaya geçmeden önce, örnek projenin tamamına buradan ulaşabilirsiniz.
Yukarıdaki happy-path flow’a ek olarak aşağıdaki gibi business requirement’larına da sahip olduğumuzu varsayalım:
- Asynchronous olarak gerçekleştirilecek ödeme işlemi sırasında herhangi bir hata meydana gelirse, “PaymentRejectedEvent” adında bir event publish edilecek.
- “PaymentRejectedEvent” event’i Stock Service’i tarafından consume edilecek ve stok’ları reserve edilen ürünler tekrardan release edilebilecek. Ardından “StocksReleasedEvent” adında bir event publish edilecek.
- “StocksReleasedEvent” event’i ise Order Service’i tarafından consume edilecek ve ilgili sipariş’in durumu pending durumundan rejected durumuna çekilecek.
Order API
Sistem içerinde bir sipariş oluşturabilmek için Order API aşağıdaki gibi bir controller ve service’e sahiptir.
[ApiController]
[Route("[controller]")]
public class OrdersController : ControllerBase
{
private readonly IOrderService _orderService;
public OrdersController(IOrderService orderService)
{
_orderService = orderService;
}
[HttpPost]
public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
{
await _orderService.CreateOrderAsync(request);
return Accepted();
}
}
public class OrderService : IOrderService
{
private readonly IBus _bus;
public OrderService(IBus bus)
{
_bus = bus;
}
public async Task CreateOrderAsync(CreateOrderRequest request)
{
// Order creation logic in "Pending" state.
await _bus.PubSub.PublishAsync(new OrderCreatedEvent
{
UserId = 1,
OrderId = 1,
WalletId = 1,
TotalAmount = request.TotalAmount,
});
}
public Task CompleteOrderAsync(int orderId)
{
// Change the order status as completed.
return Task.CompletedTask;
}
public Task RejectOrderAsync(int orderId, string reason)
{
// Change the order status as rejected.
return Task.CompletedTask;
}
}
“CreateOrderAsync” method’u, sipariş’i ilk olarak pending durumunda oluşturduğumuz noktadır. Çünkü bu aşamada henüz sipariş içerisindeki ürünlerin stok durumlarını veya ödemeyi başarıyla gerçekleştirip gerçekleştiremeyeceğimizi bilmiyoruz.
Sipariş oluşturulduktan sonra ise distributed bir şekilde işlem bütünlüğünü sağlayabilmemiz için, “OrderCreatedEvent” adında bir event publish ediyoruz. Bir nevi işlemler zincirin bir sonraki aşamasını burada tetikliyoruz.
“CompleteOrderAsync” method’unu ise, tüm işlemlerin başarıyla gerçekleştiği durumda siparişi pending durumundan completed durumuna güncelleyebilmek için kullanacağız.
İşlemlerin başarıyla tamamlanıp tamamlanmadığını Order Service’in anlayabilmesi için ise, içerisinde işlemler zincirinin son parçası olan “PaymentCompletedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class PaymentCompletedEventConsumer : IConsumeAsync<PaymentCompletedEvent>
{
private readonly IOrderService _orderService;
public PaymentCompletedEventConsumer(IOrderService orderService)
{
_orderService = orderService;
}
public async Task ConsumeAsync(PaymentCompletedEvent message, CancellationToken cancellationToken = default)
{
await _orderService.CompleteOrderAsync(message.OrderId);
}
}
Ayrıca “RejectOrderAsync” method’unu da, ödeme işlemleri aşamasında oluşabilecek herhangi bir hata durumunda ilgili siparişi rejected durumuna getirebilmek için kullanacağız.
Order Service’in ilgili sipariş durumunu rejected olarak güncelleyebilmesi için ise, Stock Service’inin ilgili ürün stok’larını tekrardan release ettikten sonra publish ettiği “StocksReleasedEvent” ini aşağıdaki gibi consume etmektedir.
public class StocksReleasedEventConsumer : IConsumeAsync<StocksReleasedEvent>
{
private readonly IOrderService _orderService;
public StocksReleasedEventConsumer(IOrderService orderService)
{
_orderService = orderService;
}
public async Task ConsumeAsync(StocksReleasedEvent message, CancellationToken cancellationToken = default)
{
await _orderService.RejectOrderAsync(message.OrderId, message.Reason);
}
}
Stock Service
Sistem içerisinde bir sipariş pending durumunda oluşturulduğunda, o sipariş içerisindeki ürünlerin stok’ları Stock Service’i tarafından rezerve edilmektedir.
Bunun için Stock Service’i içerisinde “OrderCreatedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class OrderCreatedEventConsumer : IConsumeAsync<OrderCreatedEvent>
{
private readonly IStockService _stockService;
private readonly IBus _bus;
public OrderCreatedEventConsumer(IStockService stockService, IBus bus)
{
_stockService = stockService;
_bus = bus;
}
public async Task ConsumeAsync(OrderCreatedEvent message, CancellationToken cancellationToken = default)
{
await _stockService.ReserveStocksAsync(message.OrderId);
await _bus.PubSub.PublishAsync(new StocksReservedEvent
{
UserId = message.UserId,
OrderId = message.OrderId,
WalletId = message.WalletId,
TotalAmount = message.TotalAmount
});
}
}
Bu consumer içerisinde basit olarak ürünlerin stok’larının rezerve işlemleri gerçekleştirilip, ardından “StockReservedEvent” i publish edilmektedir. Böylelikle sipariş işlem bütünlüğünün bir aşaması daha tamamlanıp, bir sonraki aşaması tetiklenmiş olmaktadır.
Ayrıca ödeme işlemlerinde oluşabilecek herhangi bir hata durumuna kaşı rezerve edilen ürünlerin stok’larının tekrardan release edilebilmesi için, “PaymentRejectedEvent” ini dinleyen aşağıdaki gibi bir consumer’a da sahiptir.
public class PaymentRejectedEventConsumer : IConsumeAsync<PaymentRejectedEvent>
{
private readonly IStockService _stockService;
private readonly IBus _bus;
public PaymentRejectedEventConsumer(IStockService stockService, IBus bus)
{
_stockService = stockService;
_bus = bus;
}
public async Task ConsumeAsync(PaymentRejectedEvent message, CancellationToken cancellationToken = default)
{
await _stockService.ReleaseStocksAsync(message.OrderId);
await _bus.PubSub.PublishAsync(new StocksReleasedEvent
{
OrderId = message.OrderId,
Reason = message.Reason
});
}
}
Payment Service
Sipariş içerisindeki ürünlerin stok’ları rezerve edildikten sonra, ödeme işlemi Payment Service’i tarafından gerçekleştirilmektedir.
Bu service içerisinde ödeme işlemlerinin tetiklenebilmesi için “StocksReservedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class StocksReservedEventConsumer : IConsumeAsync<StocksReservedEvent>
{
private readonly IPaymentService _paymentService;
private readonly IBus _bus;
public StocksReservedEventConsumer(IPaymentService paymentService, IBus bus)
{
_paymentService = paymentService;
_bus = bus;
}
public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default)
{
Tuple<bool, string> isPaymentCompleted = await _paymentService.DoPaymentAsync(message.WalletId, message.UserId, message.TotalAmount);
if (isPaymentCompleted.Item1)
{
await _bus.PubSub.PublishAsync(new PaymentCompletedEvent
{
OrderId = message.OrderId
});
}
else
{
await _bus.PubSub.PublishAsync(new PaymentRejectedEvent
{
OrderId = message.OrderId,
Reason = isPaymentCompleted.Item2
});
}
}
}
Burada ise basit olarak ödeme işlemleri gerçekleştirilmektedir. Eğer ödeme işlemi başarıyla gerçekleştirilirse, sipariş’in durumunun Order Service tarafından completed olarak güncellenebilmesi için “PaymentCompletedEvent” adında bir event publish edilmektedir.
Eğer ödeme işlemi başarıyla gerçekleştirilemezse, “PaymentRejectedEvent” adında bir event publish edilmektedir. Böylece Stock Service’i rezerve edilen ürünlerin stok’larını, örnek business requirement’ı gereği tekrardan release edebilmektedir.
Böylelikle sipariş transaction’ı distributed bir şekilde uçtan uca loosely coupled ve consistent olarak tamamlanmış olmaktadır.
Toparlayalım
Her design pattern’ın spesifik bir business problemine bir çözüm önerisi getirdiği gibi, saga pattern’ıda bizlere distributed ortamlarda transaction yönetimi konusunda bir yöntem sunmaktadır. Bu yöntemin arkasındaki temel mantık ise, bir dans ekibi içerisindeki her bir üyenin bir akış ve uyum içerisinde hareket ettikleri gibi uygulamalarımızın da bir akış ve uyum içerisinde hareket etmelerine dayanmaktadır.
Uygulanması her ne kadar basit bir pattern gibi görünsede, elbette getirdiği bazı dez avantajları/zorlu yönleri de bulunmaktadır.
- Sistem iyi bir şekilde düşünülüp, her bir senaryo dikkatlice ele alınmalı ve ilgili takımlar tarafından anlaşılmalıdır.
- İşlem zinciri içerisindeki her bir service’in, ilgili compensating method’larını sağlaması gerekmektedir.
- İşlem bütünlüğü event’ler vasıtasıyla gerçekleştirileceği için, resiliency büyük bir önem taşımaktadır. Bunun için outbox gibi pattern’lar yardımıyla ilgili event’lerin başarıyla publish edilebilmesi sağlanmalıdır.
- İşlem zinciri içerisine bir çok farklı service’in katılması gerektiği durumlarda ise sistem kompleks bir hal almaya başlayabilir. Bu tarz durumlarda da choreography yerine orchestration yaklaşımını tercih etmek daha doğru bir yaklaşım olabilmektedir.