سیستم مهندسی داده انتها به انتها بر روی داده‌های واقعی با Kafka، Spark، Airflow، Postgres و Docker

بررسی اجمالی

این مقاله بخشی از پروژه‌ای است که به دو فاز اصلی تقسیم می‌شود. فاز اول بر ساخت یک پایپ لاین داده تمرکز دارد. این شامل دریافت داده‌ها از یک API و ذخیره آن در یک پایگاه داده PostgreSQL است. در فاز دوم، برنامه‌ای را توسعه خواهیم داد که از یک مدل زبانی برای تعامل با این پایگاه داده استفاده می‌کند.

ایده‌آل برای کسانی که تازه وارد سیستم‌های داده یا برنامه‌های مدل زبانی شده‌اند، این پروژه به دو بخش تقسیم می‌شود:

  • این مقاله اولیه شما را در ساخت یک پایپ لاین داده با استفاده از Kafka برای جریان داده، Airflow برای هماهنگ‌سازی، Spark برای تبدیل داده و PostgreSQL برای ذخیره‌سازی راهنمایی می‌کند. برای راه‌اندازی و اجرای این ابزارها از Docker استفاده خواهیم کرد.
  • مقاله دوم که بعداً منتشر خواهد شد، به ایجاد عوامل با استفاده از ابزارهایی مانند LangChain برای ارتباط با پایگاه‌های داده خارجی می‌پردازد.

این بخش اول پروژه برای مبتدیان در مهندسی داده، و همچنین برای دانشمندان داده و مهندسان یادگیری ماشین که به دنبال تعمیق دانش خود در کل فرآیند مدیریت داده هستند، ایده‌آل است. استفاده مستقیم از این ابزارهای مهندسی داده مفید است. این به پالایش ایجاد و گسترش مدل‌های یادگیری ماشین کمک می‌کند و اطمینان می‌دهد که آنها به طور موثر در تنظیمات عملی عمل می‌کنند.

این مقاله بیشتر بر کاربرد عملی تمرکز دارد تا جنبه‌های نظری ابزارهای مورد بحث. برای درک دقیق نحوه عملکرد داخلی این ابزارها، منابع عالی بسیاری به صورت آنلاین در دسترس است.

بررسی اجمالی

بیایید فرآیند پایپ لاین داده را گام به گام تجزیه کنیم:

  1. جریان داده: در ابتدا، داده‌ها از API به یک موضوع Kafka جریان می‌یابند.
  2. پردازش داده: سپس یک کار Spark مسئولیت را بر عهده می‌گیرد، داده‌ها را از موضوع Kafka مصرف کرده و به یک پایگاه داده PostgreSQL منتقل می‌کند.
  3. برنامه‌ریزی با Airflow: هم کار جریان داده و هم کار Spark با استفاده از Airflow هماهنگ می‌شوند. در حالی که در یک سناریوی واقعی، تولیدکننده Kafka به طور مداوم به API گوش می‌دهد، برای اهداف نمایشی، کار جریان داده Kafka را به صورت روزانه برنامه‌ریزی خواهیم کرد. هنگامی که جریان داده کامل شد، کار Spark داده‌ها را پردازش می‌کند و آن را برای استفاده توسط برنامه LLM آماده می‌کند.

تمام این ابزارها با استفاده از داکر و به طور خاص docker-compose ساخته و اجرا می‌شوند.

نمای کلی پایپ لاین داده
نمای کلی پایپ لاین داده. تصویر از نویسنده.

اکنون که یک طرح کلی از پایپ لاین خود داریم، بیایید به جزئیات فنی بپردازیم!

نمای کلی رابط کاربری Kafka
نمای کلی رابط کاربری Kafka. تصویر از نویسنده.
نمای کلی رابط pgAdmin
نمای کلی رابط pgAdmin. تصویر از نویسنده.
اتصال داکر
اتصال داکر. تصویر از نویسنده.
نمای کلی DAG
نمای کلی DAG. تصویر از نویسنده.

Airflow

همانطور که قبلاً گفته شد، Apache Airflow به عنوان ابزار هماهنگ‌سازی در پایپ لاین داده عمل می‌کند. این مسئول برنامه‌ریزی و مدیریت گردش کار وظایف است و اطمینان می‌دهد که آنها به ترتیب مشخص و تحت شرایط تعریف شده اجرا می‌شوند. در سیستم ما، Airflow برای خودکارسازی جریان داده از جریان داده با Kafka تا پردازش با Spark استفاده می‌شود.

Airflow DAG

بیایید نگاهی به گراف جهت دار غیر حلقوی (DAG) بیندازیم که توالی و وابستگی‌های وظایف را مشخص می‌کند و Airflow را قادر می‌سازد تا اجرای آنها را مدیریت کند.

start_date = datetime.today() - timedelta(days=1)  default_args = {    "owner":        "rappel_conso",        schedule_interval="@daily",        default_args=default_args,        catchup=False,        task_id="kafka_stream",        image="python:3.9-slim-buster",        command=["python", "/app/kafka_stream_data.py"],        docker_conn_id="docker_localhost",        volumes=[            "/Users/hamzagharbi/Documents/data-engineering-project/src/kafka_client:/app"        ],        name="kafka_stream_data",        dag=dag,    )    # task to execute the spark_streaming.py file    spark_task = DockerOperator(        task_id="spark_streaming",        image="rappel-conso/spark:latest",        command=["/opt/bitnami/spark/bin/spark-submit",                 "--py-files", "/opt/bitnami/spark/src/constants.py", "/opt/bitnami/spark/spark_streaming.py"],        docker_conn_id="docker_localhost",        name="spark_streaming",        dag=dag,    )    kafka_stream_data >> spark_task

بیایید کلیدهای مختلف مورد استفاده در DAG را توضیح دهیم:

  • start_date نشان دهنده تاریخی است که گردش کار شروع به برنامه ریزی می‌کند. با datetime.today() - timedelta(days=1)، DAG تنظیم می‌شود تا از دیروز شروع به کار کند.
  • schedule_interval="@daily" مشخص می‌کند که DAG باید روزانه اجرا شود.
  • kafka_stream_data یک DockerOperator است که برای اجرای یک کار جریان داده با استفاده از یک تصویر Docker پیکربندی شده است.
    • task_id="kafka_stream": شناسه منحصر به فرد برای این کار.
    • image="python:3.9-slim-buster": تصویر داکری که برای اجرای کار استفاده می‌شود. در این حالت، از یک تصویر پایتون استفاده می‌شود.
    • command=["python", "/app/kafka_stream_data.py"]: دستوری که در داخل کانتینر Docker اجرا می‌شود. این دستور، اسکریپت پایتون kafka_stream_data.py را اجرا می‌کند.
    • docker_conn_id="docker_localhost": اتصال Docker برای اتصال به Daemon Docker.
    • volumes=["/Users/hamzagharbi/Documents/data-engineering-project/src/kafka_client:/app"]: این دستور دایرکتوری محلی را به دایرکتوری /app در کانتینر Docker متصل می‌کند. این به اسکریپت پایتون اجازه می‌دهد تا به فایل‌های مورد نیاز در دایرکتوری محلی دسترسی داشته باشد.
  • spark_task وظیفه اجرای پردازش spark ما را بر عهده دارد. مانند وظیفه kafka، یک DockerOperator است اما از یک تصویر Docker متفاوت (rappel-conso/spark:latest) استفاده می‌کند که با وابستگی‌های مورد نیاز برای اجرای کار Spark پیکربندی شده است.
    • command نشان دهنده دستوری است که برای ارسال کار Spark استفاده می‌شود. به طور خاص، از اسکریپت spark-submit ارائه شده توسط Spark برای ارسال spark_streaming.py استفاده می‌کنیم.
    • ما همچنین از گزینه --py-files برای افزودن فایل‌های پایتون اضافی که کد Spark ما به آن بستگی دارد (در این مورد، constants.py) استفاده می‌کنیم. این فایل‌ها به خوشه Spark ارسال می‌شوند و در حین اجرای کار در دسترس هستند.

راه اندازی Airflow

برای پیکربندی Airflow، از یک docker-compose استفاده می‌کنیم که در زیر آمده است:

version: "3.7"services:  postgres:    image: postgres:9.6    environment:      - POSTGRES_USER=airflow      - POSTGRES_PASSWORD=airflow      - POSTGRES_DB=airflow    logging:      options:        max-size: 10m        max-file: "3"  webserver:    image: apache/airflow:2.7.1    depends_on:      - postgres    environment:      - AIRFLOW__CORE__EXECUTOR=SequentialExecutor      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow      - AIRFLOW__CORE__FERNET_KEY=46BKJoQYlPPOexPi9I_hkyzK9f6tR5sCLPdCGgmF7Lk=      - AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com      - AIRFLOW__SMTP__SMTP_PORT=587      - AIRFLOW__SMTP__SMTP_STARTTLS=True      - AIRFLOW__SMTP__SMTP_SSL=False      - AIRFLOW__SMTP__SMTP_USER=      - AIRFLOW__SMTP__SMTP_PASSWORD=    volumes:      - ./airflow/dags:/opt/airflow/dags      # - ./airflow/logs:/opt/airflow/logs      - ./airflow/config:/opt/airflow/config      - ./plugins:/opt/airflow/plugins    ports:      - "8080:8080"    networks:      - airflow-kafka    restart: always    logging:      options:        max-size: 10m        max-file: "3"networks:  airflow-kafka:    name: airflow-kafka    external: true

کلیدهای مختلف را تجزیه می‌کنیم:

  • سرویس postgres پایگاه داده را تنظیم می‌کند که Airflow از آن برای ذخیره فراداده استفاده می‌کند. از تصویر postgres:9.6 استفاده می‌کند و متغیرهای محیطی را برای پیکربندی اعتبارنامه و پایگاه داده تنظیم می‌کند.
  • سرویس webserver جزء وب Airflow را اجرا می‌کند. به سرویس postgres بستگی دارد و متغیرهای محیطی متعددی را برای پیکربندی Airflow تنظیم می‌کند، از جمله اتصال پایگاه داده (AIRFLOW__CORE__SQL_ALCHEMY_CONN) و کلید Fernet (AIRFLOW__CORE__FERNET_KEY) برای رمزگذاری اتصالات.
  • volumes برای اتصال دایرکتوری‌های محلی به دایرکتوری‌های مربوطه در داخل کانتینر استفاده می‌شود.
  • پورت‌ها: پورت 8080 را برای دسترسی به رابط کاربری Airflow در دسترس قرار می‌دهد.

اکنون سرویس Airflow را شروع می‌کنیم:

docker-compose -f docker-compose-airflow.yaml up -d

دستور زیر برای ایجاد اتصال Docker در Airflow مورد نیاز است:

  1. به رابط کاربری Airflow در http://localhost:8080/ مراجعه کنید و با استفاده از اعتبارنامه‌های پیش‌فرض Airflow (airflow/airflow) وارد شوید.
  2. به تب Admin بروید و روی Connections کلیک کنید.
  3. روی ایجاد (+) کلیک کنید تا یک اتصال جدید اضافه کنید.
  4. ID اتصال را به docker_localhost تنظیم کنید.
  5. نوع اتصال را روی Docker تنظیم کنید.
  6. مقادیر باقی‌مانده را خالی بگذارید، Docker به طور پیش‌فرض از سوکت محلی استفاده می‌کند.
اتصال داکر
اتصال داکر. تصویر از نویسنده.

در نهایت، DAG ما باید چیزی شبیه به این به نظر برسد:

نمای کلی DAG
نمای کلی DAG. تصویر از نویسنده.

Voila! ما با موفقیت تمام اجزای لازم را با استفاده از داکر تنظیم و پیکربندی کرده‌ایم.