본문 바로가기

boostcamp AI tech/boostcamp AI

Airflow

728x90

일정 기간 데이터를 모아서 주기적으로 모델을 학습하거나 batch 묶음으로 한번에 예측을 하는 경우를 batch serving이라고 한다. Apache Airflow를 활용하면 scheduler를 통해 주기적인 학습 자동화 혹은 batch단위 예측을 구축할 수 있다.

 

1. DAGs (Directed Acyclic Graphs)

airflow에서 스케쥴링할 작업을 DAG라고 부른다.

작업이 하나의 노드인셈이다. DAG란 여러 작업들이 그래프로 구성된 파이프라인인 셈이다.

 

2. Airflow 설치

// 가상환경 생성
python -m venv .venv
source .venv/bin/activate

// airflow 설치
pip3 install pip --upgrade

AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFL OW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

 
// airflow home directory 변경
export AIRFLOW_HOME=`pwd` 
echo $AIRFLOW_HOME

// airflow에서 사용할 DB초기화 
airflow db init


// airflow 계정 생성
airflow users create \
--username admin \
--password '!xxx!' \ 
--firstname xxx \
--lastname xxx \
--role Admin \
--email xxx@gmail.com


// airflow webserver 실행
airflow webserver --port 8080

 

3. Airflow 스케쥴러 실행

source .venv/bin/activate
export AIRFLOW_HOME=`pwd`
airflow scheduler

 

4. Airflow DAG 작성

AIRFLOW_HOME 디렉토리에 DAG를 저장할 폴더를 생성한다. 이름은 반드시 dags여야 한다.

mkdir dags

여기에 아래 예시 .py 스크립트를 생성한다.

# hello_world.py

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator 
from airflow.operators.python import PythonOperator

def print_world() -> None: 
	print("world")

# with 구문으로 DAG 정의를 시작합니다. 
with DAG(
	dag_id="hello_world", # DAG의 식별자용 아이디입니다. 
    description="My First DAG", # DAG에 대해 설명합니다. 
    start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다. 
    schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다. 
    tags=["my_dags"],
    # catchup,  # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
    # depends_on_past # 유용한 인자 중 하나이다. 궁금하면 찾아봐라
) as dag:

# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행하는 task입니다. 
t1 = BashOperator(
	task_id="print_hello",
	bash_command="echo Hello",
	owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다. 
    retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다. 
    retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)

# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다. 
t2 = PythonOperator(
	task_id="print_world", 
    python_callable=print_world, 
    depends_on_past=True, 
    owner="heumsi",
	retries=3,
	retry_delay=timedelta(minutes=5), 
)

# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다. 
t1 >> t2

저장 후 airflow 웹 콘솔에서 DAGs에 들어가보면 작업이 추가된 것을 볼 수 있다.

우리가 만든 DAG가 두번 돈 것이 기록되어있다. 총 2task가 순차적으로 실행된다. 각 날짜의 결과를 볼 수도 있다.

 

5. Operator

PythonOperator : 파이썬 함수를 실행할 수 있도록하는 Operator이다. 바로 위 예제에서 생성한 python함수를 task로 만들때 사용했다.

BashOperator : 위 예시에서 마찬가지로 살펴보았다. shell 스크립트나 scala파일을 실행할 수 있도록 한다.

DummyOperator : 아무것도 실행하지는 않지만 DAG내에 여러 Task들의 작업 SUCCESS를 기다려야할 때 추가할 수 있다.

B, C, D의 완료를 기다려야할 때 Dummy task인 E를 만들어 기다리도록 할 수 있다.

SimpleHttpOperator : http 요청을 보내야할 때 활용할 수 있다. 물론 PythonOperator가 실행하는 python함수 안에서 request를 할수도 있다.

BranchPythonOperator : 예를들어 학습 결과가 기존 모델보다 좋으면 업데이트 좋지 않으면 저장하고 싶을때 활용할 수 있다. 즉 특정 조건에 따라 실행을 제어할 수 있다.

 

6. Connection

connection을 활용하면 DAG이 실패했을 때 알림을 slack에 자동으로 보내도록 설정할 수 있다. 

// Airflow Slack Provider 설치
pip3 install 'apache-airflow-providers-slack[http]'==8.6.0

// slack api key 발급, webhook url 생성
// airflow webhook console에 webhook을 등록하는 과정을 거친다.
// dags/utils/slack_notifier.py 생성, task가 실패했을 때 알림 코드이다.
// 알림코드 예시 : https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/blob/main/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py
// 자세한 방법은 검색을 해보자.

 

7. XCom

dag를 구성하는 task 끼리 return값을 주고받을 수 없다. 대신 airflow에서는 XCom으로 push하여 return값을 저장해두면 다른 task가 pull해올 수 있도록 되어있다.

728x90
반응형