Dağıtık ve ölçeklenebilir büyük veri uygulamalarında “event/message streaming” yani mesaj işleme işlemleri için Kafka’nın çok tercih edilen ve yaygınlıkla kullanılan bir teknoloji olduğunu biliyoruz. Bu makalede ise, Kafka’yı yine çok tercih edilen bir NoSQL veritabanı teknolojisi olan Cassandra ile entegre edecek ve Kafka’ya gelen event’lerin Cassandra üzerinde yarattığımız tabloya real-time(gerçek zamanlı) bir şekilde akışını izleyeceğiz.

Başlamadan önce bir ön not eklemek istiyorum. Makalede tüm adımlar, Linux işletim sistemi üzerinde gerçekleştirilecek. Dolayısıyla eğer Windows üzerinde çalışıyorsanız, bazı alternatif yollar izlemeniz gerekecektir.

KafkaConnect’in Mimarisi

Entegrasyona başlamadan önce, KafkaConnect’in mimarisinden ve ne olduğundan biraz bahsedelim. KafkaConnect, basitçe açıklamak gerekirse Kafka’nın dış sistemler ile data entegrasyonu yapmasını sağlayan bir araçtır. Herhangi bir dış sistemden bir Kafka topic’ine data almak veya tam tersi bir topic içerisindeki datayı başka bir sisteme akıtmak adına kullanılır.

Görsel: Confluent

Görselden de anlaşılacağı gibi, Kafka’nın içerisine veri aktaran sistemler “source” yani kaynak olarak gözükürken, veriyi Kafka’dan dışarı akıtan sistemler ise “sink” yani hedef sistemler olarak geçmektedir. Bu mimarinin en güzel yanlarından bir tanesi elbette ki Kafka’yı dış sistemlere bağlayan kodun bir “connector” halinde hazır olarak önceden yazılmış olmasıdır. Bu durumda siz bir geliştirici olarak büyük bir iş yükünden kurtulup direkt olarak bu connector’leri kullanabiliyor ve Kafka’yı dış sistemler ile entegre edebiliyorsunuz. Genellikle tek yapılması gereken gerekli connector dosyasının yüklenip konfigürasyonun yapılması oluyor.

KafkaConnect ile Gerçek Zamanlı Veri Akışı (Streaming Pipelines)

KafkaConnect’in elbette en çok kullanıldığı senaryolardan bir tanesi, “streaming pipelines” yani real-time veri akışlarıdır. En bilinen örneklerden bir tanesi, KafkaConnect kullanarak herhangi bir uygulamanın beslediği bir veritabanından CDC vasıtasıyla Kafka broker’larını beslemek ve sonrasında ise bu event’leri yine real-time bir şekilde herhangi bir dosya sistemine(bu HDFS, S3 veya CEPH gibi bir teknoloji olabilir) kaydetmektir. Veya alternatif olarak, bunlar başka bir uygulamaya da gönderilebilir.

Görsel: Confluent

Üstteki görselde de bahsettiğim senaryo gösterilmekte. Uygulamalardan veritabanına gelen veriler real-time bir şekilde Kafka broker’larına besleniyor ve bu genellikle CDC(Change Data Capture) metodu ile yapılıyor. Sonrasında ise yine farklı bir connector vasıtasıyla herhangi bir dış sisteme aktarılabileceği gibi, tam tersi senaryoda dış sistemden Kafka içerisine de aktarılma işlemi yapılabilir.

KafkaConnect’in iç mimarisine genel bir bakış atacak olursak:

Görsel: Confluent

  • KafkaConnect içerisindeki “connector” kısmı, source veya sink sistem ile iletişimi API vasıtasıyla sağlayacak olan kısım. Fiziksel olarak bakıldığında aslında burası bir “jar” dosyası, ya da bir diğer adıyla bir Java paketi. Kafka’nın birçok sistem ile rahatça entegre olabilmesini sağlamak amacıyla, sayısız teknoloji için connector’ler yazılmış bulunuyor ve geliştiriciler için büyük bir yükü ortadan kaldırıyor.
  • “Transform” denilen kısım, gelen veri üzerinde yapılabilecek ufak transformasyon ve dönüşümler için bulunuyor. Connector konfigürasyonu yapılırken, bu dönüşümler için de eklemeler yapılarak gelen kayıtların bu işlemlerden geçirilmesi sağlanabilir.
  • Son olarak “converter” kısmında ise, verinin “serialize” edilme işlemi gerçekleşiyor. Bu işlem protobuff, avro veya JSON gibi formatlarda gerçekleşebilir. Bu tamamen senaryoya ve geliştiricinin tercihine kalmış durumda.

KafkaConnect ile Kafka-Cassandra Entegrasyonu

Bu ön bilgilerden sonra, entegrasyon için ilk adımlara başlayalım. Öncelikle, buradaki tüm teknolojilerin kurulum adımları uzun süren ve bambaşka bir makalenin konusu olabilecek adımlar olduğundan, bu yazıda o kısımları tamamen geçiyorum. Dolayısıyla elimde hazır ve ayağa kalkmış single-node Kafka ve Cassandra server’ları bulunuyor ve bunlar Docker üzerinde “containerized” bir yapıda çalışmakta. Biz KafkaConnect’i yine bir container üzerinde kuracak ve gerekli konfigürasyonları yapacağız. Sonrasında Kafka üzerinde yeni bir topic oluşturarak buraya yeni mesajlar basacağız(publish). Daha sonrasında ise Cassandra üzerinden bunları dinleyerek(consume) real-time bir şekilde oluşturacağımız tabloya INSERT edeceğiz.

Başlamadan önce bir not olarak, eğer Docker üzerinde Kafka ve Cassandra kurulumlarını da yapmak isterseniz aşağıdaki linkleri kullanabilirsiniz:

Kafka: https://github.com/bitnami/bitnami-docker-kafka

Cassandra: https://github.com/bitnami/bitnami-docker-cassandra

1. Adım: KafkaConnect’in Docker Container Üzerinde Kurulması

KafkaConnect’in kurulumuna başlayalım. Kurulumu bir Docker container’i içerisine yapacağımızdan, direkt olarak bu konu için oluşturulmuş bir docker image’ini kullanalım. Benim buradaki tercihim debezium’un yazmış olduğu image oldu. Aşağıdaki kod ile image dosyasını alıyorum:

docker pull debezium/connect

Ben docker imajlarını aynı anda yönetebilmek adına işlemimi “docker-compose” üzerinden gerçekleştireceğim. Fakat siz sadece tekil bir docker container’ı oluşturup ayağa kaldırmak isterseniz, aşağıdaki kod ile devam edebilirsiniz.

Docker image dosyasını aldıktan sonra, aşağıdaki kod ile KafkaConnect instance’ı başlatılabilir:

$ docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect

Bu tek bir instance içindi. Fakat biz kolaylık olması açısından, “docker-compose” üzerinden devam edeceğiz. Tüm container’ları tek bir isole ortam içerisinde ayağa kaldırmak adına, öncelikle “docker-compose.yml” dosyasının içerisine bir göz atalım. Bu dosya, ayağa kaldırılacak tüm container’lar için konfigürasyon bilgilerini barındırıyor:

version: '3.0'

networks:
  kafka-net:
    driver: bridge

services:
  zookeeper:
    container_name: zookeper
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    networks:
      - kafka-net
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    networks:
      - kafka-net
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  connect:
     image: docker.io/debezium/connect:latest
     container_name: connect
     hostname: connect
     expose:
       - 8081
       - 8083
     ports:
       - "8081:8081"
       - "8083:8083"
     volumes:
       -  'connect_data:/kafka/connect'
     networks:
       - kafka-net
     environment:
       - BOOTSTRAP_SERVERS=kafka:9092
       - GROUP_ID=1
       - CONFIG_STORAGE_TOPIC=connect-cassandra-config
       - OFFSET_STORAGE_TOPIC=connect-cassandra-offset
       - KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
       - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
       - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
       - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
       - ADVERTISED_HOST_NAME=connect
       - ADVERTISED_PORT=8081
     depends_on:
       - kafka
       - zookeeper
  hivemq:
    image: 'hivemq/hivemq4'
    container_name: hivemq
    ports:
      - "1883:1883"
      - "8080:8080"
    networks:
      - kafka-net
    depends_on:
      - kafka
  cassandra:
    container_name: cassandra
    hostname: 'cassandra'
    image: docker.io/bitnami/cassandra:4.0
    expose:
      - 7000
      - 9042
    ports:
      - '7000:7000'
      - '9042:9042'
    volumes:
      - 'cassandra_data:/bitnami'
    environment:
      - CASSANDRA_SEEDS=cassandra
      - CASSANDRA_DC=DC1
      - CASSANDRA_RACK=rack1
    networks:
      - kafka-net
    depends_on:
      - kafka
      - zookeeper
      - hivemq
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
  cassandra_data:
    driver: local
  connect_data:
    driver: local

Dosyanın içeriğini inceleyelim. Dosyada ortamda bulunan tüm container’ların konfigürasyon bilgisi bulunuyor. Bunların içerisinde kafka, zookeeper, hiveMQ, Cassandra ve KafkaConnect mevcut. Biz KafkaConnect’in konfigürasyonuna bakalım.

Container ismini “connect” olarak belirledik, “image” parametresi ise kullanılan docker imajının hangisi ve nerede olduğunu bildiriyor. “Port” parametresinde, açık olan portlardan birini kullanıyoruz. “Environment” kısmında ise bazı parametreleri belirledik. Son üç parametreye bakacak olursak:

  • BOOTSTRAP_SERVERS: Kafka cluster’a ilk bağlantıyı sağlayabilmek adına göstereceğimiz host ve port bilgisi.
  • GROUP_ID: Kafka Connect cluster’larını tekil bir şekilde ayırd etmemizi sağlayan ID.
  • CONFIG_STORAGE_TOPIC: Kafka Connect servislerinin config bilgilerini tutan kafka topic’i.
  • OFFSET_STORAGE_TOPIC: Kafka Connect servislerinin offset bilgilerini tutan kafka topic’i.
  • KEY_CONVERTER: Connector anahtarlarını Kafka’da saklama formatına dönüştüren Java class’ı.
  • VALUE_CONVERTER: Connector değerlerini Kafka’da saklama formatına dönüştüren Java class’ı.
  • INTERNAL_KEY_CONVERTER: Connector’ün offset ve configuration anahtarlarını Kafka’da saklama formatına dönüştüren Java class’ı.
  • INTERNAL_VALUE_CONVERTER: Connector’ün offset ve configuration değerlerini Kafka’da saklama formatına dönüştüren Java class’ı.
  • ADVERTISED_HOST_NAME: Worker’lara verilecek olan host adı.
  • ADVERTISED_PORT: Worker’lara verilecek olan port numarası.

“Volumes” kısmı ise, verinin kaydedileceği disk dosyasını belirtiyor. Bu dizin aynı zamanda host sistem ve container arasında da dosya alışverişini sağlayacak. Docker container’larından veriyi diske kaydedebilmek adına bu bilgiyi de konfigürasyonda sağlıyoruz.

Şimdi bu container’lar ile beraber KafkaConnect container’ını da ayağa kaldıralım:

docker-compose up -d

Üstteki komut ile beraber “docker-compose.yml” dosyasında bilgileri geçen container’ları ayağa kaldırmış bulunuyoruz. Daha doğrusu, diğerleri zaten ayaktaydı, bunlara KafkaConnect eklendi. Ayakta olan container’lara bakacak olursak:

docker ps

Üstteki komut vasıtasıyla ayaktaki container listesini çağırdık:

Gördüğünüz üzere, en üstte “connect” ismiyle beraber KafkaConnect container’ı da aktifleşti.

Logları da kontrol ederek herhangi bir problem ya da uyarı var mı tespit edelim. Aşağıdaki kod ile bunu gerçekleştirebiliriz:

docker-compose logs -f --tail=50

Sonuca göz atacak olursak:

Görüleceği üzere herhangi bir problem bulunmuyor, loglarda sadece girilmeyen parametreler ile ilgili konfigürasyon uyarıları görüyoruz. Dolayısıyla devam edebiliriz.

Buraya kadar ne yaptığımızı kısaca özetlersek, KafkaConnect container’ını ayağa kaldırmak için gerekli docker imajını indirdik, ve “docker-compose” üzerinden bir konfigürasyon dosyası(docker-compose.yml) vasıtasıyla diğer container’lar ile beraber ayağa kaldırdık. Tüm container’ların aktif olabilmesi için tek bir konfigürasyon dosyası ve docker-compose komutları bizim için yeterli. Son aşamada da loglardan durumu kontrol ettik.

2. Adım: Cassandra için Sink Connector’un İndirilmesi ve Hazırlanması

Bir sonraki adımda, Cassandra için “sink” connector’u indirerek kurulumunu gerçekleştirmemiz gerekiyor. Hatırlayalım, “sink” connector dememizin sebebi Cassandra’nın burada hedef olması. Tam tersi kaynak da olabilirdi. Ben burada Cassandra connector’u olarak DataStax firmasının açık kaynak kodlu ürününü kullanacağım. Elbette başka alternatifler de mevcut, fakat kullandığınız connector’un lisanslama durumunu gözden geçirmeyi unutmayın.

Aşağıdaki kod ile DataStax‘ın websitesine bağlanarak connector dosyasını tarball formatında indirelim.

wget https://downloads.datastax.com/kafka/kafka-connect-cassandra-sink.tar.gz

İndirme işlemi tamamlandıktan sonra, aşağıdaki komut ile tarball dosyası içerisinden çıkartalım:

tar -zxf kafka-connect-cassandra-sink.tar.gz

Bir sonraki aşamada ise, tarball’ın içerisinden çıkmış olan dosyanın içindeki jar dosyasını “server_side_connect_data” klasörüne kopyalayalım. Bu bizim KafkaConnect için “volumes” kısmında belirttiğimiz dosyalardan biriydi.

cp kafka-connect-cassandra-sink-1.4.0/kafka-connect-cassandra-sink-1.4.0.jar /var/lib/docker/volumes/server_side_connect_data/_data

Elbette üstteki kodda, sizin kendi klasörünüzün olduğu dizini ayarlamanız gerekiyor. docker-compose dosyası içerisinde, oluşturduğumuz volume için spesifik bir dosya yolu belirtmediğimiz için, direkt olarak docker/volumes dizininde oluşturdu. Bu dizin ise /var/lib dizini altında yer alıyor. Biz de bu yüzden tarball’dan çıkan jar dosyasını o dizine kopyaladık. Bu adımı da bitirdikten sonra, entegrasyonun gerçekleşmesi aşamasına geçelim.

3. Adım: Kafka Üzerinde Yeni bir Topic Oluşturulması

Öncelikle, datayı “publish” edeceğimiz bir Kafka topic’i oluşturalım. Burada üreteceğimiz event datasının, gerçek zamanlı bir şekilde Cassandra üzerinde oluşturacağımız tabloya düşmesini izleyeceğiz.

Aşağıdaki kod ile kafka topic’ine bağlanıyorum:

docker exec -it kafka /bin/bash

Bağlandıktan sonra, yeni bir topic oluşturmak üzere aşağıdaki kodu çalıştırıyorum:

kafka-topics.sh -create -topic json_example -zookeeper zookeeper:2181 -partitions 1 -replication-factor 1

JSON formatında bir veri alışverişi yapacağım için “json_example” adında bir topic oluşturdum. Partition ve replication sayılarına da 1 verdim. Zaten sadece 1 adet broker’ımız olduğu için, daha fazla replication belirtemiyoruz. Partition için ise, ufak bir veri ile test gerçekleştireceğimizden dolayı, 1 rakamı bizim için yeterli olacaktır.

Komutun çıktısını inceleyelim:

Gördüğünüz gibi topic oluşturulmuş durumda. Devam edelim.

Bir Kafka topic’i oluşturduğumuz gibi, bir de Cassandra tablosu oluşturmamız gerekiyor. Bu tabloda, bize topic’ten gelen datayı tutacağız.

Kafka container’ından çıkıp, Cassandra container’ına aşağıdaki kod bloğu ile bağlanalım:

exit
docker exec -it cassandra /bin/bash
cqlsh -u cassandra -p cassandra

Üstteki kod bloğunda öncelikle Kafka container’ından çıkış yaptıktan sonra, Cassandra container’ına bağlandık ve daha sonra “cqlsh” komutuyla beraber CQL(Cassandra Query Language) shell’e bağlandık.

4. Adım: Cassandra Üzerinde Yeni bir Keyspace ve Tablo Oluşturulması

Bir sonraki aşamada, öncelikle datayı INSERT edeceğimiz tabloyu tutacak olan KEYSPACE’i oluşturalım. Buradaki KEYSPACE’i, şimdilik bir server üzerindeki bir veritabanı olarak düşünebiliriz. Aşağıdaki kod ile bunu gerçekleştirelim:

CREATE KEYSPACE kconnect_json WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'datacenter1': 1};

KEYSPACE ismini “kconnect_json” olarak seçtik ve replication kısmında ise iki parametre belirledik. Birincisi replication stratejisi olarak Network Topology’nin kullanılmasını söylüyor ve ikincisi ise datacenter detayını bildiriyor. KEYSPACE’i inceleyelim:

DESCRIBE KEYSPACE kconnect_json

Sonuç:

Görüldüğü gibi, girdiğimiz parametrelerin detaylarını burada da görebiliyoruz.

Sıradaki işlem bu KEYSPACE içerisinde bir tablo oluşturmak. Aşağıdaki kod bloğu ile bunu da gerçekleştirelim:

USE kconnect_json;
CREATE TABLE musteri (id TEXT PRIMARY KEY, isim TEXT, soyisim TEXT, adres TEXT);

Öncelikle ilgili KEYSPACE’i seçip, tabloyu oluşturduk. PK(Primary Key) alanı olarak da id’yi seçtik. Sonucu görelim:

DESCRIBE TABLE musteri

DESCRIBE komutu bizim sağladığımız detaylar dışında “default” davranışta görülen bazı parametreleri de belirtiyor.

5. Adım: Cassandra Connector’un Oluşturulması ve Entegrasyonun Tamamlanması

Evet bu adımlardan sonra, elimizde eventleri basabileceğimiz bir Kafka topic’i ve bu eventleri akıtabileceğimiz bir Cassandra keyspace ve bunun içerisinde de bir tablo bulunuyor. Bundan sonraki adım, bunların ikisini birbirine bağlayıp entegrasyonu tamamlamak olacak.

Öncelikle, connector jar dosyasının Kafka Connect container’ına gittiğini teyit edelim. Hatırlayalım, o dosyayı direkt olarak “volumes” kısmı altında belirttiğimiz dizine kopyalamıştık, dolayısıyla container içerisinde de karşılık gelen dizinde direkt bulunmasını bekliyoruz.

docker exec -it connect /bin/bash
cd /kafka/connect
ls

Kafka Connect container’ına giriş sağladıktan sonra, docker-compose.yml dosyasında connect container’ı için “volumes” alanında belirttiğim dizine geçiş yaptım. Dosyada “volumes” bölümünün syntax’ı, host:container şeklinde. Dolayısıyla “:” ifadesinin solundaki bölüm host bilgisayarı, sağındaki ise container dizinini ifade ediyor. Biz de o dizine geçiş yaparak oradaki dosyaları listeliyoruz.

Sonuç:

Görüleceği üzere, jar dosyamız burada mevcut.

Öncelikle, Cassandra sink connectorünü Kafka Connect üzerindeki connector’lere kaydederek eklememiz gerekiyor. Bunu yapmanın birden fazla yolu var, fakat en yaygın olanı bir HTTP isteği vasıtasıyla gerçekleştirmek. Bu connector’un konfigürasyonunu bir HTTP request atarak Kafka Connect’e bildireceğiz, sonrasında bu connector orada oluşacak.

Konfigürasyonu aşağıdaki gibi oluşturalım:

{
  "name": "cassandra-json-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "json_example",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.username": "cassandra",
    "auth.password": "cassandra",
    "topic.json_example.kconnect_json.musteri.mapping": "id=key, isim=value.isim, soyisim=value.soyisim, adres=value.adres",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}

Konfigürasyon detaylarını JSON formatı halinde hazırladık. Connector’un ismini belirledikten sonra, “config” adı altında bazı parametreler verdik. Önemli bazı parametreler:

  • connector.class: “jar” dosyası içerisinde bulunan connector’un Java class’ı.
  • topics: Event’leri basacak olduğumuz Kafka topic’i.
  • contactPoints: Bağlanacağımız server/container.
  • loadBalancing.localDc: KEYSPACE’i oluştururken kullandığımız datacenter bilgisi.
  • port: Cassandra contariner portu.
  • auth.username: Bağlantıda kullanılan kullanıcı adı.
  • auth.password: Bağlantıda kullanılan şifre
  • topic.json_example.kconnect_json.musteri.mapping: Topic’ten Cassandra tablosuna mapping bilgileri.

Bunlar dışında kalan parametreler opsiyonel, başka parametreler de ekleyip çıkartabilirsiniz. Detayları için de DataStax Cassandra Sink connector sayfasını inceleyebilirsiniz.

Bu konfigürasyon dosyasını, bir HTTP request ile Kafka Connect connector’leri arasına kaydedeceğimizi söylemiştik. Biz bu isteği gönderdiğimizde, Kafka Connect ilgili java class’ını bulabilmek adına contariner içerisinde “/kafka/connect” dizini altına bakacak ve bu class’ın bilgisini oraya koymuş olduğumuz “jar” dosyasından alacak. Eğer bu bilgiyi bulamazsa, hata verecektir. Fakat unutmayın ki “docker-compose.yml” dosyasında “connect” container’ı için “PLUGIN_PATH” parametresini kullanarak bu dizini değiştirebilirsiniz.

İlgili HTTP request’i atarak connector’u kaydedelim(Hala connect container’ı içerisindeyiz):

curl -X POST -H 'Content-Type: application/json' -d '{"name":"cassandra-json-sink","config":{"connector.class":"com.datastax.oss.kafka.sink.CassandraSinkConnector","tasks.max":"1","topics":"json_example","contactPoints":"cassandra","loadBalancing.localDc":"datacenter1","port":9042,"auth.username":"cassandra","auth.password":"cassandra","topic.json_example.kconnect_json.musteri.mapping":"id=key, isim=value.isim, soyisim=value.soyisim, adres=value.adres","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable":false,"value.converter.schemas.enable":false}}' http://172.20.0.4:8083/connectors

Bahsettiğimiz isteği gönderdik, JSON konfigürasyon bilgilerini de veri olarak bu isteğin içerisine gömdük. Sonucu görelim:

Connector’un oluşturulduğu bilgisini görüyoruz. Aşağıdaki kod ile de statüsünü görelim:

curl -X GET "http://172.20.0.4:8083/connectors/cassandra-json-sink/status"

Sonuca bakacak olursak:

Gördüğünüz gibi connector çalışır şekilde ve kullanılmaya hazır.

Artık son aşamaya geldik. Burada, yaratmış olduğumuz Kafka topic’ine event datasını producer console vasıtasıyla basarak, Cassandra’ya gelip gelmediğini kontrol edeceğiz. Aşağıdaki kod bloğuyla topic’leri listeyelim ve kullanacağımız topic’i hatırlayalım:

exit
docker exec -it kafka /bin/bash

Kafka Connect container’ından çıkarak Kafka container’ına geldim.

kafka-topics.sh --list --zookeeper zookeeper:2181

Sonuç:

Evet “json_example” topic’i görülüyor. Şimdi bu topic içerisine event verisini basalım:

kafka-console-producer.sh --broker-list localhost:9092 --topic json_example --property "parse.key=true" --property "key.separator=:"
>abc:{"isim":"john", "soyisim":"doe", "adres":"USA"}
>efg:{"isim":"raven", "soyisim":"fo", "adres":"TR"}

İlk satırı girdikten sonra, Kafka console producer bize mesaj verisini girebilmemiz için bir konsol açıyor. Biz de 2 farklı kayıttan oluşan bir JSON verisi girdik.

Aynı veriyi, şimdi Cassandra tarafındaki tabloda kontrol edelim ve oraya düşüp düşmediğini görelim:

exit
docker exec -it cassandra /bin/bash
cqlsh -u cassandra -p cassandra

Cassandra container’ına geçiş yaptık. ve CQL konsolu açtık. Sorguyu çekelim:

USE kconnect_json;
SELECT * FROM kconnect_json.musteri;

Sonuç:

Evet görüleceği üzere, veri Cassandra üzerinde yarattığımız tabloya düşmüş durumda.

Aslında burada, Cassandra tablosu bizim Kafka topic’imiz için bir “consumer” ya da “subscriber” konumundadır. Bu da demek oluyor ki Kafka Connect üzerinde oluşturmuş ve konfigüre etmiş olduğumuz Cassandra sink connector vasıtasıyla, Cassandra üzerindeki “musteri” tablomuz, Kafka broker’ı üzerindeki “json_example” topic’ini gerçek zamanlı bir şekilde dinlemektedir. Bu topic üzerinde ne zaman yeni bir event/mesaj oluşsa, yine gerçek zamanlı bir şekilde Cassandra’daki tabloya düşecektir.

NOTLAR:

  1. Gerek Kafka Connect Docker imajı konusunda, gerekse Cassandra connectorleri konusunda piyasada birden fazla alternatif ve entegrasyon için de birden fazla metod bulunmaktadır. Siz de bu işlemleri kendiniz gerçekleştirmek istediğinizde, farklı yollardan gidebilirsiniz.
  2. Biz bu makalede sadece entegrasyon tarafına odaklandık, Kafka veya Cassandra’nın detaylarına girmedik. Elbette onların kurulumları ve konfigürasyonları da ayrı konular, dolayısıyla bu hususta makaledeki linklerden faydalanabilirsiniz.

KAYNAKÇA:

https://hub.docker.com/r/debezium/connect

https://docs.datastax.com/en/kafka/doc/kafka/kafkaIntro.html

https://docs.lenses.io/4.2/integrations/connectors/stream-reactor/sinks/cassandrasinkconnector/

http://cloudurable.com/blog/kafka-tutorial-kafka-from-command-line/index.html