본문으로 건너뛰기
데이터 엔지니어 코스

코드 노드 — Python · SQL로 변환 작성하기

표준 노드(필터 · 집계 · 조인)가 못 끝내는 변환을 만났을 때 코드 노드로 짧은 Python 또는 SQL을 끼워 같은 파이프라인 안에서 처리합니다.

9

직전 레슨에서 만든 파이프라인은 컬럼 선택과 이름 바꾸기까지였습니다. 실제 마트 데이터셋을 만들다 보면 표준 노드 한두 개로 끝나지 않는 변환이 곧 등장합니다. 통화 단위 정규화, 부분 문자열 파싱, 외부 코드와의 일관성 보정 — 이런 일은 코드 노드로 다룹니다. 이번 레슨은 직전 파이프라인에 코드 노드 하나를 끼워 한 사이클을 다시 돌립니다.

표준 노드로 끝나는 변환과 그렇지 않은 변환

표준 변환 노드로 충분한 패턴.

  • 컬럼 선택·이름 바꾸기·단순 타입 cast
  • 단일 조건 필터 (status = 'active')
  • 단일 키 기준 집계 (sum·count·avg)
  • 두 데이터셋의 단순 join

코드 노드로 자연스럽게 옮겨가는 패턴.

  • 한 컬럼 안에 두 정보가 섞여 정규식 파싱이 필요한 경우 ("4,660 ₩"4660 + "KRW")
  • 같은 컬럼이 여러 표기로 들어오는 경우 ("Seoul" / "서울" / "SEL") 의 정규화 테이블
  • 환율·외부 코드 표 같은 준-정적 보조 자료와의 lookup 후 컬럼 추가
  • 한 행을 여러 행으로 펴거나 (explode), 여러 행을 한 행으로 묶는 (pivot) 형태 변환
  • 데이터 품질 체크 (행 수가 임계값 아래로 떨어지면 실패시키기·중복 키 탐지)

판단 기준 한 줄: 그 변환을 SQL·pandas 한두 줄로 표현했을 때 의도가 더 명료하면 코드 노드. 표준 노드를 다섯 개 이상 연쇄해야 한다면 거기서부터 보통 코드 노드가 짧습니다.

코드 자산은 두 가지 형태로 살아 있습니다

포털의 코드(Codes) 자산은 자리가 두 군데입니다.

  1. 컬렉션의 코드 자산 — 컬렉션 안에 저장된 재사용 스니펫. 여러 파이프라인에서 끌어다 쓸 수 있고 버전이 따로 관리됩니다.
  2. 파이프라인 내부의 코드 노드 — 그 파이프라인에서만 쓰는 인라인 스크립트. 짧은 변환에 적합.

이번 레슨은 (2) 인라인 코드 노드부터 시작하고, 같은 스니펫을 (1) 컬렉션 자산으로 승급시키는 패턴을 끝에서 다룹니다.

코드 노드 한 개 끼우기

직전 레슨의 파이프라인을 다시 엽니다.

  1. 노드 라이브러리 → Transform → 코드 노드 (Python) 를 Transform 노드와 Sink 노드 사이에 드래그.
  2. 기존 화살표를 끊고 Source → Transform → 코드 노드 → Sink 순서로 다시 잇습니다.
  3. 코드 노드 구성 패널에서 입력 데이터셋 별칭을 정합니다 (기본 df).
  4. 출력 데이터셋 별칭은 보통 같은 이름(df)을 그대로 두고, 본문에서 그 데이터프레임을 변형해 반환합니다.

코드 노드 본문의 형태.

def transform(df):
    # df: 직전 노드의 출력 데이터프레임
    # 반환값: 다음 노드의 입력 데이터프레임
    df["price_krw"] = (
        df["price_krw_str"]
        .str.replace(",", "", regex=False)
        .str.replace("₩", "", regex=False)
        .str.strip()
        .astype("int64")
    )
    df["currency"] = "KRW"
    return df[["order_date", "item_code", "price_krw", "currency"]]

저장 후 캔버스 우상단의 검증을 누르면 포털이 함수 시그니처와 반환 타입을 한 번 정적 분석합니다. 통과하면 녹색.

SQL 코드 노드도 같은 자리에 있습니다

같은 코드 노드 라이브러리에 코드 노드 (SQL) 도 따로 있습니다. 입력 데이터셋이 한두 개의 join·group by·window function으로 끝난다면 SQL이 짧습니다.

SELECT
  order_date,
  category,
  SUM(quantity * price_krw) AS revenue_krw,
  COUNT(DISTINCT order_id) AS order_count
FROM df
WHERE status = 'paid'
GROUP BY 1, 2

df는 직전 노드의 출력이 바인딩된 가상 테이블 이름입니다. 입력을 두 개 받으려면 코드 노드의 입력 포트를 둘로 늘리고, 두 번째 입력의 별칭(df2 등)을 SQL 안에서 그대로 참조합니다.

한 번 실행해서 무엇이 바뀌었는지 확인

다시 실행. 결과 데이터셋에서 둘만 확인.

  • price_krw 컬럼이 정수 타입으로 잡혔는가 (스키마 미리보기)
  • 입력 행 수와 출력 행 수가 의도한 만큼 일치하는가 (필터를 안 걸었다면 동일)

같은 스니펫을 컬렉션 코드 자산으로 승급

같은 정규화 로직이 두세 파이프라인에서 반복되기 시작하면 인라인 코드 노드는 부담이 됩니다. 그 시점에 다음 패턴으로 옮기세요.

  1. 코드 노드 우상단 메뉴에서 컬렉션 코드로 추출.
  2. 자산 이름(예: normalize_price_krw)과 저장 컬렉션 지정.
  3. 원래 파이프라인의 코드 노드는 그 자산을 호출하는 형태로 바뀝니다.
  4. 다른 파이프라인에서도 노드 라이브러리 → Codes → normalize_price_krw로 끌어다 씁니다.

자산화한 스니펫은 자기 페이지에서 변경 이력과 함께 보입니다. 변환 로직을 한 군데에서 고치면 다음 실행부터 모든 의존 파이프라인이 같이 갱신됩니다.

자가 점검

  • 인라인 Python 코드 노드를 하나 추가해 끝까지 실행이 갔는가
  • 같은 변환을 SQL 코드 노드로도 한 번 표현해 봤는가 (선택)
  • 같은 스니펫을 컬렉션 코드 자산으로 추출해 봤는가
  • AI 어시스턴트가 만든 첫 줄을 그대로 받지 않고 한 줄 이상 수정했는가

다음 레슨

이번 레슨의 파이프라인을 매일 정해진 시간에 자동으로 돌게 만들고, 실행 모드(overwrite·append·CDC)와 실패 시 통보 채널을 설정합니다.