Söz konusu büyük veri işleme olduğunda, akla gelen ilk teknolojilerden biri de elbette Spark’tır. Son yıllarda Spark’ın popülerliğini artırmasının çok geçerli nedenleri olduğunu biliyoruz. Bu makalede, Spark’ın DataFrame API’nı kullanarak bazı ETL uygulamaları gerçekleştirecek ve bahsettiğim bu sebeplere de yer yer değineceğiz. Daha sonra bu yazının bir devamı olarak Spark SQL ile ETL işlemlerine değinmek adına ikinci bir bölüm de yayınlayacağım.

Yazı içerisinde:

  1. Örnek bir ETL uygulaması olması açısından PostgreSQL veritabanı içerisinde bulunan bir tabloyu Spark DataFrame olarak okuyacak,
  2. Daha sonra dosya sistemimizde bulunan bir CSV dosyasıyla birleştirerek (joinleyerek) entegre edeceğiz. Sonrasında da elde edilen veriye bir takım transformasyonlar uygulayarak finalde yine bir tablo olarak Spark’ın kendisine kaydedeceğiz.

Bu uygulama Spark üzerinde DataFrame API kullanılarak yapılabilecek olan ETL işlemlerine küçük bir örnek teşkil edecektir. Elbette çok daha kapsamlı işlemler de yapılabilir, ileriki yazılarda zaman olursa bunlara da değinmeye çalışacağım. Ek olarak, Spark işlemlerini de Python üzerinde PySpark kütüphanesini kullanarak gerçekleştireceğiz. Spark, en çok kullanılan programlama dillerinin neredeyse tamamı için API’lar sunmaktadır. Bunlar Java, R, Python, Scala ve SQL olmak üzere 5 adettir. Biz bu makalede ise Python API’ı olan PySpark üzerinden devam edeceğiz.

Daha önceki yazılarda PostgreSQL üzerinde Northwind veritabanını kurmuştuk. Yine bu veritabanındaki tabloları ETL senaryomuzda kullanabiliriz. Spark üzerinden PostgreSQL’a bağlanarak başlayalım.

1.Adım: PostgreSQL’e bağlanılması ve Verinin Spark DataFrame Olarak Okunması

Spark üzerinden PostgreSQL’a bağlanabilmek adına, öncelikle gerekli JDBC jar dosyasını indirerek daha sonra onu Spark classpath’ine eklememiz gerekiyor. Bu jar dosyasının içinde bizim bağlantıda kullanacağımız driver da bulunuyor. Jar dosyasını aşağıdaki linkten indirebilirsiniz:

Jar Dosyası: PostgreSQL JDBC Driver

İndirme işlemini tamamladıktan sonra, spark-shell üzerinde aşağıdaki komutu çalıştırarak classpath’e ekleyelim:

bin/spark-shell --jars Downloads/postgresql-42.2.23.jar

Tabii ki hangi versiyonu indirdiyseniz, jar dosyasının versiyonunu da buna göre ayarlamanız gerekiyor. Ek olarak, “bin” klasörüne direkt olarak gidip “./spark-shell” şeklinde de komutu çalıştırabilirsiniz. Şunu da belirtelim ki, bu üstteki yöntem eğer spark-shell kullanacaksanız geçerli. Eğer direkt olarak shell üzerinde çalışmayacaksanız (ki biz de öyle yapmayacağız) aşağıdaki metod çok daha kullanışlı olacaktır.

PostgreSQL üzerinde Northwind veritabanının tabloları nelerdi bir hatırlayalım. Bir görsel ile gösterecek olursak:

PGAdmin paneli üzerinde sol tarafta Northwind veritabanının tablolarını, sağ tarafta ise örnek olması açısından “Customers” tablosunun verilerini görüyoruz. Biz Spark üzerinden “Northwind” veritabanına bağlanarak, “order_details” tablosunu okuyacak ve onu bir Spark DataFrame olarak kaydedeceğiz.

İlk aşamada, aşağıdaki kod bloğu ile gerekli kütüphaneleri indirip sonrasında Spark Session’ı aktif edelim:

import pyspark
from pyspark.sql import SparkSession

spark = (SparkSession
       .builder
       .appName("Spark-ETL-1")
       .config("spark.jars", "/home/raven/Downloads/postgresql-42.2.23.jar")
       .config("spark.sql.catalogImplementation", "hive")
       .enableHiveSupport()
       .getOrCreate())

Spark session, Spark ile interaktif bir iletişim sağlayabilmek adına oluşturduğumuz bir objedir. Bu obje vasıtasıyla Spark’a istediğimiz operasyonları yaptırabiliyoruz. Spark’ın önceki versiyonlarında, farklı operasyon tipleri için farklı objeler vardı. Örnek olarak SparkContext, SparkConf gibi objeleri verebiliriz. Fakat artık tüm bu nesnelerin yerine SparkSession kullanarak ortak bir kanal vasıtasıyla Spark ile interaktif iletişim halinde olabiliyoruz. “Config” metoduyla belirttiğimiz özelliklerden ilki, postgres veritabanına bağlanırken kullanacak olduğumuz driver dosyasının yerini belirtiyor ve spark’ın jar dosyalarına eklemesini istiyor. İkinci ayar ise Spark tabloları yaratırken işimize yarayacak, dolayısıyla orada değineceğiz.

SparkSession objesini oluşturduktan sonra, bağlantıyı sağlayarak “order_details” tablosunu Spark DataFrame biçiminde okuyalım:

url = "jdbc:postgresql://localhost/Northwind"
table = "public.order_details"
user = "raven"
pw = "637379"
driver = "org.postgresql.Driver"

od_df = (spark
        .read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", pw)
        .option("driver", driver)
        .load())

Kod bloğunu açıklayacak olursak, gerekli url, tablo adı, kullanıcı, şifre ve driver bilgilerini verdikten sonra, Spark’ın “DataFrameReader” arayüzünü kullanarak tabloyu okuyoruz. “Read” metoduna “jdbc” yöntemini kullanmasını söyledikten sonra, üstte oluşturmuş olduğumuz bilgileri de “option” metoduna vererek okuma işlemini gerçekleştiriyoruz.

Veriyi inceleyelim:

od_df.show(10, truncate=False)

Sonuç:

Bir de şemasına bakalım:

od_df.printSchema()

Görüleceği üzere, order_details tablosu başarıyla okunmuş durumda. Veri tipleri ise Spark tarafından belirlendi. Bu belirleme işlemini Spark her kolon için en uygun bulduğu veri tipini seçerek gerçekleştiriyor. Elbette bu okuma işlemini gerçekleştirmeden önce, gelecek olan verinin şemasının önceden Spark’a tanıtılıp, daha sonra okuma işlemini gerçekleştirmek en doğru yöntem olacaktı. Özellikle büyük veri setlerinde hayat kurtaran bu yöntem, Spark’ı verinin şemasını anlamaya çalışma yükünden tamamiyle kurtarır. Fakat burada veri zaten yeterince küçük olduğundan, böyle bir ekstra adıma gerek yoktu.

Veriyi anlamak adına, veri üzerinde bazı işlemler gerçekleştirelim.

2. Adım: Veri Seti Üzerinde Sorgular Çalıştırılması

Örneğin, ürün bazlı satış adetlerine bakabiliriz:

from pyspark.sql import functions as F

(od_df
.select("product_id", "quantity")
.groupBy("product_id")
.agg(F.sum("quantity").alias("ToplamAdet"))
.orderBy("ToplamAdet", ascending=False)
.show(n=10, truncate=False))

“functions” modülünü indirdikten sonra, “product_id” alanına göre gruplayıp, “quantity” alanına göre toplayıp sıralayarak istediğimiz bilgiye ulaştık. Sonuca bakacak olursak:

Ek olarak, sipariş bazında satılan tekil ürün adetlerini de inceleyebiliriz. Bu bilgi bana hangi siparişlerde tekil ürün adedi bazında iyi satış yapıldığı bilgisini sağlayacaktır:

from pyspark.sql.functions import *
from pyspark.sql.types import *

(od_df
.select("order_id", "product_id")
.where(col("order_id").isNotNull() & col("product_id").isNotNull())
.groupBy("order_id")
.agg(F.countDistinct("product_id").alias("TekilUrunAdedi"))
.orderBy("TekilUrunAdedi", ascending=False)
.show(5))

İleride de kullanacağımız için, “functions” ve “types” modüllerini tamamiyle indirdik. Sonrasında ise benzer bir mantıkla “countDistinct” metodunu kullanarak istediğimiz hesaplamayı yaptık. Sonuca bakalım:

Tekil ürün adedi 25 olan “11077” numaralı sipariş, açık ara farkla en çok ürün satılan sipariş olarak gözüküyor.

Elimizdeki veri setinde eksik olan çok kritik bilgilerden biri de, satış tutarı elbette. Birim fiyat, satılan adet ve indirim oranı bilgileri elimizde bulunuyor fakat satış tutarı bilgisine sahip değiliz. Bu bilgi analizler için çok kritik öneme sahip olduğundan, bu bilgiyi de veri setine bir kolon olarak eklemek işimizi kolaylaştıracaktır. Bu tarz bir bilginin, son kullanıcı seviyesinde bir rapor veya analiz hazırlanırken de hesaplanabileceğini düşünüyor olabilirsiniz. Fakat direkt olarak veri setinde hesaplayarak saklamak, son kullanıcılardan dolayı oluşabilecek potansiyel hataları da elimine edecek ve daha sağlam analizler sağlayacaktır. Aşağıdaki kod bloğu ile satış tutarı bilgisini veriye ekleyelim:

od_df_new = (od_df
            .withColumn("SalesAmount", 
                        F.round((col("unit_price")*col("quantity"))*(1-col("discount")), 2))
            .withColumnRenamed("order_id", "order_id_od"))

od_df_new.show(n=10, truncate=False)

İstediğimiz kolonu “withColumn” metodunu kullanarak hesapladık ve virgülden sonra maksimum 2 basamak olacak şekilde de yuvarladık. Ayrıca kullanacağımız diğer veri setiyle karışmaması açısından “order_id” kolonunun adını da “order_details” tablosundan geldiğini belirtmek amacıyla “order_id_od” olarak değiştirdik. Sonuca bakalım:

Satış tutarı kolonu hesaplanmış durumda. Bu kolon sonraki analizlerde kullanılmak adına yeni oluşturulmuş olan Spark DataFrame’e eklendi.

3. Adım: Veri Entegrasyonu

Unutmamak gerekir ki Spark DataFrame nesneleri memory içerisinde dağıtık bir şekilde işlenen tablolardır. Elbette bizim mevcut kurulumumuzda “single node”, yani tek bir makine üzerinde çalışıyoruz. Fakat gerçek hayat senaryolarında Spark bir cluster, yani birden fazla makineden oluşan dağıtık bir sistem üzerinde çalışacak. Bu da demektir ki bu sistem üzerinde tutulan her veri, partition’lar halinde farklı farklı makineler/diskler üzerinde tutulacak.

Üzerinde çalıştığımız “order_details” tablosu bir PostgreSQL veritabanı içerisinde tutuluyor. Fakat bu tabloyu bir Spark DataFrame olarak okuduğumuzda, Spark bunu memory üzerine alıyor ve her makine üzerindeki worker’larına partition’lar şeklinde dağıtıyor. Yani “order_details” tablosu, farklı makinaların memory’leri üzerinde partition’lanmış bir halde dağıtık olarak tutuluyor ve işleniyor. Bu partition işleminin nasıl olacağını kendi istediğimiz şekilde ayarlama opsiyonunu da Spark bize sunuyor.

Eğer işlediğimiz bu data bir veritabanı içerisinde değil de “file system” üzerinde olsaydı, zaten partition’lara ayrılmış ve makinelere dağıtılmış şekilde tutuluyor, hatta en az 3 kopyası da başka makinelerde bulunuyor olacaktı. Bu dosyayı Spark DataFrame olarak okumaya kalktığımızda, her Spark worker’ı kendisine cluster üzerinde en yakın olan (mümkünse) partition’a giderek onu okuyacak ve finalde yine üstteki paragrafta bahsettiğimiz dağıtık ve memory üzerinde işlenen bir tablo olarak karşımıza çıkartacaktı.

Bunu aşağıdaki görsel ile de ifade edebiliriz:

Görsel: O’Reilly

Görselde de anlatılmak istenen, üstte bahsettiğimiz gibi Spark çekirdeklerine yakın olan partition’ların o çekirdekler tarafından memory’e alınması ve dağıtık bir tablo şeklinde işlenmesidir. Spark DataFrame dediğimiz nesne de aslında bundan ibarettir.

Evet, “order_details” verisinden oluşan DataFrame nesnesi hazır. Şimdi bu siparişlerin “parent” yani ebeveyn verisinin tutulduğu “orders” verisini de alıp bu veri ile entegre edelim. Bu şekilde daha çok bilgiye ulaşma imkanımız olacak. “orders” verisi, dosya sistemi üzerinde bir “csv” dosyası olarak bulunuyor. Aşağıdaki kod bloğu ile bunu yine bir Spark DataFrame nesnesi olarak okuyalım:

file = "/home/raven/Desktop/orders.csv"

schema = StructType([StructField("order_id", IntegerType(), True),
                     StructField("customer_id", StringType(), True),
                     StructField("employee_id", StringType(), True),
                     StructField("order_date", DateType(), True),
                     StructField("required_date", DateType(), True),
                     StructField("shipped_date", DateType(), True),
                     StructField("ship_via", IntegerType(), True),
                     StructField("freight", FloatType(), True),
                     StructField("ship_name", StringType(), True),
                     StructField("ship_address", StringType(), True),
                     StructField("ship_city", StringType(), True),
                     StructField("ship_region", StringType(), True),
                     StructField("ship_postal_code", StringType(), True),
                     StructField("ship_country", StringType(), True)])
o_df = (spark
       .read
       .format("csv")
       .option("header", "true")
       .option("mode", "FAILFAST")
       .schema(schema)
       .load(file))

Öncelikle okunacak CSV dosyasının lokasyonunu belirttik, sonrasında ise hangi şemaya göre okumak istediğimizi ekledik. Bunu yapmamızın sebebinden yazının önceki kısımlarında bahsetmiştim. Bu veri elbette ki hala küçük bir veri, bu yüzdek şemayı önden belirtmenin bize pratik olarak bir faydası yok. Fakat yine de göstermek amacıyla, bir veriyi okumadan şema belirtme işlemini de ekledik. Son olarak da “DataFrameReader” arayüzünü kullanarak veriyi Spark DataFrame olarak okuduk. Buradaki “FAILFAST” seçeneği, herhangi bir hata olması durumunda Spark’a okuma işleminden çıkmasını söylüyor.

Veriye bakalım:

(o_df.select("order_id", "customer_id", "employee_id", 
            "shipped_date", "ship_country")
.show(10,truncate=False))

Veride kolon sayısı bir miktar fazla olduğundan, sadece birkaç kolonu seçtim. Sonuç:

Görüleceği üzere her şey yolunda. İki veri setini entegre etmeden önce “orders” tablosunu da 1-2 örnek analiz ile inceleyelim.

1997 yılında Fransa’ya gönderilmiş kargolar en çok hangi çalışan tarafından gerçekleştirildi?

(o_df
.select("employee_id", "order_id")
.where(col("ship_country") == "France")
.where(year(col("shipped_date")) == 1997)
.groupBy("employee_id")
.agg(F.count("order_id").alias("SiparisAdedi"))
.orderBy("SiparisAdedi", ascending=False)
.show(n=10, truncate=False)
)

Sonucu inceleyelim:

Elbette “employee” bilgilerini tutan tablo şuan elimizde olmadığı için, sadece ID’ler üzerinden yorum yaparak 4 numaralı çalışanın en yüksek siparişi aldığını görebiliyoruz.

Bir sorgu daha çekelim. 1996 yılında siparişleri zamanında teslim etme açısından en başarılı kargo şirketlerini bulalım. Bunu yaparken ortalama gecikme gün sayısını kullanacağız:

(o_df
.select("ship_via", "shipped_date", "required_date")
.where(year(col("shipped_date")) == 1996)
.groupBy("ship_via")
.agg(F.round(F.avg(F.datediff("shipped_date", "required_date")), 2)
    .alias("OrtalamaGecikme"))
.orderBy("OrtalamaGecikme", ascending=True)
.show(n=10, truncate=False)
)

Sonucu inceleyelim:

Aynı şekilde yine kargo şirketlerinin adlarına sahip değiliz, fakat 2 numaralı şirketin ortalamada en erken sipariş teslimi yapan şirket olduğunu görebiliyoruz.

Evet, iki veri seti hakkında da yeteri kadar inceleme yaptık, şimdi bunların entegrasyonunu gerçekleştireceğiz. Bu iki farklı Spark DataFrame’i join işlemi yaparak birleştirelim:

joined_df = o_df.join(od_df_new, 
                      o_df.order_id==od_df_new.order_id_od, 
                      how="left_outer")

“join” işleminde öncelikle “orders” tablosunu, sonra “order_details” tablosunu vererek hangi alanlar üzerinden joinlemek istediğimizi söyledik. Son parametrede ise join tipini belirttik.

Sonuca bakacak olursak:

joined_df.printSchema()

Görüleceği üzere son 5 kolon, “order_details” tablosundan gelen kolonlar.

Burada belirtelim ki aslında Spark’a verdiğimiz her operasyon, arka planda bir Job’a dönüştürülür ve bu Job’ın nasıl yürütüleceğinin planı da Spark tarafından bir DAG (Directed Acyclic Graph) çizilerek belirlenir. DAG derken bahsettiğimiz nesne aslında görsel bir yürütme planıdır. Düğümlerden ve dallardan oluşur. Aynı şekilde Job’ların bir alt seviyesi olan “Stage” dediğimiz birimler vardır ve bunlar da DAG’ların bir veya birden fazla düğümünden oluşurlar.

Bu durumu aşağıdaki görsel ile inceleyelim:

Üstte de bahsettiğimiz gibi Spark driver tarafından bir veya birden fazla Job başlatılır ve daha sonra bu jobların DAG’leri çizilir. Bu DAG’lerin hangi sırayla çalışacağı da yine Spark driver tarafından belirlenir. Daha sonra bu DAG’lerin içindeki düğüm noktalarına göre Stage’ler belirlenir. Bir Stage bir veya birden fazla düğüm noktasını kapsayabilir. Spark, Stage’leri oluştururken hangi işlemlerin birbiriyle paralel veya sırayla yapılacabileceklerine bakar ve buna göre karar verir. Stage’ler ise yine bir veya birden fazla Task’tan oluşur. Task’lar bu senaryoda en küçük işlem birimleridir ve Spark operasyonları gerçekleştirilirken Task’lar bazında gerçekleştirilir. Task en son kırılım olduğundan, her Task bir Spark çekirdeğine atanır ve bir partition üzerinde çalışır. Elbette bir Spark çekirdeği aynı anda birden fazla Task üzerinde de çalışabilir, dolayısıyla Task’lar ve çekirdekler arasında bire çok bir ilişki vardır diyebiliriz. Finalde, bir Spark executor’ı, üzerindeki çekirdekten daha fazla Task’ı paralel şekilde yürütebilir ve bu da Spark’ın paralel veri işlemedeki başarılarından biridir.

Bu bilgiyi de verdikten sonra, artık elimizdeki joinlenmiş veri setinden özet çıkartıp, tablo olarak kaydedelim. Bu tablo daha sonra analizlerde veya raporlarda kullanılmak istendiğinde, rahatçe erişilebilir halde olacaktır.

4. Adım: Final Tablonun Oluşturulması ve Kaydedilmesi

Eldeki veri setinden, yıllar bazında her müşterinin sipariş performansını ölçelim. Sipariş performansından kastımız, siparişlere toplamda ne kadar ücret ödediği. Bunları ödedikleri ücret toplamına göre düşük, orta ve yüksek olarak sınıflandıralım. Ek olarak, yine her yıl bazında ödedikleri toplam kargo ücretini de rapora ekleyelim. Ekleyeceğimiz son veri de bu müşterilerin sipariş adedi olsun ki ortalama sipariş değerini de hesaplayarak bu bilgileri kombine edelim. Yani verisetinin son halindeki kolonlar: Müşteri, Yıl, Sipariş Adedi, Toplam Sipariş Tutarı, Sipariş Tutarı Performansı, Ortalama Sipariş Değeri olacak.

Aşağıdaki kod bloğu ile hesaplayalım:

final_df_step1 = (
                  joined_df
                 .select("customer_id", "order_id", "shipped_date", "SalesAmount")
                 .where(col("customer_id").isNotNull() & col("order_id").isNotNull())
                 .groupBy(joined_df.customer_id.alias("Musteri"), year(col("shipped_date")).alias("Yil"))
                 .agg(F.countDistinct("order_id").alias("ToplamSiparisAdedi"), 
                      F.round(F.sum("SalesAmount"), 2).alias("ToplamSiparisTutari"))
                 )

final_df_step2 = (
                  final_df_step1
                 .withColumn("PerformansSinifi", 
                             when(final_df_step1.ToplamSiparisTutari < 500, 'Düşük')
                              .when((final_df_step1.ToplamSiparisTutari >= 500) & (final_df_step1.ToplamSiparisTutari < 1000), 'Orta')
                              .when((final_df_step1.ToplamSiparisTutari >= 1000) & (final_df_step1.ToplamSiparisTutari < 2000), 'Yüksek')
                              .otherwise("Çok Yüksek"))
                 .withColumn("OrtSiparisDegeri", 
                             col("ToplamSiparisTutari")/col("ToplamSiparisAdedi"))
                 .orderBy(["Musteri", "Yil"], ascending=True)
                 )

ETL işlemimizin son bacağını iki adımda gerçekleştirdik. İlk adımda, yıl ve müşteri bazında gruplama işlemini gerçekleştirdikten sonra toplam sipariş adedini ve satış tutarını hesaplayarak kolonları uygun bir şekilde isimlendirdik. İkinci adımda ise, müşterilerin toplam satış tutarlarını inceleyerek onları sınıflandırdık ve performanslarını ölçtük. Ek olarak, ortalama sipariş değerini de toplam satış tutarını toplam sipariş adedine bölerek hesapladık. Finalde elimizde olan tabloyu inceleyelim:

final_df_step2.show(n=20, truncate=False)

Sonuç:

Görüleceği üzere, final veri seti beklediğimiz gibi hesaplanmış durumda. Elbette sadece 20 satırlık bir çıktıyla sınırlamayıp, tüm müşterilerin performansını da görebilirsiniz.

ETL akışımızın son aşaması olarak, bu veri setini bir tablo halinde kaydedelim ve sonradan potansiyel analiz ve raporlamalarda kullanılabilecek halde hazır bulunmasını sağlayalım.

Aşağıdaki kod bloğu ile kaydı gerçekleştiriyoruz:

final_df_step2.write.saveAsTable("Spark_ETL_orders")

DataFrame’i kaydetmek adına bu kez de DataFrameWriter arayüzünü kullandık ve istediğimiz bir isimlendirme tercihiyle beraber bir tablo olarak kaydettik. Bu tablo daha sonra rahatlıkla bir Spark DataFrame olarak ya da bir Spark SQL temporary view olarak okunabilir. Burada şu noktaya değinmekte fayda var, bu şekilde kaydedilen bir DataFrame arka planda bir “managed” Spark tablosu oluşturur. Bu şu demektir, oluşturulan bu tablonun hem metadatası, hem de datanın kendisi Spark tarafından yönetilecektir. Bir de bunun zıttı olarak “unmanaged” tablolar mevcuttur ki bunların da metadatasını yine Spark kendi yönetirken, datanın kendisini ise yönetmez. Bu iki tablo tipi arasındaki en belirgin farklardan biri de “DROP TABLE” komutunu çalıştırdığınızda “managed” tablolar için hem data hem de metadata silinirken, “unmanaged” tablolar için sadece metadata silinecektir ve datanın kendisine bir şey olmayacaktır.

Ek olarak, tabloların metadatası Spark tarafından merkez bir lokasyonda tutulur. Bu lokasyon, varsayılan davranışta “Apache Hive Metastore” olarak seçilir. Fakat elbette bunu değiştirmek de mümkündür. Hatırlarsanız makalenin başında “catalogImplementation” parametresi ile Hive seçmiştik, bu da aslında Hive metastore’u kullanmak istediğimizi belirtiyor. Bu zaten varsayılan davranış, fakat yine de göstermek açısından bunu da ekledim.

Tabii Spark tabloları da unutmamak gerekir ki en nihayetinde “file system” üzerinde yani dağıtık bir halde disk üzerinde saklanan dosyalardır.

Evet, bu makalede Spark ile ETL işlemlerine değinmeye çalıştık ve ufak örnekler üzerinden bir akış gerçekleştirdik. Benzer bir senaryo ile, serinin ikinci makalesinde bu işlemleri Spark SQL kullanarak gerçekleştireceğiz.

Notlar:

  1. Bu yazıda ETL işlemlerini gerçekleştirmek adına Spark’ın DataFrame API’ını kullandık. Bu işlemler benzer şekilde Spark SQL kullanılarak da gerçekleştirilebilir ki, bu da bir sonraki makalenin konusunu oluşturuyor. Yine aynı şekilde bu metodlardan biriyle dahi bu işlemleri farklı yollarla da gerçekleştirebilirsiniz. Biz bu yazıda sadece bir tanesiyle ilerledik.
  2. Spark ile alakalı detaya girdikçe, konuların dallanıp budaklanması çok olası hale geliyor. Dolayısıyla, biz bu yazıda odağımızı kısıtlı tuttuk. Aksi takdirde yazı çok uzun hale gelebilirdi. Her bir detay ayrı bir makale konusu haline dahi gelebilir.

Kaynakça:

Learning Spark: Lightning-Fast Data Analytics by Jules S. Damji, Brooke Wenig, Tathagata Das and Denny Lee

https://spark.apache.org/documentation.html

https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html