بررسی اجمالی
این مقاله بخشی از پروژهای است که به دو فاز اصلی تقسیم میشود. فاز اول بر ساخت یک پایپ لاین داده تمرکز دارد. این شامل دریافت دادهها از یک API و ذخیره آن در یک پایگاه داده PostgreSQL است. در فاز دوم، برنامهای را توسعه خواهیم داد که از یک مدل زبانی برای تعامل با این پایگاه داده استفاده میکند.
ایدهآل برای کسانی که تازه وارد سیستمهای داده یا برنامههای مدل زبانی شدهاند، این پروژه به دو بخش تقسیم میشود:
- این مقاله اولیه شما را در ساخت یک پایپ لاین داده با استفاده از Kafka برای جریان داده، Airflow برای هماهنگسازی، Spark برای تبدیل داده و PostgreSQL برای ذخیرهسازی راهنمایی میکند. برای راهاندازی و اجرای این ابزارها از Docker استفاده خواهیم کرد.
- مقاله دوم که بعداً منتشر خواهد شد، به ایجاد عوامل با استفاده از ابزارهایی مانند LangChain برای ارتباط با پایگاههای داده خارجی میپردازد.
این بخش اول پروژه برای مبتدیان در مهندسی داده، و همچنین برای دانشمندان داده و مهندسان یادگیری ماشین که به دنبال تعمیق دانش خود در کل فرآیند مدیریت داده هستند، ایدهآل است. استفاده مستقیم از این ابزارهای مهندسی داده مفید است. این به پالایش ایجاد و گسترش مدلهای یادگیری ماشین کمک میکند و اطمینان میدهد که آنها به طور موثر در تنظیمات عملی عمل میکنند.
این مقاله بیشتر بر کاربرد عملی تمرکز دارد تا جنبههای نظری ابزارهای مورد بحث. برای درک دقیق نحوه عملکرد داخلی این ابزارها، منابع عالی بسیاری به صورت آنلاین در دسترس است.
بررسی اجمالی
بیایید فرآیند پایپ لاین داده را گام به گام تجزیه کنیم:
- جریان داده: در ابتدا، دادهها از API به یک موضوع Kafka جریان مییابند.
- پردازش داده: سپس یک کار Spark مسئولیت را بر عهده میگیرد، دادهها را از موضوع Kafka مصرف کرده و به یک پایگاه داده PostgreSQL منتقل میکند.
- برنامهریزی با Airflow: هم کار جریان داده و هم کار Spark با استفاده از Airflow هماهنگ میشوند. در حالی که در یک سناریوی واقعی، تولیدکننده Kafka به طور مداوم به API گوش میدهد، برای اهداف نمایشی، کار جریان داده Kafka را به صورت روزانه برنامهریزی خواهیم کرد. هنگامی که جریان داده کامل شد، کار Spark دادهها را پردازش میکند و آن را برای استفاده توسط برنامه LLM آماده میکند.
تمام این ابزارها با استفاده از داکر و به طور خاص docker-compose ساخته و اجرا میشوند.
اکنون که یک طرح کلی از پایپ لاین خود داریم، بیایید به جزئیات فنی بپردازیم!
Airflow
همانطور که قبلاً گفته شد، Apache Airflow به عنوان ابزار هماهنگسازی در پایپ لاین داده عمل میکند. این مسئول برنامهریزی و مدیریت گردش کار وظایف است و اطمینان میدهد که آنها به ترتیب مشخص و تحت شرایط تعریف شده اجرا میشوند. در سیستم ما، Airflow برای خودکارسازی جریان داده از جریان داده با Kafka تا پردازش با Spark استفاده میشود.
Airflow DAG
بیایید نگاهی به گراف جهت دار غیر حلقوی (DAG) بیندازیم که توالی و وابستگیهای وظایف را مشخص میکند و Airflow را قادر میسازد تا اجرای آنها را مدیریت کند.
start_date = datetime.today() - timedelta(days=1) default_args = { "owner": "rappel_conso", schedule_interval="@daily", default_args=default_args, catchup=False, task_id="kafka_stream", image="python:3.9-slim-buster", command=["python", "/app/kafka_stream_data.py"], docker_conn_id="docker_localhost", volumes=[ "/Users/hamzagharbi/Documents/data-engineering-project/src/kafka_client:/app" ], name="kafka_stream_data", dag=dag, ) # task to execute the spark_streaming.py file spark_task = DockerOperator( task_id="spark_streaming", image="rappel-conso/spark:latest", command=["/opt/bitnami/spark/bin/spark-submit", "--py-files", "/opt/bitnami/spark/src/constants.py", "/opt/bitnami/spark/spark_streaming.py"], docker_conn_id="docker_localhost", name="spark_streaming", dag=dag, ) kafka_stream_data >> spark_task بیایید کلیدهای مختلف مورد استفاده در DAG را توضیح دهیم:
start_dateنشان دهنده تاریخی است که گردش کار شروع به برنامه ریزی میکند. باdatetime.today() - timedelta(days=1)، DAG تنظیم میشود تا از دیروز شروع به کار کند.schedule_interval="@daily"مشخص میکند که DAG باید روزانه اجرا شود.kafka_stream_dataیک DockerOperator است که برای اجرای یک کار جریان داده با استفاده از یک تصویر Docker پیکربندی شده است.task_id="kafka_stream": شناسه منحصر به فرد برای این کار.image="python:3.9-slim-buster": تصویر داکری که برای اجرای کار استفاده میشود. در این حالت، از یک تصویر پایتون استفاده میشود.command=["python", "/app/kafka_stream_data.py"]: دستوری که در داخل کانتینر Docker اجرا میشود. این دستور، اسکریپت پایتونkafka_stream_data.pyرا اجرا میکند.docker_conn_id="docker_localhost": اتصال Docker برای اتصال به Daemon Docker.volumes=["/Users/hamzagharbi/Documents/data-engineering-project/src/kafka_client:/app"]: این دستور دایرکتوری محلی را به دایرکتوری/appدر کانتینر Docker متصل میکند. این به اسکریپت پایتون اجازه میدهد تا به فایلهای مورد نیاز در دایرکتوری محلی دسترسی داشته باشد.
spark_taskوظیفه اجرای پردازش spark ما را بر عهده دارد. مانند وظیفه kafka، یک DockerOperator است اما از یک تصویر Docker متفاوت (rappel-conso/spark:latest) استفاده میکند که با وابستگیهای مورد نیاز برای اجرای کار Spark پیکربندی شده است.commandنشان دهنده دستوری است که برای ارسال کار Spark استفاده میشود. به طور خاص، از اسکریپتspark-submitارائه شده توسط Spark برای ارسالspark_streaming.pyاستفاده میکنیم.- ما همچنین از گزینه
--py-filesبرای افزودن فایلهای پایتون اضافی که کد Spark ما به آن بستگی دارد (در این مورد،constants.py) استفاده میکنیم. این فایلها به خوشه Spark ارسال میشوند و در حین اجرای کار در دسترس هستند.
راه اندازی Airflow
برای پیکربندی Airflow، از یک docker-compose استفاده میکنیم که در زیر آمده است:
version: "3.7"services: postgres: image: postgres:9.6 environment: - POSTGRES_USER=airflow - POSTGRES_PASSWORD=airflow - POSTGRES_DB=airflow logging: options: max-size: 10m max-file: "3" webserver: image: apache/airflow:2.7.1 depends_on: - postgres environment: - AIRFLOW__CORE__EXECUTOR=SequentialExecutor - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow - AIRFLOW__CORE__FERNET_KEY=46BKJoQYlPPOexPi9I_hkyzK9f6tR5sCLPdCGgmF7Lk= - AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com - AIRFLOW__SMTP__SMTP_PORT=587 - AIRFLOW__SMTP__SMTP_STARTTLS=True - AIRFLOW__SMTP__SMTP_SSL=False - AIRFLOW__SMTP__SMTP_USER= - AIRFLOW__SMTP__SMTP_PASSWORD= volumes: - ./airflow/dags:/opt/airflow/dags # - ./airflow/logs:/opt/airflow/logs - ./airflow/config:/opt/airflow/config - ./plugins:/opt/airflow/plugins ports: - "8080:8080" networks: - airflow-kafka restart: always logging: options: max-size: 10m max-file: "3"networks: airflow-kafka: name: airflow-kafka external: true کلیدهای مختلف را تجزیه میکنیم:
- سرویس
postgresپایگاه داده را تنظیم میکند که Airflow از آن برای ذخیره فراداده استفاده میکند. از تصویرpostgres:9.6استفاده میکند و متغیرهای محیطی را برای پیکربندی اعتبارنامه و پایگاه داده تنظیم میکند. - سرویس
webserverجزء وب Airflow را اجرا میکند. به سرویسpostgresبستگی دارد و متغیرهای محیطی متعددی را برای پیکربندی Airflow تنظیم میکند، از جمله اتصال پایگاه داده (AIRFLOW__CORE__SQL_ALCHEMY_CONN) و کلید Fernet (AIRFLOW__CORE__FERNET_KEY) برای رمزگذاری اتصالات. volumesبرای اتصال دایرکتوریهای محلی به دایرکتوریهای مربوطه در داخل کانتینر استفاده میشود.- پورتها: پورت 8080 را برای دسترسی به رابط کاربری Airflow در دسترس قرار میدهد.
اکنون سرویس Airflow را شروع میکنیم:
docker-compose -f docker-compose-airflow.yaml up -d دستور زیر برای ایجاد اتصال Docker در Airflow مورد نیاز است:
- به رابط کاربری Airflow در http://localhost:8080/ مراجعه کنید و با استفاده از اعتبارنامههای پیشفرض Airflow (airflow/airflow) وارد شوید.
- به تب Admin بروید و روی Connections کلیک کنید.
- روی ایجاد (+) کلیک کنید تا یک اتصال جدید اضافه کنید.
- ID اتصال را به docker_localhost تنظیم کنید.
- نوع اتصال را روی Docker تنظیم کنید.
- مقادیر باقیمانده را خالی بگذارید، Docker به طور پیشفرض از سوکت محلی استفاده میکند.
در نهایت، DAG ما باید چیزی شبیه به این به نظر برسد:
Voila! ما با موفقیت تمام اجزای لازم را با استفاده از داکر تنظیم و پیکربندی کردهایم.