Airflow 면접 질문 - Airflow myeonjeob jilmun

티스토리 뷰

프로그래머 면접 신입 편 바로가기

짧은 경력이지만, 인터뷰어로 참여한 경험을 바탕으로 안타까운 구직자들의 현실을 기록합니다.

제가 다 했습니다.

  • 프로젝트 하면서 본인의 모듈이 아닌 부분까지 모조리 본인이 개발했다고 이력서에 기재하는 스타일
  • 나아가 타 팀의 프로젝트나, 주워 들은 이야기 까지 동원되는 경우도 있음
  • 모듈에 대해 상세히 물어보기 시작하면, 그 부분은 본인 파트가 아니었다고 이야기 함 (그럼 이력서에 솔직하게 프로젝트에서 본인의 파트는 뭐였다고 써놨어야지..)

네. 해봤습니다.

  • 어느 수준까지 해봤냐고 물으면 대답 못하는 스타일
  • 예를 들어 커널 컴파일 직접 해봤냐고 물어보면 주절주절 대답 함. 하지만 정작 이미 존재하는 Makefile에 make 타이핑 해본게 끝.

저는 당당합니다.

  • 모든 분야에 박식한것 처럼 당당한 스타일. 
  • 커널까지 오가며 다양한 기기와 소켓 통신을 했다길래 엔디안 질문했더니 당황 함. (뭔지도 모르는 눈치)
  • 때로는 솔직한게 좋지 않을까?

시키는 일만 했던 스타일

  • 왜 해야하는지 모르고 돌격하는 스타일
  • 예를 들어 32bit 모듈을 64bit로 포팅했다길래 32/64bit 무슨 차이가 있냐고 물으니 그냥 64bit가 더 좋다는 대답이 돌아옴. 
  • 왜 64bit로 포팅했냐고 물을 껄 그랬나..

저 없으면 회사 안돌아가지 말입니다.

  • 본인이 회사에 엄청난 기여를 하고 있다고 말하는 스타일
  • "본인이 퇴사한다고 하면 회사에서 붙잡지 않겠어요?" 라는 질문에 놔줄수 밖에 없을 것이라는 대답을 함
  • 능력대비 많은 급여를 바라니 회사에서는 보내줄 수 밖에..
  • 현실은 당신 없어도 회사 잘 돌아갈 것 같음.

Q : 자기소개 좀 해주세요.

  • A : 나는 아무개입니다. 더 뭘 소개할까요?
  • 정말 엄청난 고수 느낌의 짤막한 답변이었지만. 아는것도 없고 인터뷰 진행중인 회사가 뭐하는 회사인지도 모름.
  • 이력서는 꼴랑 2장...

연봉은 많이 주세요

  • "현재 회사에서 몇년째 동결입니다. 그러니 그 동결된 만큼 계산해서 많은 돈을 주세요."
  • 이직하면서 연봉을 어느정도 올리는 것에는 동의하지만, 왜 동결된 연봉을 여기와서 찾으세요?

지금 이 시간에도 구직 활동으로 정신 없을 많은 프로그래머가..

부디 본인에게 맞는 회사에 취직해서 능력을 백 분 발휘하기를 기대합니다.

어제 데이터 엔지니어 직무로 면접을 봤다.
면접 질문 중에 내가 제대로 대답하지 못한 부분들에 대해 면접 복기를 해보면서 다시 공부해봤다.
면접 보기 전에는 남들의 지식을 바탕으로 학습을 했다면, 지금은 내가 모든 것을 뜯어볼 시간이다.


  1. S3의 key가 어떻게 생겼는지 아는가?
    • 아직 모르겠다. 이번주 안으로 S3를 활성화해서 어떻게 데이터가 저장되는지 확인해볼 계획이다.
  2. MySQLToS3Operator는 key가 존재하면 반드시 에러를 발생시킨다고 했는데, 그 이유에 대해서 생각해 봤는가?
    • S3를 공부하면 더 확실해지겠지만, 동일한 key를 갖는 S3 bucket이 존재해서 그런 것으로 추정된다.
    • 그리고 key가 존재한다고 하더라도, replace 파라미터를 True로 주면 bucket을 갱신할 수 있다.
  3. MySQLToS3Operator가 어떻게 동작하는지 아는가?
    • 지금부터 확인해보려고 한다.

Airflow 면접 질문 - Airflow myeonjeob jilmun

나는 MySQLToS3Operator를 사용할때는 S3 key가 있으면 MySQLToS3Operator가 동작하지 않는다고 생각했었다..

그런데 오늘 소스코드를 뜯어보니 replace 파라미터가 있었고, replace=True이면 S3 key가 있어도 정상적으로 동작하는 것을 알게 되었다.


Version - 3.3.0

1. airflow.providers.amazon.aws.transfers.mysql_to_s3

A. 공식 Document

airflow.providers.amazon.aws.transfers.mysql_to_s3 — apache-airflow-providers-amazon Documentation


위 document를 보면 This class is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3.SqlToS3Operator.라고 적혀있다.
이 클래스는 사용되지 않으니, SqlToS3Operator를 사용하라고 한다.

B. Source Code

path: airflow\providers\amazon\aws\transfers\mysql_to_s3.py

import warnings
from typing import Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

warnings.warn(
    "This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.",
    DeprecationWarning,
    stacklevel=2,
)


class MySQLToS3Operator(SqlToS3Operator):
    """
    This class is deprecated.
    Please use `airflow.providers.amazon.aws.transfers.sql_to_s3.SqlToS3Operator`.
    """

    template_fields_renderers = {
        "pd_csv_kwargs": "json",
    }
    
    def __init__(  
    self,  
    *,  
    mysql_conn_id: str = 'mysql_default',  
    pd_csv_kwargs: Optional[dict] = None,  
    index: bool = False,  
    header: bool = False,  
    **kwargs,  
    ) -> None:

    ...
    ...
    ...
      
    super().__init__(sql_conn_id=mysql_conn_id, **kwargs)

소스코드를 열어보니 This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3이라는 warning 메시지가 보인다.

그리고 MySQLToS3Operator class가 SqlToS3Operator를 상속받고 있는 것을 알 수 있다.
부모 클래스로 매개변수 값들을 다 넘기는 걸 보면, 내가 찾고자 하는 정보는 여기에 없는 것 같다.

2. airflow.providers.amazon.aws.transfers.sql_to_s3

A. 공식 Document

airflow.providers.amazon.aws.transfers.sql_to_s3 — apache-airflow-providers-amazon Documentation


a. Classes

  • SqlToS3Operator
    • Saves data from a specific SQL query into a file in S3.

b. Attributes

  • FILT_FORMAT
  • FileOptions
  • FILE_OPTIONS_MAP

c. Inheritance Structure

  • airflow.models.BaseOperator
    • airflow.providers.amazon.aws.transfers.sql_to_s3

d. more information

SQL to Amazon S3 Transfer Operator — apache-airflow-providers-amazon Documentation


SqlToS3Operator를 사용하여 SQL server로부터 Amazon S3 file로 data를 copy한다.

SQL hook이 SQL 결과를 pandas dataframe으로 변환하는 function이 있는 한, 어느 SQL connection도 SqlToS3Operator는 호환된다.

ㄱ) Prerequisite Tasks

  • AWS Console이나 AWS CLI를 사용하여 필요한 자원을 생성하라.
  • pip install 'apache-airflow[amazon]'을 설치하라.
  • Setup Connection.

ㄴ) MySQL to Amazon S3

MySQL query에 대한 응답을 Amazon S3 file로 보내는 예제


sql_to_s3_task = SqlToS3Operator(
    task_id="sql_to_s3_task",
    sql_conn_id="mysql_default",
    query=SQL_QUERY,
    s3_bucket=S3_BUCKET,
    s3_key=S3_KEY,
    replace=True,
)

e. Parameters

query: str

  • 실행될 sql query
  • 만약 절대 경로에 위치한 file을 실행하려면, .sql 확장자로 끝나야 한다.

s3_bucket: str

  • data가 저장될 bucket

s3_key: str

  • 파일에 필요한 key
  • 파일의 이름이 포함된다.

replace: bool

  • 만약 S3에 file이 존재한다면 교체여부.

sql_conn_id: str

  • 특정 database를 참조.
  • MySQL, Postgre ...

parameters: (Union[None, Mapping, Iterable])

  • SQL query를 render할 parameter 지정.

aws_conn_id: str

  • 특정 S3 connection을 참조.

verify: Optional[Union[bool, str]]

  • S3 connection의 SSL certificates 확인 여부
  • default로 SSL certificates가 확인된다.
  • False
  • path/to/cert/bundle.pem

file_format: typing_extensions.Literal[csv, json, parquet]

  • 대상 파일 형식, 오직 csv, json, parquet만 허용된다.
  • S3에 저장될 파일 형식

pd_kwargs: Optional[dict]

  • DataFrame .to_parquet(), .to_json(), .to_csv()가 포함될 인자들.

f. Field

template_fields: Sequence[str]

['s3_bucket', 's3_key', 'query']

template_ext: Sequence[str]

['.sql']

template_fields_renderers

[
    "query": "sql",  
    "pd_csv_kwargs": "json",  
    "pd_kwargs": "json",
]

g. Method

execute(self, context)

이것은 operator를 생성할때 파생되는 main method이다.

Context는 jinka templates를 rendering할 때와 동일한 dictionary이다.

더 많은 context는 get_template_context를 참고하라.

이 context가 pythonOperator에서 python_callable로 입력되는 함수의 **context parameter와 동일한 것 같다.

B. Source Code

a. Attributes

FILT_FORMAT


FILE_FORMAT = Enum(  
    "FILE_FORMAT",  
    "CSV, PARQUET",  
)  

FileOptions


FileOptions = namedtuple('FileOptions', ['mode', 'suffix'])  
  

FILE_OPTIONS_MAP


FILE_OPTIONS_MAP = {  
    FILE_FORMAT.CSV: FileOptions('r+', '.csv'),  
    FILE_FORMAT.PARQUET: FileOptions('rb+', '.parquet'),  
}

b. Classes

SqlToS3Operator


class SqlToS3Operator(BaseOperator):
    ...
    ...
    ...
    

BaseOperator를 상속받는다.

c. Method

execute


def execute(self, context: 'Context') -> None:
    sql_hook = self._get_hook()
    s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
    data_df = sql_hook.get_pandas_df(sql=self.query, parameters=self.parameters)
    self.log.info("Data from SQL obtained")

    self._fix_int_dtypes(data_df)
    file_options = FILE_OPTIONS_MAP[self.file_format]

    with NamedTemporaryFile(mode=file_options.mode, suffix=file_options.suffix) as tmp_file:

        if self.file_format == FILE_FORMAT.CSV:
            data_df.to_csv(tmp_file.name, **self.pd_kwargs)
        else:
            data_df.to_parquet(tmp_file.name, **self.pd_kwargs)

        s3_conn.load_file(
            filename=tmp_file.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=self.replace
        )
        

execute method는 BaseOperator class의 execute를 overriding한 것이다.
BaseOperator class의 execute method는 raise NotImplementedError로 작성되어서 execute method를 overriding하여 구현하지 않으면 에러가 발생한다.


  1. sql_hook = self._get_hook()
    • Database의 hook을 얻는다.
      • self.sql_conn_id를 사용
  2. s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
    • S3의 hook을 얻는다.
      • self.aws_conn_id를 사용
  3. data_df = sql_hook.get_pandas_df(sql=self.query, parameters=self.parameters)
    • Database의 hook을 통해 query를 실행하고 DataFrame 형태로 반환받는다.
  4. self._fix_int_dtypes(data_df)
    • null 값에 대해 처리한다.
  5. file_options = FILE_OPTIONS_MAP[self.file_format]
    • 파일 종류에 따라 어떤 방식으로 추출할지 정하는 부분으로 추정된다.
  6. s3_conn.load_file(filename=tmp_file.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=self.replace)
    • 위에서 얻은 csv or parquet 파일을 S3에 업로드한다.
    • airflow - s3

_get_hook


def _get_hook(self) -> DbApiHook:  
    self.log.debug("Get connection for %s", self.sql_conn_id)  
    conn = BaseHook.get_connection(self.sql_conn_id)  
    hook = conn.get_hook()  
    if not callable(getattr(hook, 'get_pandas_df', None)):  
        raise AirflowException(  
            "This hook is not supported. The hook class must have get_pandas_df method."  
        )  
    return hook

Database의 connection을 얻어오고 hook을 얻어온다.

얻어온 hook을 반환한다.
반환되는 type은 DbApiHook이다.

execute Method에서 호출된다.

_fix_int_dtypes


@staticmethod  
def _fix_int_dtypes(df: pd.DataFrame) -> None:  
    """Mutate DataFrame to set dtypes for int columns containing NaN values."""  
    for col in df:  
        if "float" in df[col].dtype.name and df[col].hasnans:  
            # inspect values to determine if dtype of non-null values is int or float  
            notna_series = df[col].dropna().values  
            if np.equal(notna_series, notna_series.astype(int)).all():  
                # set to dtype that retains integers and supports NaNs  
                df[col] = np.where(df[col].isnull(), None, df[col])  
                df[col] = df[col].astype(pd.Int64Dtype())  
            elif np.isclose(notna_series, notna_series.astype(int)).all():  
                # set to float dtype that retains floats and supports NaNs  
                df[col] = np.where(df[col].isnull(), None, df[col])  
                df[col] = df[col].astype(pd.Float64Dtype())
                

staticmethod라서 클래스 변수인 self를 사용하지 않았다.

따로 return값은 없는데, 여기에서 pandas의 DataFrame은 mutable한 성질이 있음을 알 수 있다.

참고

python - Pandas DataFrame mutability - Stack Overflow