일정 기간 데이터를 모아서 주기적으로 모델을 학습하거나 batch 묶음으로 한번에 예측을 하는 경우를 batch serving이라고 한다. Apache Airflow를 활용하면 scheduler를 통해 주기적인 학습 자동화 혹은 batch단위 예측을 구축할 수 있다.
1. DAGs (Directed Acyclic Graphs)
airflow에서 스케쥴링할 작업을 DAG라고 부른다.
작업이 하나의 노드인셈이다. DAG란 여러 작업들이 그래프로 구성된 파이프라인인 셈이다.
2. Airflow 설치
// 가상환경 생성
python -m venv .venv
source .venv/bin/activate
// airflow 설치
pip3 install pip --upgrade
AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFL OW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
// airflow home directory 변경
export AIRFLOW_HOME=`pwd`
echo $AIRFLOW_HOME
// airflow에서 사용할 DB초기화
airflow db init
// airflow 계정 생성
airflow users create \
--username admin \
--password '!xxx!' \
--firstname xxx \
--lastname xxx \
--role Admin \
--email xxx@gmail.com
// airflow webserver 실행
airflow webserver --port 8080
3. Airflow 스케쥴러 실행
source .venv/bin/activate
export AIRFLOW_HOME=`pwd`
airflow scheduler
4. Airflow DAG 작성
AIRFLOW_HOME 디렉토리에 DAG를 저장할 폴더를 생성한다. 이름은 반드시 dags여야 한다.
mkdir dags
여기에 아래 예시 .py 스크립트를 생성한다.
# hello_world.py
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_world() -> None:
print("world")
# with 구문으로 DAG 정의를 시작합니다.
with DAG(
dag_id="hello_world", # DAG의 식별자용 아이디입니다.
description="My First DAG", # DAG에 대해 설명합니다.
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다.
schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다.
tags=["my_dags"],
# catchup, # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
# depends_on_past # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
) as dag:
# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행하는 task입니다.
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다.
retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)
# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다.
t2 = PythonOperator(
task_id="print_world",
python_callable=print_world,
depends_on_past=True,
owner="heumsi",
retries=3,
retry_delay=timedelta(minutes=5),
)
# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다.
t1 >> t2
저장 후 airflow 웹 콘솔에서 DAGs에 들어가보면 작업이 추가된 것을 볼 수 있다.
우리가 만든 DAG가 두번 돈 것이 기록되어있다. 총 2task가 순차적으로 실행된다. 각 날짜의 결과를 볼 수도 있다.
5. Operator
PythonOperator : 파이썬 함수를 실행할 수 있도록하는 Operator이다. 바로 위 예제에서 생성한 python함수를 task로 만들때 사용했다.
BashOperator : 위 예시에서 마찬가지로 살펴보았다. shell 스크립트나 scala파일을 실행할 수 있도록 한다.
DummyOperator : 아무것도 실행하지는 않지만 DAG내에 여러 Task들의 작업 SUCCESS를 기다려야할 때 추가할 수 있다.
B, C, D의 완료를 기다려야할 때 Dummy task인 E를 만들어 기다리도록 할 수 있다.
SimpleHttpOperator : http 요청을 보내야할 때 활용할 수 있다. 물론 PythonOperator가 실행하는 python함수 안에서 request를 할수도 있다.
BranchPythonOperator : 예를들어 학습 결과가 기존 모델보다 좋으면 업데이트 좋지 않으면 저장하고 싶을때 활용할 수 있다. 즉 특정 조건에 따라 실행을 제어할 수 있다.
6. Connection
connection을 활용하면 DAG이 실패했을 때 알림을 slack에 자동으로 보내도록 설정할 수 있다.
// Airflow Slack Provider 설치
pip3 install 'apache-airflow-providers-slack[http]'==8.6.0
// slack api key 발급, webhook url 생성
// airflow webhook console에 webhook을 등록하는 과정을 거친다.
// dags/utils/slack_notifier.py 생성, task가 실패했을 때 알림 코드이다.
// 알림코드 예시 : https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/blob/main/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py
// 자세한 방법은 검색을 해보자.
7. XCom
dag를 구성하는 task 끼리 return값을 주고받을 수 없다. 대신 airflow에서는 XCom으로 push하여 return값을 저장해두면 다른 task가 pull해올 수 있도록 되어있다.
'boostcamp AI tech > boostcamp AI' 카테고리의 다른 글
[Math] Viewing Deep Learning From Maximum Likelihood Estimation Perspective (0) | 2024.09.25 |
---|---|
Docker (1) | 2024.03.06 |
Product Serving 관련 용어 정리 (0) | 2024.02.26 |
Closed Book Question Answering (0) | 2024.02.22 |
Passage Retrieval - Scaling up, FAISS (0) | 2024.02.19 |