Membuat Data Pipeline Sederhana di Windows

Pada artikel ini, saya akan mencoba membuat sebuah data pipeline sederhana pada sistem operasi Windows 11. Bagi teman-teman yang ingin membuat sebuah project sederhana sebagai Data Engineer atau ETL Developer dan hanya ingin menggunakan sistem operasi Windows dapat mengikuti tutorial ini. Namun, saya meng-encourage teman-teman semua untuk eksplorasi bagaimana melakukan orkestrasi pada sistem operasi Linux dan menggunakan Cron atau dengan Apache Airflow.

Alur Data Pipeline

data-pipeline

Pada alur data pipeline di atas, saya akan mengambil data dari PostgreSQL (dalam kasus ini adalah database dari website ini) kemudian dilakukan proses ETL yang kemudian hasilnya di-load ke Google BigQuery sebagai data warehouse di mana semua proses tersebut dilakukan atau diorkestrasi oleh Task Scheduler. Setelah data berhasil tersimpan di data warehouse, akan dilakukan visualisasi datanya menggunakan Looker Data Studio (Google Data Studio).

Seperti yang teman-teman lihat, pada alur data pipeline ini saya menggunakan Google Cloud Platform yaitu Google BigQuery sebagai data warehouse dan Looker Data Studio sebagai data visualisasinya sehingga diharapkan teman-teman membuat terlebih dahulu project di Google Cloud Platform. Untuk project ini, saya hanya menggunakan sandbox gratis yang disediakan Google Cloud Platform dengan segala limitasinya sehingga teman-teman tidak perlu sampai mengaktifkan billing untuk mengikuti tutorial ini.

Membuat Service Account Google Cloud Platform

Sebelum memulai membuat data pipeline, dibutuhkan service account Google Cloud Platform yang akan digunakan untuk melakukan proses load ke Google BigQuery. Untuk membuat service account, teman-teman dapat melakukan langkah berikut ini.

  1. Masuk ke menu IAM & ADMIN pada konsol Google Cloud Platform, pilih Service Accounts dan pilih CREATE SERVICE ACCOUNT.

    gcp-sa

  2. Setelah itu klik service account yang berhasil dibuat. Kemduian pilih KEYS -> ADD KEY -> Create new key dan pilih key type JSON untuk mengunduh kredensial service account dalam bentuk JSON.

    dl-sa

  3. Selanjutnya, tambahkan akses Google BigQuery ke service account dengan cara masuk ke menu IAM & ADMIN pada konsol Google Cloud Platform, pilih IAM dan klik Grant Access. Isi principals dengan service account dan tambahkan roles-nya sebagai BigQuery Admin dan BigQuery Data Transfer Service Agent.

    gcp-grant

Membuat Data Pipeline

Salah satu tantangan seorang Data Engineer atau ETL Developer jika ingin membuat fake project data pipeline untuk portofolio adalah datanya dipakai untuk apa dan harus bagaimana datanya. Untuk mendapatkan jawabannya, kita harus masuk ke ranah user-nya Data Engineer atau ETL Developer yaitu salah satunya adalah Data Analyst.

Sebagai Data Analyst, jika kita ingin membuat portofolio project data analisis, kita maunya menganalisis apa sih? Apakah mau analisis kompleks seperti mengunakan statistika inferensia atau hanya membuat dashboard data visualisasi?.

Pada tutorial kali ini, saya ingin membuat dashboard visualisasi trafik website ini sehingga data yang saya perlukan dan akan dibuat adalah data trafik.

Pratinjau Data Sumber

Setelah menentukan akan membutuhkan data seperti apa yang akan digunakan untuk membuat dashboard trafik website. Saya sudah membuat tabel yang menyimpan data trafik user yang mengunjungi website ini yaitu sebagai berikut.

tabel-trafik

Dari data di atas, sebenarnya saya tidak perlu mentransformasi data terlalu banyak. Seperti cukup mengubah nama kolom date, uri_lvl_0, dan uri_lvl_1 menjadi timestamp, uri_lvl_1, dan uri_lvl_2 serta membuat kolom baru yaitu date dari kolom timestamp. Saya butuh timestamp untuk mengetahui rata-rata user yang mengunjungi website ini setiap jamnya.

Selain mengubah nama kolom dan membuat kolom baru, saya juga menambahkan transformasi yaitu mengubah nilai kolom uri_lvl_1 jika uri_lvl_2 bernilai "koleksi-dataset" atau "belajar-sql". Meskipun dua artikel tersebut masuk ke dalam blog, saya ingin membedakan bukan sebagai blog karena termasuk proyek independen yang sedang saya kerjakan.

Bagi teman-teman, bisa coba cari data lain dan kalau bisa datanya selalu update setiap hari ini karena di sini kita akan membuat data pipeline yang selalu berjalan di setiap jadwal yang telah ditentukan sehingga data warehouse kita juga selalu ter-update apabila ada data baru.

Sebagai contoh, teman-teman bisa coba ambil data video TikTok yang pernah saya jelaskan di artikel sebelumnya dan dilakukan orkestrasi juga sehingga data selalu update setiap harinya.

Membuat Tabel Target di Google BigQuery

Sebelum membuat job ETL, saya akan terlebih dahulu membuat target tabel di Google BigQuery. Pada Google BigQuery, kita akan mengenal hierarki yaitu:

  • project_id
    • datasets
      • tables

karena sebelumnya saya sudah membuat project otomatis akan mendapatkan project_id sehingga yang saya perlukan untuk membuat tabel adalah membuat dataset terlebih dahulu. Pada RDBMS lain, dataset setara seperti database. Untuk membuat dataset, cukup mudah yaitu tinggal masuk ke BigQuery dan klik ikon three dots vertical dan pilih create dataset.

create-dataset

Perlu diingat bahwa saya menggunakan sandbox gratis dari Google Cloud Platform sehingga semua tabel yang nanti dibuat pada dataset hanya akan menyimpan data selama 60 hari. Untuk mencegah tabel tidak hapus setelah 60 hari, teman-teman harus selalu memperbaruhinya.

Pada data pipeline ini, saya akan membuat tabel dengan partisi kolom date. Kenapa sebuah tabel perlu di partisi? Tabel yang dipartisi dibagi menjadi beberapa segmen, yang disebut partisi, yang mempermudah pengelolaan dan kueri data. Dengan membagi tabel besar menjadi partisi yang lebih kecil, kita dapat meningkatkan kinerja kueri dan mengontrol biaya dengan mengurangi jumlah byte yang dibaca oleh kueri.

Untuk membuat tabel, teman-teman bisa menggunakan statement sql di query editor yang telah disediakan Google BigQuery ataupun dengan CLI jika teman-teman meng-install Google Cloud SDK.

CREATE TABLE `yt-analytics-357615.study.web_traffic`
(
  id INT64,
  date DATE,
  uri STRING,
  uri_lvl_1 STRING,
  uri_lvl_2 STRING,
  uri_title STRING,
  session_key STRING,
  timestamp TIMESTAMP
)
PARTITION BY date
OPTIONS(
  partition_expiration_days=60.0,
  require_partition_filter=true
);

Di atas adalah contoh DDL (Data Definition Language) yang saya gunakan untuk membuat tabel web_traffic. Pada kueri tersebut saya menambahkan opsi partition_expiration_days=60.0 agar data yang umurnya lebih dari 60 hari langsung dihapus dan require_partition_filter=true agar untuk mengkueri tabel harus selalu menggunakan filter kolom date.

Membuat Job ETL

Dalam membuat job ETL saya akan menggunakan Python, pustaka sqlachemy dan pustaka Google Cloud yaitu pandas-gbq di mana dia sudah termasuk google-auth yang akan digunakan untuk otentikasi service account. Pustaka pandas-gbq adalah pustaka pandas yang sudah terintegrasi dengan pustaka google-cloud-bigquery sehingga saya tidak perlu menggunakan pustaka pandas dan google-cloud-bigquery secara terpisah. Sedangkan pustaka sqlachemy digunakan untuk menghubungkan Python dengan PostgreSQL sehingga dapat membaca pada database PostgreSQL.

Untuk bagian ETL ini, saya juga meng-encourage teman-teman untuk eksplor menggunakan Apache Spark karena pandas kurang bagus performanya jika data yang diproses memiliki banyak kolom dan record yang banyak. Jika data yang teman-teman gunakan memiliki record lebih dari 1 juta, saya saranin untuk menggunakan Apache Spark. Apache Spark juga tersedia sebagai pustaka Python.

Berikut ini contoh kode ETL yang saya buat.

import logging
import os
from datetime import datetime, timedelta

import pandas as pd
import pandas_gbq as pd_gbq
from dotenv import load_dotenv
from google.oauth2 import service_account
from sqlalchemy import create_engine

load_dotenv()

# Configure logging
logging.basicConfig(
    filename="D:\Project\Contoh ETL\logs\web_traffic.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class DataExtractor:
    def __init__(self):
        self.db_url = os.environ.get("DATABASE_URL")
        self.connection = create_engine(self.db_url)
        self.trx_date = (datetime.now() + timedelta(days=-60)).strftime("%Y-%m-%d")

    def extract_data(self):
        try:
            logging.info("Extracting data from the database...")
            query = f"SELECT * FROM public.app_traffic_traffic att WHERE DATE(att.date) >= '{self.trx_date}'"
            df = pd.read_sql_query(query, self.connection)
            return df
        except Exception as e:
            logging.error("An error occurred during data extraction:")
            logging.error(str(e))
            raise


class DataTransformer:
    def __init__(self):
        self.postException = ['koleksi-dataset', 'belajar-sql']

    def transform_data(self, df):
        try:
            logging.info("Transforming data...")
            df["timestamp"] = pd.to_datetime(df["date"])
            df["date"] = df["date"].dt.date
            df = df.rename(columns={"uri_lvl_0": "uri_lvl_1", "uri_lvl_1": "uri_lvl_2"})
            condition_uri_lvl_1_changes = df['uri_lvl_2'].isin(self.postException)
            df.loc[condition_uri_lvl_1_changes, 'uri_lvl_1'] = df.loc[condition_uri_lvl_1_changes, 'uri_lvl_2']
            return df
        except Exception as e:
            logging.error("An error occurred during data transformation:")
            logging.error(str(e))
            raise


class DataLoader:
    def __init__(self, project_id, dataset, table, credentials_file):
        self.project_id = project_id
        self.dataset = dataset
        self.table = table
        self.credentials_file = credentials_file
        self.table_schema = [
            {"name": "date", "type": "DATE"},
            {"name": "timestamp", "type": "TIMESTAMP"},
            {"name": "id", "type": "INT64"},
            {"name": "uri", "type": "STRING"},
            {"name": "uri_lvl_1", "type": "STRING"},
            {"name": "uri_lvl_2", "type": "STRING"},
            {"name": "uri_title", "type": "STRING"},
            {"name": "session_key", "type": "STRING"},
        ]

    def load_data(self, df):
        try:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_file
            )

            logging.info("Loading data to BigQuery...")
            pd_gbq.to_gbq(
                dataframe=df,
                table_schema=self.table_schema,
                destination_table=f"{self.dataset}.{self.table}",
                project_id=self.project_id,
                if_exists="replace",  # change to append if want add new data
                credentials=credentials,
            )
        except Exception as e:
            logging.error("An error occurred during data loading:")
            logging.error(str(e))
            raise


# Usage
data_extractor = DataExtractor()
data_transformer = DataTransformer()
data_loader = DataLoader(
    project_id="yt-analytics-357615",
    dataset="study",
    table="web_traffic",
    credentials_file="D:\Project\Contoh ETL\yt-analytics-357615-bfed06808a14.json",
)

try:
    logging.info("Starting the program...")
    df = data_extractor.extract_data()
    df_transformed = data_transformer.transform_data(df)
    data_loader.load_data(df_transformed)
    logging.info("Program completed successfully.")
except Exception as e:
    logging

Pada kode ETL di atas, alih-alih hanya mengambil data pada hari di mana kode tersebut dijalankan dan menambahkannya di Google BigQuery, saya mengambil data di mana kolom date-nya yang lebih besar dari 60 hari yang lalu dan kemudian me-replace data yang sudah ada. Hal ini saya lakukan karena limitasi yaitu komputer saya tidak mungkin hidup 24/7.

Best practice-nya pada sebuah job ETL adalah kita menggunakan email jika terdapat proses yang gagal. Namun, di sini saya menggunakan logging untuk mencatat setiap proses yang dijalankan. Jika teman-teman menggunakan Apache Airflow untuk orkestrasinya, kalian bisa menambahkan email alert jika terdapat proses yang gagal di DAGs-nya.

Orkestrasi dengan Task Scheduler di Windows 11

Sebelum membuat task di Task Scheduler, kita perlu membuat skrip batch dengan ekstensi .cmd sebagai berikut yang nanti akan di jalankan oleh task.

@echo off
start /MIN "" "Python directory" "Python file directory"
@REM Example
@REM start /MIN "" "D:\Project\Contoh ETL\venv\Scripts\python.exe" "D:\Project\Contoh ETL\Web Traffic\etl_traffic_web.py"

Di Windows 11, kita dapat menggunakan Task Scheduler untuk mengatur jadwal skrip yang ingin dieksekusi secara otomatis. Task Scheduler adalah utilitas bawaan Windows yang memungkinkan kita mengotomatiskan berbagai tugas di komputer Anda.

Berikut adalah langkah-langkah untuk membuat jadwal tugas dengan Task Scheduler di Windows 11:

  1. Buka Task Scheduler dengan salah satu cara berikut:
    • Buka Start Menu, cari “Task Scheduler”, dan buka aplikasinya.
    • Tekan tombol Win + R untuk membuka jendela Run, ketik “taskschd.msc”, lalu tekan Enter.

  2. Setelah Task Scheduler terbuka, klik kanan pada “Task Scheduler Library” dan pilih “Create Basic Task” atau “Create Task”.

  3. Di jendela Create Basic Task atau Create Task, isi nama dan deskripsi task, lalu klik “Next”.

  4. Pilih frekuensi penjadwalan tugas. Anda dapat memilih untuk menjalankan task setiap hari, seminggu sekali, setiap bulan, atau penjadwalan kustom. Klik “Next”.

  5. Tentukan tanggal dan waktu penjadwalan tugas. Kita dapat memilih tanggal dan waktu tertentu atau sesuai dengan suatu kondisi, seperti saat komputer dihidupkan. Klik “Next”.

  6. Pilih action yang akan dilakukan oleh task. Pada kasus ini kita akan menjalankan program atau skrip, pilih file cmd yang sudah dibuat sebelumnya, lalu klik “Next”.

  7. Konfirmasi task sebelum menyimpan. Jika semua pengaturan sudah benar, klik “Finish” untuk membuat task.


task-scheduler

Dengan langkah-langkah di atas, kita telah berhasil membuat jadwal sebuah task menggunakan Task Scheduler di Windows 11.

Data Visualisasi

Data visualisasi sebenarnya sudah di luar ranah pekerjaan Data Engineer atau ETL Developer jadi teman-teman bisa eksplor secara mandiri bagaimana membuat dashboard yang menarik menggunakan Looker Data Studio. Untuk koneksi data dari Looker Data Studio dan Google BigQuery juga cukup mudah karena masih satu ekosistem Google.

Berikut ini contoh dashboard Looker Data Studio menggunakan data trafik website yang berhasil saya load ke Google BigQuery.


data-viz

Kesimpulan

Artikel ini ditujukan bagi teman-teman yang ingin membuat fake project sebagai Data Engineer atau ETL Developer pada sistem operasi Windows 11. Pada artikel pun kita sudah belajar:

  • Membuat alur data pipeline yang melibatkan pengambilan data dari PostgreSQL, melakukan proses ETL, dan memuat hasilnya ke Google BigQuery sebagai data warehouse.
  • Membuat Service Account di Google Cloud Platform yang akan digunakan untuk memuat data ke Google BigQuery. Proses ini melibatkan pengaturan izin akses BigQuery Admin dan BigQuery Data Transfer Service Agent.
  • Menggunakan Google Cloud Platform, yaitu Google BigQuery sebagai data warehouse dengan limitasi yang ada seperti data hanya bertahan selama 60 hari menggunakan sandbox gratis dan Looker Data Studio sebagai alat visualisasi data.
  • Menggunakan Task Scheduler di Windows 11 untuk mengorkestrasi eksekusi job ETL secara otomatis. Ini membantu menjaga kekonsistenan dan kebaruan data dalam data pipeline.

Terakhir, saya meng-encourage teman-teman semua untuk terus eksplor dan memperluas pengetahuan kalian dengan mencoba platform dan alat lain seperti Apache Airflow dan Apache Spark untuk membuat data pipeline yang lebih kompleks.