데이터 파이프라인(ETL, ELT)
테이터파이프라인 관리 프레임워크인 Airflow

ETL: 외부 데이터 소스를 가져와 데이터 웨어하우스에 저장하는 과정
Airflow: 이러한 ETL을 주기적으로 실행해주거나 ETL들간의 실행순서를 정해주는 Framework
Datasource (데이터 소스): 프로덕션 DB, 이메일, 카드 거래 데이터처럼 실제 서비스에서 수집한 원본 데이터
ELT (추출, 적재, 변환): 웨어하우스에 저장된 데이터를 바탕으로 분석/모델링을 위해 정제된 새로운 데이터를 생성
이를 통해 대시보드를 만들거나 데이터 분석에 활용 (CTAS or DBT라는 툴을 사용
ETL,ELT=DataPipeline=Data Workflow=DAG(Airflow에서)

- DAG (Directed Acyclic Graph): 한 방향으로만 흐르고, 사이클(순환)이 없다. 루프가 존재하지 않아서 작업이 한 번에 순차적으로 진행
- 데이터 파이프라인: 여러 개의 작업(task)으로 구성되며, 작업 간에는 루프가 없도록 DAG 구조 설계. 반복 처리가 필요하다면 DAG 외의 다른 방법을 사용해야 한다.
- DAG: task들의 집합 ex), Extract, Transform, Load 각 단계를 task로 정의
- Task: Airflow에서 각 task는 코드로 작성되며, operator 클래스 형태로 구현
- Airflow 커뮤니티에서 제공하는 다양한 operator를 import하고 필요한 파라미터를 지정하여 원하는 task를 실행할 수 있다. 아니면 pythonOperator로 직접 파이썬으로 코드 개발
- Airflow 코딩: 그래서 우리가 Airflow코딩한다는것은 DAG 인스턴스 하나 만들고 DAG에 대한 정보 이름 스케쥴 지정한 다음에 그 DAG구성하는 다양한 Task들이 무엇인지 Operator형태로 구현한다.
Extract: 데이터 소스가 있으면 파일을 다운받거나 소스가 제공하는 API를 통해 data dump를 받는다.
Transform: 받은 dump를 내가 원하는 형태로 추출하거나 format을 바꾸는 등( data 커지면 그냥 파이썬 코드로는 X 시간 너무 오래 걸리거나 메모리 에러가 생긴다. Spark같은 빅데이터 기술 사용)
Load: DataWarehouse에 데이터를 테이블로 로딩

이 때 다양한 종류의 파이프라인이 실행된다. 이러한 데이터 파이프라인을 정해진 시간에 실행해주거나 A파이프라인이 끝나면 B파이프라인 trigger해주고 그과정에서 에러가 생기면 담당자한테 alert을 보낸다. -> Airflow
데이터 파이프라인: 데이터를 한곳에서 다른곳으로 옮기는것, 데이터를 소스로부터 목적지로 복사하는 작업
이작업은 python, java로. SQL은 대부분 ELT(이미 데이터 시스템 안에 들어와있는 데이터를 정제하는 것)
목적지가 외부 시스템이 될 수도 있다. ex)캐시 시스템(Reis, Memcache), 프로덕션 데이터베이스
1) Raw Data ETL Jobs
2) Summary/Report Jobs
3) Production Data Jobs, 평점같은것들



데이터 파이프라인 만들 때 고려할 점
1) 데이터 복사 방식 선택 (Full Refresh vs. Incremental Update)
이 데이터를 Full Refresh 한다 했을때 걸리는 시간이 그 다음 파이프라인을 고려해서 충분히 짧은가 고려해서 Full Refresh할것인지 incremental update할것인지.. 가능하면 데이터가 작을경우 매번 통채로 복사해서 테이블을 만들자.
Full Refresh: 데이터 소스에서 데이터 웨어하우스로 전체 데이터를 주기적으로 복사하는 방식. 하지만 소스에 있는 데이터가 너무 크면 어느순간부터 안된다.
Incremental Update: 데이터 양이 많을 때 새롭게 생성, 수정, 삭제된 데이터만 선택적으로 가져온다.
2) 멱등성 보장
한번 실행하건 100번 실행하건 Datawarehouse와 데이터 소스단의 정보가 달라지지 말아야함
Transform에서 중간에 실패하면 전체다 깔끔하게 실패하도록
SQL의 transaction이 필요
3) Backfill이 쉬어야된다.
Full Refresh는 간단하다. 그냥 다시 돌리면 데이터 복구
하지만 Incremental Update의 경우 특정 날짜의 Incremental update이 실패했는데 그거를 모르고 넘어가면 그 날짜에 해당하는 data가 비어있다. 아니면 앞단이 소스쪽에서 다 바뀌면 일일이 다 copy해와야한다.
이러한 모든과정을 Backfill이라 한다. 운영이 정상적으로 되고 source와 destination의 data가 일정해야한다(멱등성보장)
Airflow는 이 부분에 강점을 갖고 있다. 데이터엔지니어가 원하는 형태로 파이프라인을 작성하기만 하면 된다.
4) 데이터 파이프라인의 입력과 출력을 문서화
데이터 파이프라인이 100개 넘어가기 시작하면 각 파이프라인이 뭘 하는지 힘들다.
그리고 이 파이프라인의 오너가 누구인지. (테크니컬 오너는 코드를 만든사람 비즈니스 오너는 이 데이터를 요청한 사람)
5) 주기적으로 쓸모없는 데이터들을 삭제
데이터 카탈로그에서 100일동안 안쓴 DAG같은것들
6) 데이터 파이프라인 사고시 마다 사고 리포트 쓰기(post-mortem)
얼마나 자주 쓰는냐, 얼마나 심하냐-> 기술 부채가 심해지는지 아닌지 알 수 있다.
7) 중요한 데이터 파이프라인의 입력과 출력이 믿을만한지
Primary key uniqueness보장하는지, 중복레코드 체크 등등
입력이 원하는 데이터인지 체크하는 코드 만들고
최종적으로 만든 코드도 이상이 없는지 다양한 테스트 붙여볼 수 있다.
간단한 ETL 작성해보기
ETL의 각 단계는 개별 task로 구성할 수 있으며, 필요에 따라 세 단계를 하나의 task로 묶을 수도 있다.
데이터 파이프라인 프레임워크는 전체 파이프라인 코드를 한 번에 실행하는 대신, task 단위로 나누어 관리한다.
웹상에 존재하는 S3에 있는 CSV파일을 Redshift에 있는 테이블에 적재해보자.

1) 목적지가 되는 Datawarehouse에 테이블을 만들고 Redshift connection함수를 만들자..
!pip install ipython-sql==0.4.1
!pip install SQLAlchemy==1.4.49
%load_ext sql
%sql postgresql://ID:PW@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
%%sql
DROP TABLE IF EXISTS keeyong.name_gender;
CREATE TABLE keeyong.name_gender (
name varchar(32) primary key,
gender varchar(8)
);
import psycopg2 #파이썬에 postgres 계열의 database를 조작할 때 쓰는 모듈
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "ID"
redshift_pass = "PW"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
2) CSV파일을 다운로드 받아야한다.
이 때 코드에 여러가지 이슈들이 있는데
- header 빼기
- 역동성을 보장해야된다.
3) ETL 함수를 하나씩 정의
파이썬으로 3개의 함수로 구성한다. extract, transform, load
extract: url을 인자로 받아서 그 내용을 읽어다가 해당하는 문자열 content를 그대로 return
transofrm: data를 받아서 list로 바꿔서 return
loadL list로 받아 Redshift에 line by line으로 적재.
각각을 별개의 task 또는 하나로 묶을 수 있다.
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
#DELETE하고 INSERT해서 에러내도 데이터 정합성 깨지지 않도록
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO keeyong.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
최종 실행
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
- transform을 하거나 할 때 load를 할 때 첫번째를 무시해야한다. name-gender
- 멱등성이 깨지고 있다. extract, transform, load를 다시 실행하면 또 적재되는.. 한번을 실행하건 백번을 실행하건 source내용이 안바뀌면 datawarehouse의 target내용이 동일해야한다. 여기서는 중복된다. load부분을 수정(transaction->Delete FROM을 먼저 수행하고 -> FULL REFRESH하는 형태로)
AIRFLOW
AirFlow 소개
- 데이터 파이프라인 스케줄링: 지정된 시간에 ETL을 실행하거나, 한 ETL이 완료되면 다음 ETL을 자동으로 실행한다.
- 파이프라인 작성 용이: Airflow 모듈을 통해 자주 사용하는 데이터 소스(production db, facebook광고)와 데이터 웨어하우스를 쉽게 통합할 수 있다. -> 이런것들을 Operator라고 한다. Documentation | Apache Airflow ETL은 작성하려고 하면 굉장히 많은 data source(production db, 광고)와 여러가지의 data destination option(ware house, S3)들이 있는데 사용하고 있는것들은 사람들이 airflow에서 쉽게 프로그래밍 할 수 있도록 aiflow 모듈 형태로 만들어 놓았다.
- Incremental Update 관리: 실패한 날짜의 데이터를 재실행하는 작업이 복잡하지만, Airflow 방식으로 코딩하면 단순화된다.
- DAG 구성: DAG=ETL DAG는 여러 task로 구성된 데이터 파이프라인으로, task별 스케줄링(실행순서) 및 성공 시 다음 단계로 넘어가도록 설정할 수 있다.
- 오픈소스의 특성상 최신 버전은 버그가 많을 수 있다. 그래서 안전하게 가고 싶으면 google.cloud에서 airflow를 어떤 버전을 쓰는지 확인하면 된다. Cloud Composer version list | Google Cloud

Airflow구성
- 아키텍처 관점에서 봤을 때 어떤 컴포넌트들을 가지고 있는지
- Airflow가 Dag수가 늘어나면 Airflow의 용량이 부족해지기 시작-> 스케일링 해야하는데 스케일링 방법
- Airflow 코드의 구조
Airflow는 총 5개의 컴포넌트로 구성
1) 웹서버 -> Python Flask로 구현
2) Schedular
3) Worker (Task에 해당되는 코드를 실행해주는 모듈)
4) Sqlite 메타데이터베이스 (스케듈링 정보, Worker 정보, 데이터 파이프라인에 대한 정보 어딘가에 기록정보 용도)
-> Single Thread이고 file기반 데이터베이스이기 때문에 별도로 mysql postgre 설치
5) 큐: 다수의 서버로 Airflow구성하는 경우
보통 Airflow 처음에는 한개. 그러면 Worker도 한대 서버 용량으로 제약되된다.
Datapipeline이 늘어나면 서버의 수를 늘린다. 이 서버는 Worker용으로 사용
datapipeline의 특정 task가 어느 워커에서 실행할것인지 미리 정할 수 없기 때문에
큐를 만들어서 큐에다가 task를 집어넣고, 놀고 있는 worker가 생기면 큐에 있는 task를 읽어다가 실행하는 형태
(서버가 하나로 돌아가는 Airflow의 경우에도 executor가 어떤 종류이냐에 따라서 서버 한대짜리 airflow도 queue를 사용하기도 한다.)
요약: Airflow는 5개의 component
Webserver Schedular는 Master기능 Worker는 일꾼 (Datapiplie의 task 코드 실행)
airflow를 scaling한다는거는 Worker의 수를 늘리는것
3개의 component가 자기 상황을 어딘가 기록해야하는데 그게 sqlite database이다.
task가 다수의 워커로 분산이되어서 처리되려면 중간 매개체가 큐이다. 이 경우 Airflow가 어떤 Executor를 쓰는지에 따라 달라진다.
스케줄러는 DAG안에 있는 Task들을 WORKER에게 배정하는 역할을 수행
의존관계에 따라 A가 끝나면 B를 Trigger하는, 웹 UI를 통해 쉽게 관리할 수 있다.
- Worker는 서버가 가지는 CPU숫자만큼 동시에 돌 수 있다.
- 이렇게 한대로 가면 CPU 수, 용량의 제한이 있기 때문에 동시에 실행할 수 있는 task의 수가 제약이 있게 되고
datapipeline의 수가 늘어나면서 한대만으로 부족하다.
- 따라서 Scaling할때는 worker를 별도의 서버에 세팅한다. Worker가 있는 서버의 수를 늘리는 방법을 쓴다.


빅데이터 시스템
외부에서 볼 때는 유기적인 하나의 컴퓨터처럼 보이지만
내부에서는 다수의 서버가 다양한 프로토콜과 복잡한 제어방식을 통해 같이 협업하는 방식
빅데이터 시스템은 다수의 서버로 구성되고 빅데이터 시스템 용량이 부족할 때마다 시스템에 서버를 추가하는 방식(Scale out) + 각 서버의 사양을 높이는 (Scale up) 병행
직접 Scaleout을 하는것이 아닌 Airflow를 제공해주는 클라우드 서비스 사용하는것이 편하다.

사용자는 브라우저를 통해 웹 서버와 통신하며, DAG 디렉토리 안에 파이썬으로 작성된 파이프라인 코드를 위치시킨다. Airflow 서버는 주기적으로 이 디렉토리를 파싱하여, 지정된 시간에 해당 파이프라인을 가져와 실행한다. 이때, Airflow의 Scheduler는 파이프라인을 실행하기 위해 Worker에게 직접 작업을 주지 않고, 대신 실행기(Executor)를 통해 작업을 분배한다.

웹 UI를 통해서는 어떤 데이터 파이프라인들이 airflow에 있고 최근 실행상황, 특정 파이프라인의 task들, log가 뭐가 나왔는지 보고 debugging할 수 있고, 문제가 생긴 데이터 파이프라인이 있으면 문제를 해결하고 웹 UI상에서 재실행 할 수 있다. (backfill)
'Data Engineer > 데브코스' 카테고리의 다른 글
| 데브코스 데이터 엔지니어링 WEEK11 WIL(1) (0) | 2024.11.19 |
|---|---|
| 데브코스 데이터 엔지니어링 WEEK10 WIL(2) (0) | 2024.11.19 |
| 데브코스 데이터 엔지니어링 WEEK8 WIL(2) (0) | 2024.11.19 |
| 데브코스 데이터 엔지니어링 WEEK8 WIL(1) (0) | 2024.11.07 |
| 데브코스 데이터 엔지니어링 WEEK6 WIL(2) (0) | 2024.11.07 |
