Eskiden sadece belirli bir kesime hitap eden uygulamalar geliştirirken, günümüz teknoloji çağının ilerlemesiyle ve global marketten de bir pay alabilmek için, artık daha büyük bir kesime hitap eden uygulamalar geliştirmemiz gerekiyor.
Geliştirdiğimiz uygulamaların 7/24 kesintisiz hizmet verebiliyor olması ve response süreleri düşük olan, kullanıcı deneyimi sorunsuz ve yüksek olan uygulamar olması da oldukça önem arz ediyor.
Bildiğimiz gibi uygulamalarımıza bu kabiliyetleri kazandırabilmek için, bir çok teknoloji ve mimariler’den yararlanıyoruz. Microservice yaklaşımı ile scalable, resilient ve lightweight uygulamalar geliştiriyor, response sürelerini minimize edebilmek için de farklı caching teknolojilerinden faydalanmaya çalışıyoruz.
Peki, bu kadar mimari boyutta uygulamalarımıza bir şeyler kazandırabilmek için uğraşırken, bazı durumlarda neden kod’larımızı, thread’lerimizi blokluyoruz?
Unutmamalıyız ki mimari boyutta uygulamalarımıza bir şeyler kazandırmaya çalıştığımız kadar, kod boyutunda yaptıklarımız da bi o kadar önemlidir.
Bazı durumlarda procedural programlama paradigması mantığından farklı olarak, yani kod satırlarındaki akışların sırasıyla takip edilmesi yerine, event-based programlama mantığında hareket etmeli ve uygulamalarımızı reactive bir hale getirmeliyiz.
En büyük yardımcımız, Reactive Extensions (Rx)!
Rx’i, kısaca hatırlayalım.
Rx için kısaca, observable stream’leri kullanarak push-based, asynchronous ve daha responsive uygulamalar geliştirebilmemize olanak sağlayan güçlü bir library’dir diyebiliriz.
Rx, complex business logic’lerini daha basit bir halde ve ayrıca asynchronous olarak kolay bir şekilde handle edebilmemizi sağlamaktadır. Ayrıca Rx, bir çoğumuzun bildiği gibi yeni bir konsept olmamakla beraber, temelinde Observer design pattern konsept’i bulunmaktadır.
Kullanım senaryolarını ise, genel olarak aşağıdaki gibi sıralayabiliriz.
- Event-based işlemler. Özellikle uygulama düzeyindeki complex business logic’leri daha kolay bir şekilde handle edebilmek ve request’lere karşı responsive olabilmek için.
- Asynchronous stream’leri sürekli consume edebilme.
- ve concurrent programlama.
Bu makale kapsamında ise kod’larımızı ve thread’lerimizi blok’lamadan, asynchronous ve event-based interaction’ları uygulamalarımız içerisinde en basit haliyle nasıl handle edebiliriz senaryosunu cover etmeye çalışacağım.
Observers
Örnek bir uygulamaya geçmeden önce, Rx’in temel taşlarını oluşturan “IObservable<T>” ve “IObserver<T>” interface’lerinden bahsetmek istiyorum.
Bu harika ikili, tıpkı “IEnumerable<T>” and “IEnumerator<T>” interface’lerine benzemektedir. Farklı olarak pull-based bir yaklaşım yerine, push-based bir yaklaşım ile çalışmaktadır. Böylece sürekli bir kaynakta data var mı diye sormak yerine, ilgili kaynağa subscribe olarak event’lere karşı daha responsive uygulamalar geliştirebilmekteyiz.
“IObservable<T>” interface’ini, gözlemlemek istediğimiz bir kaynak olarak düşünebiliriz. İçerisinde ise “Subscribe(IObserver<T> observer)” method’unu bulundurmaktadır. Adından da anlayabileceğimiz üzere, “IObserver<T>” ise gözlemcimiz.
Hemen basit bir örnek gerçekleştirelim.
using System.Reactive.Linq;
namespace ReactiveNumbers
{
class Program
{
static void Main(string[] args)
{
IObservable<long> numbers = Observable.Interval(TimeSpan.FromSeconds(1));
numbers.Subscribe(num =>
{
Console.WriteLine(num);
});
Console.ReadKey();
}
}
}
Yukarıdaki console uygulamasına bakarsak, “IObservable<T>” tipinde her saniye tetiklenecek bir kaynak oluşturduk. Ardından bu kaynağa subscribe olarak, data var olduğu sürece console ekranına yazdırılmasını sağladık.
Console çıktısı ise, aşağıdaki gibi sonsuz bir şekilde olacaktır.
% dotnet run
0
1
2
3
4
5
6
7
8
9
...
Subject
Bir gün bizden anlık bir chat uygulaması geliştirmemiz istediğini varsayalım. Ayrıca chat mesajlarının persistent olması ve bu işlemin son kullanıcı response sürelerini etkilememesi de istenmektedir.
Bu kapsamda, anlık chat işlemleri için SignalR library’sinden yararlanalım.
Peki, öncelikle aşağıdaki gibi bir ASP.NET Core Web API projesi oluşturalım. Ardından “SignalR” ve “System.Reactive” library’lerini NuGet üzerinden projeye dahil edelim.
dotnet new webapi -n MyChat
dotnet add package SignalR
dotnet add package System.Reactive
Şimdi “Models” isminde bir klasör oluşturalım ve içerisinde “ChatMessageReceivedEvent” adında bir event model’i tanımlayalım.
namespace MyChat.Models
{
public class ChatMessageReceivedEvent
{
public string Message { get; set; }
}
}
Bu event’i, socket üzerinden bir chat mesajı aldığımızda publish edeceğiz. Böylece non-blocking, event-based bir chat uygulaması geliştirmeye çalışacağız.
Şimdi ise “Handlers” isminde bir klasör oluşturalım ve içerisinde Rx’i implemente edeceğimiz interface’i aşağıdaki gibi tanımlayalım.
using System;
using MyChat.Models;
namespace MyChat.Handlers
{
public interface IChatEventHandler
{
void Publish(ChatMessageReceivedEvent eventMessage);
void Subscribe(string subscriberName, Action<ChatMessageReceivedEvent> action);
void Subscribe(string subscriberName, Func<ChatMessageReceivedEvent, bool> predicate, Action<ChatMessageReceivedEvent> action);
}
}
Tanımlamış olduğumuz “Publish” method’u ile, Rx stream’ine bir event göndereceğiz. Ardından “Subscribe” method’u ile de, Rx stream’ine dilediğimiz bir action’ı ekleyeceğiz.
Şimdi aşağıdaki gibi “Handlers/Implementations” klasörü altında implementation işlemini gerçekleştirelim.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using MyChat.Models;
namespace MyChat.Handlers.Implementations
{
public class ChatEventHandler : IChatEventHandler, IDisposable
{
private readonly Subject<ChatMessageReceivedEvent> _subject;
private readonly Dictionary<string, IDisposable> _subscribers;
public ChatEventHandler()
{
_subject = new Subject<ChatMessageReceivedEvent>();
_subscribers = new Dictionary<string, IDisposable>();
}
public void Publish(ChatMessageReceivedEvent eventMessage)
{
_subject.OnNext(eventMessage);
}
public void Subscribe(string subscriberName, Action<ChatMessageReceivedEvent> action)
{
if (!_subscribers.ContainsKey(subscriberName))
{
_subscribers.Add(subscriberName, _subject.Subscribe(action));
}
}
public void Subscribe(string subscriberName, Func<ChatMessageReceivedEvent, bool> predicate, Action<ChatMessageReceivedEvent> action)
{
if (!_subscribers.ContainsKey(subscriberName))
{
_subscribers.Add(subscriberName, _subject.Where(predicate).Subscribe(action));
}
}
public void Dispose()
{
if (_subject != null)
{
_subject.Dispose();
}
foreach (var subscriber in _subscribers)
{
subscriber.Value.Dispose();
}
}
}
}
Dikkat edersek burada, “IObservable<T>” yerine “Subject<T>” class’ını kullandık. Çünkü bu harika class, hem “IObservable<T>” hem de “IObserver<T>” ı implemente etmektedir ve bir proxy gibi davranmaktadır.
“Publish” method’unda ise, stream içerisinde yeni bir event gerçekleştiğinde “Subject” class’ının tüm subscriber’ları bilgilendireceği bir observable contract’ı olan “OnNext” method’unu implemente ettik.
Bu akışı görselleştirdiğimizde ise, aşağıdaki gibi görünecektir.
“Subscribe” method’u içerisinde ise, Rx stream’ine subscribe olma işlemini gerçekleştiriyoruz. Rx’in en sevdiğim tarafı ise, Linq operasyonlarını da destekliyor olması.
Unsubscribe işlemini gerçekleştirebilmemiz için ise, subscribe olma işlemi geriye bir “IDisposable” dönmektedir. “Dispose” method’u içerisinde ise, dictionary’e eklemiş olduğumuz subscriber’ların unsubscribe işlemlerini gerçekleştiriyoruz. Bu konu hakkındaki detaylı bilgiye ise, buradan ulaşabilirsiniz.
Şimdi chat kısmına geçebiliriz.
Bunun için, “Hubs” isminde bir klasör oluşturarak içerisinde “ChatHub” isminde bir class tanımlayalım ve aşağıdaki gibi implemente edelim.
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using MyChat.Handlers;
using MyChat.Models;
namespace MyChat.Hubs
{
public class ChatHub : Hub
{
private readonly IChatEventHandler _chatEventHandler;
public ChatHub(IChatEventHandler chatEventHandler)
{
_chatEventHandler = chatEventHandler;
}
public async Task SendMessage(string sender, string message)
{
await Clients.All.SendAsync("chat", sender, message);
_chatEventHandler.Publish(new ChatMessageReceivedEvent
{
Message = message
});
}
}
}
Burada basit olarak “SignalR” library’sinin “Hub” class’ını inherit alarak, chat için socket üzerinden kullanacağımız “SendMessage” method’unu tanımladık.
Ayrıca son kullanıcının gönderdiği mesajları, son kullanıcının response süresini etkilemeden persistent bir hale getirebilmek için, “IChatEventHandler” aracılığıyla bir observable stream haline çeviriyoruz.
Her bir mesaj publish edildiğinde ise, Rx stream’i içerisindeki “OnNext” method’u çağırılacak ve ilgili tüm subscriber’lar bu event’ten haberdar edilecektir.
Şimdi chat mesajlarını persist edebilmek için, Rx stream’ine “ChatHistoryConsumer” adında “Handlers/Implementations” path’i altına aşağıdaki gibi bir observer subscribe edelim.
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using MyChat.Models;
namespace MyChat.Handlers.Implementations
{
public class ChatHistoryConsumer : BackgroundService
{
private readonly IChatEventHandler _eventHandler;
public ChatHistoryConsumer(IChatEventHandler eventHandler)
{
_eventHandler = eventHandler;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_eventHandler.Subscribe(subscriberName: typeof(ChatHistoryConsumer).Name,
action: async (e) =>
{
if (e is ChatMessageReceivedEvent)
{
await PersistChatMessagesToDBAsync((ChatMessageReceivedEvent)e);
}
});
return Task.CompletedTask;
}
private async Task PersistChatMessagesToDBAsync(ChatMessageReceivedEvent e)
{
await System.Console.Out.WriteLineAsync($"Chat message received and persisted: {e.Message}");
}
}
}
Burada ise “ChatHistoryConsumer” class’ının bir background task’ı olarak çalışabilmesi için, “BackgroundService” class’ını inherit ederek “ExecuteAsync” method’unu implemente ettik.
Ardından “ExecuteAsync” method’u içerisinde ise, “ChatMessageReceivedEvent” için subscription işlemini gerçekleştirdik.
Şimdi “Startup” class’ını ise, aşağıdaki gibi güncelleyelim.
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MyChat.Handlers;
using MyChat.Handlers.Implementations;
using MyChat.Hubs;
namespace MyChat
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddControllers();
services.AddSignalR();
services.AddSingleton<IChatEventHandler, ChatEventHandler>();
services.AddHostedService<ChatHistoryConsumer>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapRazorPages();
endpoints.MapHub<ChatHub>("/hubs/chat");
});
}
}
}
Burada ise chat uygulamalasını test edebilmek için, Razor Pages‘i ve SignalR‘ı etkinleştirdik. Ardından gerekli injection işlemlerini gerçekleştirdik.
Şimdi test işlemini yapabilmek için, basit bir UI hazırlayalım.
Öncelikle gerekli SignalR javascript paketlerini projeye dahil edebilmek için, buradaki ilk adımı takip edelim.
Ardından “Pages” isminde bir klasör oluşturarak, içerisinde “Chat” adında aşağıdaki gibi bir Razor Page oluşturalım.
@page
<style>
input[type=text] {
width: 100%;
padding: 12px 20px;
margin: 8px 0;
box-sizing: border-box;
}
</style>
<div>
<h2>Chat Test</h2>
<div>
<ul id="messageList"></ul>
</div>
</div>
<div>
<div>Sender: <input type="text" id="sender"/></div>
<div>Message: <input type="text" id="message""></div>
<div><input type="button" id="sendMessage" value="Send" /></div>
</div>
<script src="/lib/signalr/signalr.js"></script>
<script src="/lib/signalr/chat.js"></script>
SignalR javascript client’ını ise, “chat.js” adıyla “wwwroot/lib/signalr” klasör path’i altında aşağıdaki gibi oluşturalım.
const connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hubs/chat")
.build();
document.getElementById("sendMessage").addEventListener("click", event => {
const message = document.getElementById("message").value;
const sender = document.getElementById("sender").value;
connection.invoke("SendMessage", sender, message).catch(err => console.error(err.toString()));
event.preventDefault();
});
connection.on("chat", (sender, message) => {
const recMessage = sender + ": " + message;
const li = document.createElement("li");
li.textContent = recMessage;
document.getElementById("messageList").appendChild(li);
});
connection.start().catch(err => console.error(err.toString()));
Burada ise oluşturmuş olduğumuz “ChatHub” a socket üzerinden bağlanacak ve mesaj gönderilebilecek basit bir UI hazırladık.
Şimdi test işlemine geçebiliriz.
Test edebilmek için uygulamayı “dotnet run” komutu ile çalıştıralım ve iki farklı browser üzerinden aşağıdaki gibi chat işlemini gerçekleştirelim.
Yukarıdaki console ekranına dikkat edersek, biz chat işlemini gerçekleştirirken oluşturmuş olduğumuz observer ise, mesajların persist edilme işlemini non-blocking ve asynchronous bir şekilde gerçekleştirmiştir.
Rx yaklaşımının güzel olan tarafı ise, yeni bir özellik eklenilmesi istendiğinde business logic’leri kompleks bir hale getirmeden bu değişikliklere izin vermesi. Örneğin, chat mesajları içerisinde kötü kelimelere karşı filtreleme yapılması özelliğinin istendiğini düşünebiliriz. Tek yapmamız gereken, ilgili Rx stream’ine yeni bir observer daha eklemek olacaktır.
GitHub: https://github.com/GokGokalp/dotnetcore-reactive-extensions
Referanslar
docs.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=netframework-4.7.2&WT.mc_id=DT-MVP-5003382
dotnetcorecentral.com/blog/reactive-extensions-in-net-core/
docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242974%28v%3dvs.103%29?WT.mc_id=DT-MVP-5003382