
Bir önceki yazımda deprem verilerini nasıl toplayacağımızı anlatmıştım. Şimdi de gelen verinin nasıl analiz edileceğinin basit bir örneğini yapacağım Flink ile.
Flink ile ipucuları ve daha kapsamlı projeleri ayrı yazı olarak paylaşacağım.
Yararlı kaynaklar :
- https://ci.apache.org/projects/flink/flink-docs-stable/
- https://training.ververica.com
- Özellikle window konusunu iyi anlamak için https://flink.apache.org/news/2015/12/04/Introducing-windows.html buraya bakabilirsiniz.
Örnekte sadece ana class’lara değineceğim uzamaması adına. Projeye buradan ulaşabilirsiniz.
Keyifli okumalar.
Başlangıç olarak bir Spring Boot uygulaması açalım. Buradaki amacımız oluşturduğumuz bir stream’i dinleyerek Flink üzerinde DataStream oluşturup işlemleri yapıp çıkan analiz sonuçlarını bir yere yazmak(sink)
1-) KafkaTemplate Bean’ini oluşturma
Önceki yazımda hatırlarsanız earthquake adında bir topic’e eğer deprem varsa yazıyordum. Şimdi de bunu dinlememiz lazım ve sonrasında Kafka’ya tekrar yazacağımız için ProducerFactory oluşturmamız lazım.
2-) Kafka ilgili Consume ve Produce işleminin yapılması
earthquake topic’ini consume edip DataStream oluşturuyoruz. Sonrasında analyzedTopic adlı topic’e de analiz edilenleri yazacağız.
Kısaca DataStream’den bahsedeyim :
DataStream, Flink’in streamler üzerinde işlem(map, flatMap, filter,…) yapabilmesine olanak veren özel bir yapı. Tabi önce bunu eklememiz için StreamExecutionEnvironment adında bir env tanımlamamız lazım. Flink bizim oluşturduğumuz bir DataStream’in JobGraph’ını çıkararak bunu çalıştırıyor. Yani source üzerinde yaptığımız her işlem bir Graph dizini oluşturuyor(join -> filter -> agg , … ) ve bunu oluşturduğumuz env üzerinde yapıyoruz.Topolojimizi oluşturduktan sonra bir yere sink etmeden çalıştıramayız.(print(), addSink() …)
3-) Oluşturalan DataStream üzerinde işlem
Burada ayarları bir config üzerinden alması için application.properties’ den veriyoruz.
Bu örnek proje olduğu için StreamExecutionEnvironment’ ı burada oluşturdum. Bunu bir Bean olarak oluşturamanız proje açısından daha yararlı olacaktır.
Burada DataStream üzerinde yapılan işlemler(operatörler) :
- keyBy → Stream üzerinde aynı keyler aynı partitionda olması için(KeyedStream)
- timeWindow → Hangi aralıklarla veriyi incelemek istediğimizi belirtiyoruz(WindowedStream)
- apply → Oluşturulan windowedStream üzerinde bir fonksiyonu çalıştırmak için kullanabiliriz.(Bu örnekte belirlediğimiz window’da (1 dakika) içinde aynı key üzerinde oluşan depremleri toplayarak bir Tuple oluştuyoruz.)
- process → Bu fonksiyondan çıkan her elementi işlediğimiz yer olarak düşünebiliriz:(low-level stream processing operation)
Örnek çıkan veri :
("AKDENİZ",3)
Burada 1 dakika içinde AKDENİZ’de 3 deprem olduğunu anlıyoruz.
- addSink → Operatörler sonrasında veriyi nereye çıkartacağımızı belirttiğimiz yer. (Bu örnekte Kafka’da analyzedTopic’e analiz olmuş veriyi basıyor.)
Özet
Örnek olarak Kafka earthquake topic’indeki veri aşağıdaki şekilde olsun :
{"name":"2020.03.10 21:06:02","lokasyon":"AKDENIZ ","lat":"35.1457","lng":"27.9197","mag":"2.8","Depth":"3.3"}{"name":"2020.03.10 21:06:02","lokasyon":"AKDENIZ ","lat":"35.1457","lng":"27.9197","mag":"2.8","Depth":"3.3"}{"name":"2020.03.10 21:06:02","lokasyon":"AKDENIZ ","lat":"35.1457","lng":"27.9197","mag":"2.8","Depth":"3.3"}
Flink burada bu topic’i dinleyip belirlediğimiz window aralığında ne kadar olduğunu bulup analyzedTopic’e tekrar yazıyor.
{"location":"AKDENIZ","eqCount":"3","time":"1"}
şeklinde çıktısı oluyor.
Config’ler bir api üzerinden alınması da sağlanabilir.
Burada yararlı olması açısından şunu belirtmem lazım eğer flink’i
flink run <jar>
ile başlatmayıp IDE üzerinde başlatırsanız mini bir cluster ayağa kaldırıyor.
İlerde bunun api üzerinden iletişime geçmesi için bu cluster’ı kendi sunucunuzda ayağa kaldırıp programtik olarak erişmeniz gerekir. Burada da ClusterClient’i inceleyebilirsiniz. Flink cluster’ına bağlanıp SubmitJob ve CancelJob oluşturabilirsiniz. Ek olarak savePoint ile durdurma vb de yapabilirsiniz.
Son olarak,
Bu ayarları bir mobile app üzerinden verebilmesi için React Native ile basit bir arayüze sahip bir uygulama da yazdım.
Basit olarak çalışma mantığı şu şekilde
- Uygulama açıldığında ExponentToken alınıp backend userId ile kaydoluyor.
- Kullanıcı deprem istediği bölgenin Yer ve kaç dakika sıklığında olduğu bilgisini giriyor.
- Backend’e bu ayarda kaydoluyor.
- Flink’te analiz yapıldıktan sonra çıkan analyzedTopic’i dinleyen bir uygulama ise eğer gelen lokasyon ve zaman bilgisi tutan bir kullanıcı varsa bunu Expo Server’a gönderiyor.
- Expo Server ise gelen token’ının Andorid veya iOS olduğunu bilip buna uygun aksiyonlar alıyor.
Aşağıda görselleri incelenebilir. Projeye buradan ulaşabilirsiniz.

