본문 바로가기
카테고리 없음

[Airflow] - 2. Template Variable, 데이터 공유, Task 다루기 고급

by 16비트 2023. 6. 16.

Jinja 템플릿 설명

문서(파일)에서  특정  양식으로  작성된  값을  런타임시  실제  값으로  치환해주는  처리  엔진

●   오퍼레이터  파라미터  입력시  중괄호  {} 2개를  이용하면 Airflow에서  기본적으로  제공하는  변수들을  치환된  값으로  입력할  수  있음. (ex: 수행  날짜, DAG_ID)

 

Bash Operator에서 Jinja 템플릿 사용하기

templated될 수 있는 파라미터

- bash_command (str) 
- env (dict[str, str] | None) 

vscode에 dags 파이썬 파일 생성

ds : yyyy-mm-dd 형식

&& : 앞에 있는 커맨드가 성공하면 뒤에 있는 커맨드를 실행하겠다

> git add .

> git commit -m "bash jinja 업로드"

> git push

 

WSL 터미널창에서

$ cd ~/airflow

$ git pull

접속테스트

첫번째 태스크, 두번째 태스크 결과

 

 Airflow의 날짜개념

DAG 파이썬 파일 생성

catchup을 true로 하면서 이전부터 쭉 배치 실행

아무런 값을 넣지 않고 **kwarg만 출력해도 값이 나온다

git push - git pull

접속 테스트

 

Python Operator에서 Jinja 템플릿 사용하기

templated 될 수 있는 파라미터

● op_kwargs 
● op_args
● templates_dict 

DAG 파이썬 파일 생성

함수를 실행시키기만 해도 실행가능한 task가 생성된다

첫번째 태스크 결과, 두번째 태스크 결과

 

Bash Operator with macros

Macro 변수 : Jinja템플릿 내에서 날짜 연산을 가능하게 하는 변수

파이썬의 datetime, dateutil 라이브러리

매월 말일 실행되는 DAG

매월 둘째주 토요일에 수행되는 DAG

> git add .

> git commit -m "날짜 계산"

> git push

 

WSL 터미널창에서

$ cd ~/airflow

$ git pull

접속테스트

매월  말일  수행되는  Dag에서변수  START_DATE: 전월  말일,변수  END_DATE: 어제

매월 둘째주 토요일에 수행되는 Dag에서 변수 START_DATE: 2주 전 월요일, 변수 END_DATE: 2주 전 토요일

 

Python Operator에서 macro 변수 사용하기

●  templated가 가능한 패러미터

- op_kwargs

- op_args
- templates_dict

 

templates_dict의 키 밸류 값을 **kwargs가 받는다

start_date는 전월 1일을 의미한다

end_date는 3월 1일에서 하루를 뺀 2월 28일을 의미한다

start_date의 밸류 값이 파이썬 함수의 start_date 변수로 들어간다

end_date의 밸류 값이 파이썬 함수의 end_date 변수로 들어간다

 

 

그러나  Python 오퍼레이터에서  굳이  macro를  사용할  필요가  있을까? 

날짜  연산을  DAG안에서  직접  할  수  있다면?

import 라이브러리를 사용해서 함수 내에서 직접 연산을 할 수 있다

strftime 함수를 이용해서 yyyy-mm-dd 형식으로 출력할 수 있다

DAG 파이썬 파일에 두개의 task를 실행하고 결과값이 똑같은지 비교한다

 

 

라이브러리를 함수 안에다 작성한 이유 : 스케줄러 부하 경감

스케줄러는 주기적으로 DAG에 오류가 없는지 파싱(검사)한다

> git add .
> git commit -m "Python 오퍼레이터에서 macro 변수 사용"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

Python Operator에서 Xcom 사용

Xcom이란 Airflow DAG안 Task 간 데이터 공유를 위해 사용되는 기술

  • ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우 사용
  • 주로 작은 규모의 데이터 공유를 위해 사용(Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)
  • 1 GB이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 등)

xcom_push를 통해서 데이터를 넣고 xcom_pull을 통해서 데이터를 꺼낸다

 

 

리턴하면 자동으로 transaction_value 값이 xcom에 등록되고 key로 retrun_value로 자동적으로 등록된다

task_id 값을 통해서 리턴값을 가져올 수 있다

 

파이썬 파일 생성

ti 인스턴스 객체를 얻어서 xcom으로 직접 키값을 넣는다

마지막에 등록된 xcom 값을 가져온다

task_id 값을 입력하면 특정한 task의 xcom의 값을 가져올 수 있다

task_id를 통해 xcom을 return값을 가져옴

status값은 task의 순서에 따라 Success가 들어간다

> git add .
> git commit -m "파이썬 오퍼레이터에서 Xcom 사용"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

Bash Operator에서 xcom 사용

bash 오퍼레이터에서 echo 출력값은 xcom에 저장된다. 결국 마지막 출력문이 최종 리턴 값으로 xcom에 저장된다

> git add .
> git commit -m "Bash Operator에서 xcom 사용"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

task2의 xcom 값에 저장된 것이 없다. do_xcom_push=False로 했기 때문

 

Python &Bash 오퍼레이터간 Xcom 사용

파이썬 함수는 return을 push

bash 오퍼레이터는 task_ids 값만 주어도 자동으로 xcom 값을 가져온다

bash 오퍼레이터에서 ti.xcom_push로 KV 명시적 push

 

DAG 파이썬 파일 작성

파이썬 함수로 push하는 task와 bash 오퍼레이터로 pull로 가져오는 task

bash오퍼레이터에서 push하는 task와 파이썬 함수로 pull하는 task 

> git add .
> git commit -m "Python & Bash 간의 xcom 전달"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

Python & email 오퍼레이터간 Xcom 사용

templated가 가능한 이메일 오퍼레이터

- to

- subject

- html_content

- files

 

파이썬 함수로 return해서 xcom에 저장하고 

> git add .
> git commit -m "Python & Email 오퍼레이터간 xcom 전달"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

ramdom함수가 Fail, Success를 선택해서 이메일을 보내기 때문에 DAG를 실행할 때마다 이메일로 랜덤값이 보내진다

 

전역 공유변수 Variable

Xcom : 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유되는 데이터

Variable : 모든 DAG가 공유할 수 있는 전역 변수

웹 UI에서 Variable을 등록할 수 있다

키 : sample_key,   값 : sample_value

과부하 발생 때문에 오퍼레이터 내부에서 가져오는 방법을 사용해야한다

전역변수 Variable DAG 파이썬 파일 작성

첫 번째 task 파이썬 라이브러리 사용 -> 스케줄러 과부하

두 번째 task 템플릿 사용 -> 권고

> git add .
> git commit -m "전역 공유변수 Variable"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

BranchPython 오퍼레이터로 분기처리하기

선택적으로 task를 실행시키고 싶은 경우

 

Task 분기 처리 방법

1. BranchPythonOperator

2. task.branch 데커레이터 이용

3. BaseBranchOperator 상속하여 직접 개발

 

실행시키고 싶은 task_id를 리턴값으로 준다

여러 개의 task를 실행시키고 싶다면 list로 담아서 준다

 

> git add .
> git commit -m "BranchPythonOperator"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

@task.branch 로 분기처리하기

task.branch 데커레이터 사용하기

함수를 실행시키면 return값으로 task가 실행된다 

> git add .
> git commit -m "@task.branch로 분기처리하기"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

 

BaseBranchOperator 로 분기처리하기

class(설계도)를 만들어야한다. 상속할 부모 클래스명 BaseBranchOperator

공식문서를 참조 : choose_branch 함수를 오버라이딩한다

> git add .
> git commit -m "BaseBranchOperator"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

BranchPython 오퍼레이터, @taks.branch 데커레이터를 주로 이용

 

Trigger Rule 설정하기

하위task 의 실행 옵션을 정하는 Rule

트리거는 하위task에 건다

 

트리거 rule이 all_done으로 설정되어서 하위 task가 모두 done으로 실행되어야 실행되는 task

task2는 예외처리가 되도록 설정해두어도 하위 task가 실행되는지 확인하기

트리거 rule이 none_skipped이므로 상위 task에서 skip이 없어야 task가 실행이 된다

BashOperator에서 task b와 task d를 skip하도록 설정해두어도 하위 task가 실행되는지 확인하기 (실패해야됨)

> git add .
> git commit -m "Trigger Rule 실습"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

all_done 은 하위 task가 잘 실행되었다

non_skipped는 하위 task가 skip되었다

 

Task groups 실습

Task들의 모음

''' ''' docstring : 함수에 대한 설명을 해줌

tooltip : airflow 화면에서 함수에 대한 설명. docstring과 같은 기능. 클래스를 사용하면 ''' '''는 airflow 화면에 표시되지 않음

DAG안에서 task_id가 같으면 에러가 나지만 그룹이 다르면 에러가 나지 않는다

> git add .
> git commit -m "Task Group"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

Edge Labels

Task 연결 설명

라이브러리를 가져온다

그래프 화살표 위에 설명이 뜨게 된다

> git add .
> git commit -m "Edge Labels"
> git push


WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

댓글