데브코스 데이터 엔지니어링 WEEK10 WIL(3)

2024. 12. 17. 00:27·Data Engineer/데브코스

Airflow.cfg

더보기

1) DAGs 폴더는 어디에 지정되는가?

기본적으로 Airflow가 설치된 디렉토리 밑의 dags라는 이름의 subfolder로 구현된다. 

위츠는 고정된것은 아니고 airflow.cfg에 있는 dags_folder라는 키에 지정할 수 있다.

2) DAGs 폴더에 새로운 Dag를 추가하면 언제 실제로 Airflow 시스템에서 이를 알게 되고 스캔 주기는 어느 키에서 결정되는가?

기본값은 300초=5분, dag_dir_list_interval이라는 키에 의해서 

3) 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경?

Airflow는 꼭 웹 UI, Command  Line에서만 제어할 수 있는 것은 아니다. 

cfg의 특정 section의 configuration을 바꿔주면 외부에서 API를 통해서 Airflow의 상태를 체크하거나

특정 DAG를 실행해 볼 수 있다. 또한 Airflow의 variables나 connection 정보를 import, export할 수 있다.

Airflow.cfg의 api 섹션 밑에 auth_backend의 키를 적당한 인증방식으로 바꾸면 된다. 

가장 간단한거는 airflow.api.auth.backend.basic_auth로 변경하는 것이다. id와 password를 가지고 인증하는 방법 

4) Ariflow의 Variable에서 key 와 value의 값이 있는데 이 value의 값을 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야할까? 

password, secret, passwd, authorization, api_key, access_token

5) 이 환경 설정 파일을 수정했다고 airflow가 바로 알지는 않는다. 내가 명시적으로 airflow를 다시 시작해줘야한다.

ec2서버에서 airflow를 설치했다면 

sudo systemctl restart airflow-webserver

sudo systemctl restart airflow-scheduler를 통해 webserver scheduler를 restart하면 되고

docker같은 경우에는 cfg 고치고 저장한다음에 docker engine에서 서비스를 다시 실행해주면 된다. 

db init은 X -> meta data db를 처음부터 초기화한다. 우리가 바꾼것만 반영하는 일을 하지 않는다. 

이 명령어는 meta data db가 다른 걸로 바꿨을 때만 실행해주는 명령어이다. (sqlite->postgres)

6. Metadata DB의 내용을 암화하는데 사용되는 키는 무엇인가?

Airflow.cfg에 fernet_key라는 key가 있는데 여기다가 string을 적어주면 airflow가 meta data db에 data를 쓰거나 읽을 때 해당 key를 사용한다.  

Airflow와 타임존

airflow.cfg에는 타임존과 관련된 두 종류의 키가 있다. 

1) default_timezone

default_timezone의 timezone을 따라가는것은 start_date, end_date, schedule 

그런데 execution_date과 웹 UI에 기록되는 log 시간은 상관없이 항상 UTC를 따른다.

굉장히 혼동스럽기 때문에 UTC로 통일하자.

2) default_ui_timezone

webserver단에서 시간이 표시되는 방식 결정해준다. 

ui에서도 바꿀 수 있다.기본은 UTC

Dags폴더 스캔 주의점

airflow.cfg의 core section밑에 dags_folder라는 키가 가리키는 위치와 그 밑의 sub 폴더들을 스캔해서 새로 만들어지거나 삭제되거나 변경되어있는 DAG들 체크한다.

dags_folder의 키가 가리키는 위치를 scan할 때 airflow는 무슨 file이 DAG파일이고 무슨 파일이 DAG파일이 아닌지 

판단하기 위해 파이썬 코드들을 우선적으로 실행해본다. 그래서 개발중에 실행이 안될거라 생각한 코드들이 5분마다 실행되는..  ex) cur.execute("DELETE FROM ...") 가 있으면 table record 삭제되는..

Open Weather Dag 구현하기

full refresh와 incremental update 2가지 버전 만들어보고 incremental update할 때

primary key uniqueness를 보장하기 위해 window 함수(ROW_NUMBER,UPSERT)를 써서 구현해보자

 

Open Weathermap API

위도/경도 정보 주면 그 지역의 기후 정보를 알려주는 서비스, 가서 api key를 받아야한다.

https://openweathermap.org/api/one-call-3

- 우리가 만들려는 DAG는 서울지역의 향후 8일간의 온도 정보(일별 낮/최소/최대)를 읽어서 redshift에 만들어 놓은 table에 적재하는것

-  그래서 먼저 각자 schema 밑에 table을 만들어줘야한다. 이것은 코드단에서 처리할 것 

-  사용할 API Key를 open_weather_api_key라는 Vaiable로 저장한다. 

- API 호출할 때는 파이썬의 request 모듈 사용 

- one-call API는 json 형태로 return해준다. 

f=request.get(link)

f_js=f.json() -> 결과를 받아서 json 함수를 호출하면 dictionary 형태로 바꿔준다.

여기 dictionary에는 daily라는 field, hourly라는 field, minutely라는 field가 있다. 8일간의 날씨정보가 분별, 시간별, 일별로 들어온다. 그중 daily 필드에만 관심이 있다. daily 필드는 리스트이고 8개의 record가 있다. 각 레코드는 하루에 해당된다. 레코드에는 dt가 있어 몇일인지 나타낸다. dt는 숫자로 표현되어있다. epoch(1970년 1월 1일 이후 밀리세컨드로 바꿔서 시간을 표시한것) -> 이것을 우리가 읽을 수 있는 날짜로 변경 해줘야한다.

datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09

(왼) 나중에 created_date을 이용해서 incremental update할 때 중복처리를 한다.default로 입력 안해줘도 현재시간으로 세팅된다. (오) 응답의 dt는 epoch, sunrise는 해가 뜬시간 sunset은 해가 진시간, temp밑의 day가 낮 온도 min이 최소온도 max가 최대온도이다. daily라는 list를 loop으로 돌면서 한 날짜씩 record로 만든다.

Full Refresh

두 가지 full refresh 를구현하는 방식이 있다.

- 테이블이나 record를 다 날리고 하나씩 INSERT INTO를 사용

- Redshift의 COPY라는 bulk update SQL을 사용해서 -> COPY를 하기 위해서는 S3에 먼저 loading해야된다. 내가 가지고 있는 data를 정리해서 csv같은 파일로 만든다음에 S3의 특정 위치에 적재하고 그 location에서 redshift로 bulk update

이번에는 전자 사용

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text) #response.json() 해도 된다
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) #insert into를 한번에
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 매일 오전 2시 0분에 시작되는 dag
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("keeyong", "weather_forecast")

- etl task 하나만 있다. 그리고 etl 함수에 schema와 table이름을 인자로 줬다. 

- Variable.get(open_weather_api_key)로 할당받은 api key를 받는다. 

- daily field로 loop를 돈다. for d in data["daily"] -> 8일간의 일별 날짜 정보

- 매일 오전 2시 0분에 실행된다. 

 

Incremental Update

Redshift에서는 Primary Key Uniqueness를 보장하는 방법!

SQL에 있는 row number라는 window 함수 사용 (upsert)

 

Primary Key Uniqueness? 

테이블에서 레코드가 생기면 하나의 레코드를 유일하게 지칭할 수 있는 필드가 필요해진다.(primary key)

필요해서 따라서 다수의 필드가(composite key), CREATE TABLE 할 때 지정해준다. 

2개의 방법 foriegn key를 적어주면 field가 다른 table에 있는 field라는 것을 알면 나중에 sql실행할 때 sql optimizer가 쿼리를 좀더 최적화 잘 할 수 있다. 또한 데이터의 정합성이 깨졌을 때 체크해주기도 한다. 따라서 foriegn key는 적어주면 좋다.

빅데이터 기반 Datawarehouse는 primary key uniqueness를 보장하지 않는다. 

-> 만약 primary key를 지켜주려면 모든 record들의 prrimary key를 B + tree형태로 메모리에 올려놓고 lookup을 해야한다. 레코드가 하나 추가 될때마다 이 record의 primary key가 존재하는지 체크 매번 해야하므로 엄청나게 시간이 들고 체크하기 위한 효율적인 data structure를 만들어야하기 때문에 메모리등의 리소스를 더 많이 잡아 먹는다. 굉장히 비효율적. OLTP(production db)는 데이터의 용량이 크지 않기 때문에 괜찮 -> 보장하는것은 기본적으로 데이터 인력의 책임 

 

그래서 어떻게 Primary Key 유지?

- weather_forecast는 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음.

 - 같은 날짜에 대해서 중복 record가 존재하는 경우 무엇을 더 우선시 할지 결정할 용도로 default로 data가 생성되는 시간인 created_date이라는 필드를 만든다.

- 그래서 date가 같은 레코드들이 여러개 있다면 created_date을 기준으로 더 최근 정보를 선택한다.

- 이런 방식을 통해 primary key가 유일함을 보장하고 이때 사용하는 SQL 문법이 ROW_NUMBER이다. 

여기서 date가 primary key이다. 하나만 존재해야한다.

ROW_NUMBER는 window 함수

위의 왼쪽 테이블에서 최종적으로 2개만 남기고 싶다. 2021-01-01과 2021-01-02 data 만..

date별로 모아서 created_date의 역순으로 일련번호를 매기고 일련번호가 1번인것만 남기자! -> 중복 제거

그것을 해주는 SQL 문법이 ROW_NUMBER

ROW_NUMBER()의 문법

ROW_NUMBER() OVER() seq -> ROW_NUMBER는 기본적으로 일렵번호 붙여라. 그리고 이 일련 번호의 이름은 seq로 하겠다. 그리고 OVER안에 어떻게 일련번호 붙일지.. 

OVER안에 partition by를 써서 어떤 필드를 기준으로 record를 grouping 즉 partition 할것인지.. 

partition by date -> date이 같은 값을 한 그룹으로 묶고, order by는 같은 partition에 속합 record들을 어떤 순서로 일련번호 붙일 것인지 -> 여기서는 created_date DESC, created_date이 큰것부터 나열하고 여기에 1번을 붙인다. 

이렇게 하면 primary_key_uniqueness를 보장하면서 중복을 제거할 수 있는 방법이 된다. 

 

방금거를 좀더 일반화해서 어떻게 primary key uniqueness를 어떻게 보장할 수 있는지

-> 대상이 되는 table이 있고 그 테이블에 내가 새로 읽어온 record를 적재하면서 중복제거하면서 incremental update을 하고 싶다.

① 먼저 임시 테이블을 만들고 임시 테이블에 대상이 되는 table들을 복제한다. 

② 그리고 그 임시 테이블에 새로 읽어온 data를 적재한다. 

③ 이 시점에 중복 data들이 존재할 것이다. (이전의 애플 주식에서는 distinct라는 것을 사용해서 중복을 제거했었다. 하지만 앞단에 뭐라도 수정된 값이 있으면 primary_key_uniqeness 유지 안될것이다.)

④ 임시 테이블에다가 ROW_NUMBER를 적용해서 primary key로 partition을 잡고, created_date같은 어느 record가 더 최신인지를 나타내주는 field를 기준으로 그 field의 값이 큰 것부터 앞에 나오게 order를 내면서, 그 order 기준으로 일련번호를 붙인다. 그리고 그 일련번호가 1인것만 원본테이블로 복사.

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table, lat, lon, api_key):

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    
    # 원본 테이블이 없다면 생성
    create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);"""
    logging.info(create_table_sql)

    # 임시 테이블 생성 (그리고 원본테이블 다 집어넣는다.)
    create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
    logging.info(create_t_sql)
    try:
        cur.execute(create_table_sql)
        cur.execute(create_t_sql)
        cur.execute("COMMIT;") #auto commit이 false이기 때문에 중간중간에 COMMIT
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 임시 테이블 데이터 입력(읽어온 8일간의 날씨 데이터를 임시 테이블에 적재, 그리고 이 때 중복 레코드들이 존재할것이다. )
    insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 기존 테이블 대체(기존 record다 날리고 일련번호 이용해서 큰값부터 낮은 값순으로 정렬한다음에 seq가 1인 record들만 선택해서 원본테이블에 적재)
    alter_sql = f"""DELETE FROM {schema}.{table};
      INSERT INTO {schema}.{table}
      SELECT date, temp, min_temp, max_temp FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM t
      )
      WHERE seq = 1;"""
    logging.info(alter_sql)
    try:
        cur.execute(alter_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift_v2',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 4 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("keeyong", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))

1. CREATE TEMP TABLE t AS SELECT * FROM keeyong.weather_forecast;

- 원래 테이블의 내용을 임시 테이블 t로 복사

2. DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가

- 이때 중복 데이터가 들어갈 수 있음

3. DELETE FROM keeyong.weather_forecast;

4. 중복을 없앤 형태로 새로운 테이블 생성

INSERT INTO keeyong.weather_forecast SELECT date, temp, min_temp, max_temp, created_date FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1

->date이 같은 record들끼리 묶어놓고 그안에서 일련번호를 붙이고 created_date의 값이 가장 큰것부터 1번으로 붙인다.

그리고 나중에 원본테이블에 insert할때는 seq가 1인것만 선택하게

 

3번 4번을 transaction을 묶어서-> 여기서는 auto_commit이 false라서 별로 의미가 없음 

날씨 데이터의 경우 최신 정보가 더 정확하므로 5/31일날 읽어온 data들을 우선시한다.

이런 것을 UPSERT라고 한다 insert와 update의 합친 버전

primary key를 기준으로 레코드 업데이트 할 때 이미 table에 동일한 primary key가 있으면 지금 적재하는 데이터로 수정하고 만약 존재하지 않는다면 새 레코드로 적재 하는것

앞에서 한것은 직접 upsert를 구현한것 (특정한 원칙을 가지고)

-> 데이터 웨어하우스마다 자기만의 UPSERT를 효율적으로 해주는 문법이 있다. 

Backfill

airflow가 backfill을 어떻게 쉽게 만드는지, 그 과정에서 중요한 시스템 parameter들

- 아마존에서 다양한 report들을 읽어오는 dag가 있고 dag구성하는 task들 있다.

- 하루에 한번씩 도는데 나중에 보니 2틀 동안 특정 amazon report_replacement라는 task가 실패했다는 것을 나중에 알게 되었다. -> 그 이틀 동안의 data가 빠져있다.

- Dag는 daily로 동작하는 dag이기 때문에 5월 24일날 실행될 때는 5월 23일날의 정보를 읽어오고 5월 25일날 실패한 dag는 그 전날인 5월 24일날 정보를 읽어오게 했을 것이다. 따라서 5월 23, 24일 2틀동안의 정보가 datawarehouse table어딘가에 빠져있다. 

- 나중에라도 깨달았을때 어떻게 쉽게 재실행 할 수 있을까? -> 데이터 엔지니어 삶에 직접적인 영향!

 

Backfill은 Incremental Update시에만 의미가 있다.

다시 한번..  가능하면 Full Refresh를 사용하는 것이 좋음.. 문제가 생기면 그냥 다시 Full Refresh하면 되니까

 

Backfill의 정의 

실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들이 잘못돼있어 다시 다 읽어와야하는 경우를 의미

Full Refresh에서는 간단. 그냥 다시 실행하면 됨

하지만 Incremental Update하면 DAG가 어떻게 구현되어있냐에 따라서 복잡해짐.

 

DailyDAG를 작성한다고 해보자. (하루에 한번 Incremental update)

-> 프로덕션 DB에 특정 Table에 record를 읽어다가 datawarehouse에 update하는 daily dag

 

Backfill을 잘모른다면?

실행될 때 지금 현재 시간날짜를 얻은다음 그것을 기준으로 전날이 어느날인지 계산하는 형식을 사용한다.

python이라면 datime.now()-timedelta(1)

어제 날짜를 기준으로 대상 production table의 record를 읽어와서 datawarehouse에 적재 

근데 이 코드가 실패를 한다면 이 날짜의 data는 datawarehouse에서 빠져있을 것이다. 미래에 이거를 다시 실행한다고 하면 오늘 기준으로 어제 data를 읽어다가 update하지, 실패했던 날 기준으로 전날 날짜를 얻어가지고 update하지 않는다. 그래서 몇일이 지난 다음에 빠진 날짜의 data를 update하려고 하면 날짜를 하드 코딩을 하게 된다. 

1년치가 실패한다고 하면 loop를 돌려서 2022년 1월1일 2022년 12월 31일까지 코드를 돌리게 수정해야한다..

dag를 처음 만들때부터 backfill이 쉽게 가는 구조로 생각을 안해두면 몸이 고생한다!

 

daily update나 hourly update를 할 때 읽어와야하는 data를 현재 시간 기준으로 계산하는 순간 그 과정에서 하나라도 실패하면 나중에 돌아와서 다시 채우려고 하면(backfill) 코드를 해킹해야하는 issue가 생긴다. 

 

어떻게 ETL을 구현해 놓으면 이런 일이 편해질까?

Airflow는 모든 실행에 대해서 db에 기록을 해놓는다. 

언제 실행이 됐고 그때 읽어온 data의 날짜가 무엇인지.. 

실패한 날짜와 실패한 날짜의 읽어오려고 했던 data의 날짜를 db에 기록을 해놓는다. 

daily dag면 하루에 한번 실행되고 전날 data를 읽어온다.  hourly dag는 한시간에 한번 실행되고 그 전시간 data를 읽어온다. 시스템은 이 dag가 incremental update를 하는지 full refresh를 하는지는 모른다. airflow는 모든 dag는 incremental update을 한다고 생각하고 5월 25일 날 daily dag가 돌면 그전날인 5월 24일을 시스템 변수로 제공해주고 기록을 해둔다. 그렇기 때문에 내가 시간을 계산해서 해당 날짜의 data를 읽어오는 것이 아닌 airflow가 주는것을 그냥 사용해서 Web UI 들어가서 실패한 날짜의 dag를 클릭하고 clear하면 된다. (Airflow는 읽어와야하는 data의 날짜 알고 있기 때문에) 만약 Full Refresh라면 이 execution date 변수값은 무시하면 된다. 

 

즉 정리하면

내가 읽어와야하는 data의 날짜를 직접 계산하는것이 아니라 airflow에게 맡긴다. 이 때 airflow가 주는 system variable이 있는데 이게 execution date이다. 재실행 될 때 어느 날짜의 data를 읽어와야되는지 system variable로 제공할거고 나는 이거를 이용해서 코드를 짜면된다. 

 

daily dag라면 모든 날짜에대해서 실행결과가 실패 성공했는지 적고 그 때 실행할때 읽어와야하는 data의 날짜가 무엇인지 기록해놓는다. ->execution_date

execution date은 dag를 구현할 때 읽어올 수 있다. execution date을 airflow로부터 읽어와서 실패한 날짜의 date를 읽어올 수 있게 코드를 짜자. (앞에서처럼 하드코딩하는것이 아닌)

그러면 동일한 코드로 운영도 하고 backfill을 할 수 있다. 

각 dag가 읽어와야되는 data의 날짜를 execution date으로 제공해준다. 

execution date으로 incremental update을 구현하자!

start_date 

daily dag이면 하루에 한번씩 동작을 하고 전날 data를 읽어오는 dag이다. 2020년 11월 8일날 처음 dag를 실행시킨다고하면 처음 읽어와야하는 data의 날짜는 2020년 11월 7일의 data이다. 

그래서 airflow의 start_date은 시작 날짜라기는 보다 처음 읽어와야되는 data의 날짜이다. 

그리고 airflow는 모든 dag가 incremental update한다고 가정하고 만들어져있다. 따라서 start date이 11월 7일로 되어있고 daily dag이면 처음 실행되는것은 그다음날인 2020년 11월 8일이다. 그래야지 그전날 data를 온전히 읽어올 수 있다. 

만약 hourly dag라면 그 다음 시간에 실행된다. 

3번 실행된다. 08-11 02시에 처음 실행돼야 했고 이 때의 execution date은 전날인 08-10이다. 그 후로 08-12, 08-13일것이 실행 되어야한다. 3개의 밀린 job은 queue에 들어가서 차례로 실행되고 실행이 되면서 execution date값으로는 전날 값이 들어가면서 내가 코드를 안바꾸고도 동일한 코드를 가지고 backfill할 수 있게 된다.
이런 변수는 full refresh하는 경우에는 의미 없다.

'Data Engineer > 데브코스' 카테고리의 다른 글

데브코스 데이터 엔지니어링 WEEK11 WIL(1)  (0) 2024.11.19
데브코스 데이터 엔지니어링 WEEK10 WIL(2)  (0) 2024.11.19
데브코스 데이터 엔지니어링 WEEK10 WIL(1)  (0) 2024.11.19
데브코스 데이터 엔지니어링 WEEK8 WIL(2)  (0) 2024.11.19
데브코스 데이터 엔지니어링 WEEK8 WIL(1)  (0) 2024.11.07
'Data Engineer/데브코스' 카테고리의 다른 글
  • 데브코스 데이터 엔지니어링 WEEK11 WIL(1)
  • 데브코스 데이터 엔지니어링 WEEK10 WIL(2)
  • 데브코스 데이터 엔지니어링 WEEK10 WIL(1)
  • 데브코스 데이터 엔지니어링 WEEK8 WIL(2)
dev.di
dev.di
devdi 님의 블로그 입니다.
  • dev.di
    개발 블로그
    dev.di
  • 전체
    오늘
    어제
    • 분류 전체보기 (28)
      • Algorithm (9)
        • Basics (9)
      • AWS (0)
        • AWS (0)
        • SAA (0)
      • Computer Science (1)
        • OS 벼락치기 (1)
        • DB 벼락치기 (0)
      • Data Engineer (8)
        • Airflow (0)
        • Data Warehouse (0)
        • Kafka (0)
        • Spark (0)
        • 데브코스 (8)
      • Docker (0)
      • Interviews (1)
      • Network (2)
        • Physical Layer (0)
        • Data Link Layer (0)
      • OOP (3)
        • GoF (3)
      • Python (4)
        • Django (3)
        • Scraping (1)
      • Software Engineering (0)
      • Spring (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    데이터 웨어하우스
    포트포워딩
    IPv4
    sql
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
dev.di
데브코스 데이터 엔지니어링 WEEK10 WIL(3)
상단으로

티스토리툴바