jsdysw 2024. 2. 26. 21:41
728x90

일정 기간 데이터를 모아서 주기적으로 모델을 학습하거나 batch 묶음으로 한번에 예측을 하는 경우를 batch serving이라고 한다. Apache Airflow를 활용하면 scheduler를 통해 주기적인 학습 자동화 혹은 batch단위 예측을 구축할 수 있다.

 

1. DAGs (Directed Acyclic Graphs)

airflow에서 스케쥴링할 작업을 DAG라고 부른다.

작업이 하나의 노드인셈이다. DAG란 여러 작업들이 그래프로 구성된 파이프라인인 셈이다.

 

2. Airflow 설치

// 가상환경 생성
python -m venv .venv
source .venv/bin/activate

// airflow 설치
pip3 install pip --upgrade

AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFL OW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

 
// airflow home directory 변경
export AIRFLOW_HOME=`pwd` 
echo $AIRFLOW_HOME

// airflow에서 사용할 DB초기화 
airflow db init


// airflow 계정 생성
airflow users create \
--username admin \
--password '!xxx!' \ 
--firstname xxx \
--lastname xxx \
--role Admin \
--email xxx@gmail.com


// airflow webserver 실행
airflow webserver --port 8080

 

3. Airflow 스케쥴러 실행

source .venv/bin/activate
export AIRFLOW_HOME=`pwd`
airflow scheduler

 

4. Airflow DAG 작성

AIRFLOW_HOME 디렉토리에 DAG를 저장할 폴더를 생성한다. 이름은 반드시 dags여야 한다.

mkdir dags

여기에 아래 예시 .py 스크립트를 생성한다.

# hello_world.py

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator 
from airflow.operators.python import PythonOperator

def print_world() -> None: 
	print("world")

# with 구문으로 DAG 정의를 시작합니다. 
with DAG(
	dag_id="hello_world", # DAG의 식별자용 아이디입니다. 
    description="My First DAG", # DAG에 대해 설명합니다. 
    start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다. 
    schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다. 
    tags=["my_dags"],
    # catchup,  # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
    # depends_on_past # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
) as dag:

# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행하는 task입니다. 
t1 = BashOperator(
	task_id="print_hello",
	bash_command="echo Hello",
	owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다. 
    retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다. 
    retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)

# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다. 
t2 = PythonOperator(
	task_id="print_world", 
    python_callable=print_world, 
    depends_on_past=True, 
    owner="heumsi",
	retries=3,
	retry_delay=timedelta(minutes=5), 
)

# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다. 
t1 >> t2

저장 후 airflow 웹 콘솔에서 DAGs에 들어가보면 작업이 추가된 것을 볼 수 있다.

우리가 만든 DAG가 두번 돈 것이 기록되어있다. 총 2task가 순차적으로 실행된다. 각 날짜의 결과를 볼 수도 있다.

 

5. Operator

PythonOperator : 파이썬 함수를 실행할 수 있도록하는 Operator이다. 바로 위 예제에서 생성한 python함수를 task로 만들때 사용했다.

BashOperator : 위 예시에서 마찬가지로 살펴보았다. shell 스크립트나 scala파일을 실행할 수 있도록 한다.

DummyOperator : 아무것도 실행하지는 않지만 DAG내에 여러 Task들의 작업 SUCCESS를 기다려야할 때 추가할 수 있다.

B, C, D의 완료를 기다려야할 때 Dummy task인 E를 만들어 기다리도록 할 수 있다.

SimpleHttpOperator : http 요청을 보내야할 때 활용할 수 있다. 물론 PythonOperator가 실행하는 python함수 안에서 request를 할수도 있다.

BranchPythonOperator : 예를들어 학습 결과가 기존 모델보다 좋으면 업데이트 좋지 않으면 저장하고 싶을때 활용할 수 있다. 즉 특정 조건에 따라 실행을 제어할 수 있다.

 

6. Connection

connection을 활용하면 DAG이 실패했을 때 알림을 slack에 자동으로 보내도록 설정할 수 있다. 

// Airflow Slack Provider 설치
pip3 install 'apache-airflow-providers-slack[http]'==8.6.0

// slack api key 발급, webhook url 생성
// airflow webhook console에 webhook을 등록하는 과정을 거친다.
// dags/utils/slack_notifier.py 생성, task가 실패했을 때 알림 코드이다.
// 알림코드 예시 : https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/blob/main/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py
// 자세한 방법은 검색을 해보자.

 

7. XCom

dag를 구성하는 task 끼리 return값을 주고받을 수 없다. 대신 airflow에서는 XCom으로 push하여 return값을 저장해두면 다른 task가 pull해올 수 있도록 되어있다.

728x90
반응형