Jinja 템플릿 설명
문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진
● 오퍼레이터 파라미터 입력시 중괄호 {} 2개를 이용하면 Airflow에서 기본적으로 제공하는 변수들을 치환된 값으로 입력할 수 있음. (ex: 수행 날짜, DAG_ID)
Bash Operator에서 Jinja 템플릿 사용하기
templated될 수 있는 파라미터
- bash_command (str)
- env (dict[str, str] | None)
vscode에 dags 파이썬 파일 생성
ds : yyyy-mm-dd 형식
&& : 앞에 있는 커맨드가 성공하면 뒤에 있는 커맨드를 실행하겠다
> git add .
> git commit -m "bash jinja 업로드"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
첫번째 태스크, 두번째 태스크 결과
Airflow의 날짜개념
DAG 파이썬 파일 생성
catchup을 true로 하면서 이전부터 쭉 배치 실행
아무런 값을 넣지 않고 **kwarg만 출력해도 값이 나온다
git push - git pull
접속 테스트
Python Operator에서 Jinja 템플릿 사용하기
templated 될 수 있는 파라미터
● op_kwargs
● op_args
● templates_dict
DAG 파이썬 파일 생성
함수를 실행시키기만 해도 실행가능한 task가 생성된다
첫번째 태스크 결과, 두번째 태스크 결과
Bash Operator with macros
Macro 변수 : Jinja템플릿 내에서 날짜 연산을 가능하게 하는 변수
파이썬의 datetime, dateutil 라이브러리
매월 말일 실행되는 DAG
매월 둘째주 토요일에 수행되는 DAG
> git add .
> git commit -m "날짜 계산"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
매월 말일 수행되는 Dag에서변수 START_DATE: 전월 말일,변수 END_DATE: 어제
매월 둘째주 토요일에 수행되는 Dag에서 변수 START_DATE: 2주 전 월요일, 변수 END_DATE: 2주 전 토요일
Python Operator에서 macro 변수 사용하기
● templated가 가능한 패러미터
- op_kwargs
- op_args
- templates_dict
templates_dict의 키 밸류 값을 **kwargs가 받는다
start_date는 전월 1일을 의미한다
end_date는 3월 1일에서 하루를 뺀 2월 28일을 의미한다
start_date의 밸류 값이 파이썬 함수의 start_date 변수로 들어간다
end_date의 밸류 값이 파이썬 함수의 end_date 변수로 들어간다
그러나 Python 오퍼레이터에서 굳이 macro를 사용할 필요가 있을까?
날짜 연산을 DAG안에서 직접 할 수 있다면?
import 라이브러리를 사용해서 함수 내에서 직접 연산을 할 수 있다
strftime 함수를 이용해서 yyyy-mm-dd 형식으로 출력할 수 있다
DAG 파이썬 파일에 두개의 task를 실행하고 결과값이 똑같은지 비교한다
라이브러리를 함수 안에다 작성한 이유 : 스케줄러 부하 경감
스케줄러는 주기적으로 DAG에 오류가 없는지 파싱(검사)한다
> git add .
> git commit -m "Python 오퍼레이터에서 macro 변수 사용"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
Python Operator에서 Xcom 사용
Xcom이란 Airflow DAG안 Task 간 데이터 공유를 위해 사용되는 기술
- ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우 사용
- 주로 작은 규모의 데이터 공유를 위해 사용(Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)
- 1 GB이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 등)
xcom_push를 통해서 데이터를 넣고 xcom_pull을 통해서 데이터를 꺼낸다
리턴하면 자동으로 transaction_value 값이 xcom에 등록되고 key로 retrun_value로 자동적으로 등록된다
task_id 값을 통해서 리턴값을 가져올 수 있다
파이썬 파일 생성
ti 인스턴스 객체를 얻어서 xcom으로 직접 키값을 넣는다
마지막에 등록된 xcom 값을 가져온다
task_id 값을 입력하면 특정한 task의 xcom의 값을 가져올 수 있다
task_id를 통해 xcom을 return값을 가져옴
status값은 task의 순서에 따라 Success가 들어간다
> git add .
> git commit -m "파이썬 오퍼레이터에서 Xcom 사용"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
Bash Operator에서 xcom 사용
bash 오퍼레이터에서 echo 출력값은 xcom에 저장된다. 결국 마지막 출력문이 최종 리턴 값으로 xcom에 저장된다
> git add .
> git commit -m "Bash Operator에서 xcom 사용"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
task2의 xcom 값에 저장된 것이 없다. do_xcom_push=False로 했기 때문
Python &Bash 오퍼레이터간 Xcom 사용
파이썬 함수는 return을 push
bash 오퍼레이터는 task_ids 값만 주어도 자동으로 xcom 값을 가져온다
bash 오퍼레이터에서 ti.xcom_push로 KV 명시적 push
DAG 파이썬 파일 작성
파이썬 함수로 push하는 task와 bash 오퍼레이터로 pull로 가져오는 task
bash오퍼레이터에서 push하는 task와 파이썬 함수로 pull하는 task
> git add .
> git commit -m "Python & Bash 간의 xcom 전달"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
Python & email 오퍼레이터간 Xcom 사용
templated가 가능한 이메일 오퍼레이터
- to
- subject
- html_content
- files
파이썬 함수로 return해서 xcom에 저장하고
> git add .
> git commit -m "Python & Email 오퍼레이터간 xcom 전달"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
ramdom함수가 Fail, Success를 선택해서 이메일을 보내기 때문에 DAG를 실행할 때마다 이메일로 랜덤값이 보내진다
전역 공유변수 Variable
Xcom : 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유되는 데이터
Variable : 모든 DAG가 공유할 수 있는 전역 변수
웹 UI에서 Variable을 등록할 수 있다
키 : sample_key, 값 : sample_value
과부하 발생 때문에 오퍼레이터 내부에서 가져오는 방법을 사용해야한다
전역변수 Variable DAG 파이썬 파일 작성
첫 번째 task 파이썬 라이브러리 사용 -> 스케줄러 과부하
두 번째 task 템플릿 사용 -> 권고
> git add .
> git commit -m "전역 공유변수 Variable"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
BranchPython 오퍼레이터로 분기처리하기
선택적으로 task를 실행시키고 싶은 경우
Task 분기 처리 방법
1. BranchPythonOperator
2. task.branch 데커레이터 이용
3. BaseBranchOperator 상속하여 직접 개발
실행시키고 싶은 task_id를 리턴값으로 준다
여러 개의 task를 실행시키고 싶다면 list로 담아서 준다
> git add .
> git commit -m "BranchPythonOperator"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
@task.branch 로 분기처리하기
task.branch 데커레이터 사용하기
함수를 실행시키면 return값으로 task가 실행된다
> git add .
> git commit -m "@task.branch로 분기처리하기"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
BaseBranchOperator 로 분기처리하기
class(설계도)를 만들어야한다. 상속할 부모 클래스명 BaseBranchOperator
공식문서를 참조 : choose_branch 함수를 오버라이딩한다
> git add .
> git commit -m "BaseBranchOperator"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
BranchPython 오퍼레이터, @taks.branch 데커레이터를 주로 이용
Trigger Rule 설정하기
하위task 의 실행 옵션을 정하는 Rule
트리거는 하위task에 건다
트리거 rule이 all_done으로 설정되어서 하위 task가 모두 done으로 실행되어야 실행되는 task
task2는 예외처리가 되도록 설정해두어도 하위 task가 실행되는지 확인하기
트리거 rule이 none_skipped이므로 상위 task에서 skip이 없어야 task가 실행이 된다
BashOperator에서 task b와 task d를 skip하도록 설정해두어도 하위 task가 실행되는지 확인하기 (실패해야됨)
> git add .
> git commit -m "Trigger Rule 실습"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
all_done 은 하위 task가 잘 실행되었다
non_skipped는 하위 task가 skip되었다
Task groups 실습
Task들의 모음
''' ''' docstring : 함수에 대한 설명을 해줌
tooltip : airflow 화면에서 함수에 대한 설명. docstring과 같은 기능. 클래스를 사용하면 ''' '''는 airflow 화면에 표시되지 않음
DAG안에서 task_id가 같으면 에러가 나지만 그룹이 다르면 에러가 나지 않는다
> git add .
> git commit -m "Task Group"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
Edge Labels
Task 연결 설명
라이브러리를 가져온다
그래프 화살표 위에 설명이 뜨게 된다
> git add .
> git commit -m "Edge Labels"
> git push
WSL 터미널창에서
$ cd ~/airflow
$ git pull
접속테스트
댓글