본문 바로가기
대외활동/DateEngineering Zoom Camp

[DE-Zoomcamp] 7-2. Kafka → Flink → PostgreSQL Streaming Pipeline 실습

by 드인 2026. 3. 7.

이 글은 Data Engineering Zoomcamp의 Module 7: Streaming 를 바탕으로
Kafka, Flink, PostgreSQL을 이용한 실시간 데이터 파이프라인 구축 과정을 정리한 글입니다.


목차

  1. 실습 목표
  2. 실습 환경 구성
  3. Kafka Producer 구현
  4. Kafka Consumer 구현
  5. Kafka 데이터를 PostgreSQL에 저장
  6. Flink Streaming Job 실행
  7. Flink Job 실행 및 확인
  8. Window Aggregation 실습
  9. Late Event 처리 실험
  10. 실습 정리
  11. 참고 자료 / 출처


1. 실습 목표

  • 목표: Streaming 데이터 파이프라인 구축
  • 구성할 데이터 흐름
Producer (Python)
↓
Kafka (Redpanda)
↓
Apache Flink
↓
PostgreSQL

 

 

실습에서는 NYC Yellow Taxi 데이터를 이용해 Kafka로 데이터를 전송하고, 
Flink가 이를 처리해 PostgreSQL에 저장하는 Streaming Pipeline을 구성합니다.


구성 요소 역할
Producer 이벤트 생성
Kafka 이벤트 저장 및 전달
Flink 실시간 데이터 처리
PostgreSQL 결과 저장

2. 실습 환경 구성

실습 환경은 Docker 기반으로 구성됩니다.

  • Redpanda (Kafka-compatible broker)
  • PostgreSQL
  • Apache Flink JobManager
  • Apache Flink TaskManager

Docker Compose를 이용해 모든 서비스를 실행합니다.

Redpanda 설정

Redpanda는 Kafka 프로토콜을 지원하는 Message Broker입니다.

 

Kafka 대신 사용하는 이유

  • JVM 없이 실행 가능
  • ZooKeeper 필요 없음
  • 단일 binary 실행
redpanda:
image: redpandadata/redpanda:v25.3.9
ports:
- 9092:9092
- 29092:29092

 

Kafka 연결 포트

포트 용도
9092 외부 Kafka 연결
29092 Docker 내부 Kafka 연결

PostgreSQL 설정

Streaming 처리 결과를 저장하기 위한 데이터베이스입니다.

postgres:
image: postgres:18
environment:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
 

 

Flink JobManager / TaskManager

Flink는 두 가지 구성 요소로 실행됩니다.

구성 요소 역할
JobManager 작업 관리 및 스케줄링
TaskManager 실제 데이터 처리

JobManager UI에서 실행 중인 Streaming Job을 확인할 수 있습니다.

http://localhost:8081
 

 


3. Kafka Producer 구현

Streaming 파이프라인의 시작인 Producer는 데이터를 Kafka Topic으로 전송합니다.

실습에서는 NYC Yellow Taxi 데이터를 사용합니다.

3.1 데이터 준비

NYC Taxi Dataset을 다운로드합니다.

import pandas as pd

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet"

columns = [
'PULocationID',
'DOLocationID',
'trip_distance',
'total_amount',
'tpep_pickup_datetime'
]

df = pd.read_parquet(url, columns=columns).head(1000)
 

1000개의 샘플 데이터를 Kafka로 전송합니다.

3.2 Ride 데이터 모델 정의

Streaming 메시지는 dataclass 구조로 정의합니다.

from dataclasses import dataclass

@dataclass
class Ride:
PULocationID: int
DOLocationID: int
trip_distance: float
total_amount: float
tpep_pickup_datetime: int
 

각 Ride 객체는 Taxi Trip 이벤트를 의미합니다.

3.3 Kafka Producer 구현

KafkaProducer를 이용해 데이터를 Kafka로 전송합니다.

Kafka는 binary data를 사용하므로 JSON serialization이 필요합니다.

import json
from kafka import KafkaProducer

def json_serializer(data):
return json.dumps(data).encode("utf-8")

producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=json_serializer
)
 

Kafka topic으로 메시지를 전송합니다.

producer.send("rides", value=ride_data)
producer.flush()
 

Producer 실행

uv run python src/producers/producer.py

4. Kafka Consumer 구현

Kafka Consumer는 Kafka Topic에서 메시지를 읽습니다.

Consumer는 Producer와 반대로 JSON → Python 객체 변환을 수행합니다.

4.1 Kafka Consumer 설정

Consumer 설정

from kafka import KafkaConsumer

consumer = KafkaConsumer(
"rides",
bootstrap_servers=["localhost:9092"],
auto_offset_reset="earliest",
group_id="rides-console"
)
 
옵션 설명
auto_offset_reset 메시지 읽기 시작 위치
group_id Consumer 그룹

4.2 메시지 처리 테스트

Kafka 메시지를 읽어 출력합니다.

for message in consumer:
print(message.value)
 

Producer에서 보낸 메시지를 Consumer가 정상적으로 읽으면, Kafka Streaming Pipeline이 정상 동작하는 것입니다.


5. Kafka 데이터를 PostgreSQL에 저장

  • 이제 Kafka 데이터를 데이터베이스에 저장합니다.

5.1 PostgreSQL 테이블 생성

CREATE TABLE processed_events (
PULocationID INTEGER,
DOLocationID INTEGER,
trip_distance DOUBLE PRECISION,
total_amount DOUBLE PRECISION,
pickup_datetime TIMESTAMP
);

 

5.2 psycopg2를 이용한 데이터 저장

Python에서 PostgreSQL에 데이터를 저장합니다.

import psycopg2

conn = psycopg2.connect(
host="localhost",
port=5432,
database="postgres",
user="postgres",
password="postgres"
)

cursor = conn.cursor()
 

Kafka 메시지를 읽어 DB에 저장합니다.

cursor.execute(
"""
INSERT INTO processed_events
(PULocationID, DOLocationID, trip_distance, total_amount, pickup_datetime)
VALUES (%s, %s, %s, %s, %s)
"""
)

6. Flink Streaming Job 실행

이제 Python Consumer 대신 Apache Flink를 사용합니다.

Flink는 다음 기능을 제공합니다.

  • Window aggregation
  • Checkpointing
  • Parallel processing
  • Stream SQL

6.1 Kafka Source Table 생성

Flink SQL로 Kafka Source Table을 정의합니다.

CREATE TABLE events (
PULocationID INT,
DOLocationID INT,
trip_distance DOUBLE,
total_amount DOUBLE,
tpep_pickup_datetime BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'rides',
'properties.bootstrap.servers' = 'redpanda:29092',
'format' = 'json'
)
 

 

6.2 PostgreSQL Sink Table 생성

Flink에서 PostgreSQL로 데이터를 저장합니다.

CREATE TABLE processed_events (
PULocationID INT,
DOLocationID INT,
trip_distance DOUBLE,
total_amount DOUBLE,
pickup_datetime TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/postgres'
)

6.3 Flink SQL을 이용한 Streaming Pipeline

Streaming SQL

INSERT INTO processed_events
SELECT *
FROM events
 

Kafka → Flink → PostgreSQL Streaming 데이터가 실시간으로 저장됩니다.

 


7. Flink Job 실행 및 확인

Flink Job 실행

docker compose exec jobmanager ./bin/flink run -py pass_through_job.py
 

Flink Dashboard 확인

http://localhost:8081
 

 

  • Running Jobs
  • Task Slots
  • Records Received

Kafka 데이터를 생성합니다.

python src/producers/producer.py
 

PostgreSQL 확인

SELECT count(*) FROM processed_events;
 

데이터가 증가하면 Streaming Pipeline이 정상 동작합니다.


8. Window Aggregation 실습

Streaming 데이터는 시간 단위 집계가 필요하며, Flink는 Window 기능을 제공합니다.

8.1 Aggregation 테이블 생성

CREATE TABLE processed_events_aggregated (
window_start TIMESTAMP,
PULocationID INTEGER,
num_trips BIGINT,
total_revenue DOUBLE PRECISION
);

 

8.2 Tumbling Window 집계

1시간 단위 집계

 
SELECT
window_start,
COUNT(*) AS num_trips
FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start
 

결과

1 hour window aggregation

9. Late Event 처리 실험

Streaming 시스템에서는 이벤트가 늦게 도착할 수 있으며, 이를 Late Event라고 합니다.

event time: 10:00
arrival time: 10:05

9.1 Watermark 동작 확인

Flink에서는 Watermark를 이용해 Late Event를 처리합니다.

WATERMARK FOR event_timestamp
AS event_timestamp - INTERVAL '5' SECOND
 

이는 5초까지 늦은 이벤트 허용한다는 것을 의미합니다.

9.2 Aggregation 업데이트 확인

Late Event가 발생하면 기존 집계 결과가 업데이트됩니다. 

이를 위해 Aggregation 테이블에 Primary Key를 설정합니다.


10. 실습 정리

이번 실습에서는 다음 Streaming Pipeline을 구축했습니다.

NYC Taxi Dataset
↓
Python Producer
↓
Kafka (Redpanda)
↓
Apache Flink
↓
PostgreSQL
 

실습을 통해 다음 개념을 이해할 수 있습니다.

  • Kafka 기반 Streaming 데이터 전달
  • Flink 기반 실시간 데이터 처리
  • Window 기반 데이터 집계
  • Watermark 기반 Late Event 처리

해당 파이프라인는 실제 데이터 플랫폼에서도 자주 사용되는 Streaming 데이터 파이프라인 구조입니다.


10. 참고자료 / 출처