안녕하세요! 라플라스테크놀로지스 데이터 엔지니어 팀의 제이든(Jaden)입니다. 커머스 데이터 분석 시스템 설계, 빅데이터 파이프라인 개발을 담당하고 있습니다.
여기에서는 데이터 처리 과정에서 안정적인 Data Lake를 구축하는 데 유용하게 사용하고 있는 도구인 Delta Lake를 소개하고, 사용자 경험을 희생하지 않으면서 엔지니어링의 이점을 활용하는 저장소 시스템을 구축하는 과정에서 겪었던 고민을 공유 해볼까 합니다.
데이터 파이프라인 설계
커머스 데이터는 REST API나 MySQL, PostgreSQL 같은 RDBMS 또는 Kafka를 통해 수집합니다. 수집된 데이터는 변환, 집계, 적재 등의 과정을 거쳐 사용자에게 의미 있는 분석 데이터로 시각화되어 대시보드로 제공됩니다.
출처 : delta.io
가공의 단계별로 처리한 데이터는 바로 다음 단계로 전달하지 않고 정해진 저장소에 저장됩니다. 이는 다음 단계의 작업이 실패했을 때 실패한 task만 재실행 하기 위함 인데요. 이러한 중간 단계의 데이터는 다른 도메인 시스템에서 활용 되기도 합니다. 예를 들어 “제품 카테고리 데이터”의 전처리 결과물은 “주문 데이터” 집계 시 JOIN을 통해 제품에 카테고리를 라벨링을 하는 데 활용되어, “일자별 카테고리별 매출 분석”을 가능하게 해 줍니다. 분석하고자하는 데이터 소스와 도메인이 다양할수록 데이터 파이프라인의 구조가 복잡해집니다.
스키마 관리의 어려움
데이터 가공의 단계별로 저장되는 데이터의 스키마는 분석의 니즈에 따라 계속 변화합니다. 하지만, 데이터 파이프라인 시스템의 스키마를 변경하는 것은 단순한 작업이 아닙니다. 예를 들어 주문 데이터의 스키마가 변경되면, 해당 데이터에 종속성을 가지고 있는 모든 시스템과 스키마를 수정해줘야 합니다. 그뿐만 아니라 과거에 적재된 데이터와 집계 연산을 해야 하는 경우에 발생하는 스키마 불일치 문제도 해결해야 합니다.
저희는 기존에 데이터 저장소로서 MySQL을 사용하였었는데요. 데이터가 증가함에 따라 일 배치를 하루 안에 처리할 수 없게 되었고, 확장 가능한 분산 데이터 저장소가 필요하게 되어 Hive를 사용했습니다. 하지만 Hive의 테이블의 ALTER는 우리가 기대했던 것과 다르게 동작했습니다. Hive의 ALTER는 메타데이터만 수정하고, 과거에 저장된 데이터의 구조를 변경하려면 다시 전체 데이터를 새롭게 만들어줘야 합니다. 스키마 수정 이후에 처리된 데이터와 과거의 데이터가 불일치하여 데이터의 정합성이 맞지 않는 경우도 발생했습니다. 이러한 스키마 변경 이력을 파악하는 것도 어려웠죠.
동시성 문제
데이터 파이프라인 스케줄링 과정에서 동시에 다수의 task가 데이터에 접근하는 경우에도 많은 문제가 발생합니다. 위에서 예시를 든 “제품 데이터 파이프라인”에서 “제품 카테고리 데이터”를 재처리하는 도중에 “주문 데이터 파이프라인”에서 JOIN을 시도한다면 수많은 카테고리가 분석에서 누락되는 결과가 생길 수 있습니다.
Delta Lake 도입
출처 : delta.io
데이터레이크 구축의 핵심은 메타데이터 관리라고 생각합니다. 잘못된 데이터가 계속 유입되거나 필요한 데이터가 어느 경로에 어떤 구조로 저장되어있는지 관리되지 않으면, 비용만 차지하는 데이터 늪이 되기도 합니다.
Delta Lake는 데이터레이크에 안정성을 제공하는 오픈소스 스토리지입니다. Spark를 개발한 UC버클리대학 AMP랩에서 설립한 Databricks라는 회사에서 주도하는 프로젝트로서, 현재 많은 글로벌 IT 기업에서 Spark와 함께 활용하고 있습니다.
Delta Lake는 parquet, json, csv와 같이 파일 저장 포맷 개념으로 생각하고 활용하면 됩니다.
아래와 같이 Spark에서 손쉽게 delta 포맷으로 데이터를 저장하면 적용은 끝입니다!
# delta write data.write.format("delta").save("/tmp/delta-table") # delta read df = spark.read.format("delta").load("/tmp/delta-table") df.show()
Delta Lake에서 모든 연산은 Append
Delta Lake는 모든 연산을 append로 동작하게 했습니다. 연산마다 매번 새로운 오브젝트를 생성하는 방식인데요. 데이터가 저장된 경로를 살펴보면, 실제 데이터는 parquet 포맷으로 저장되며 트랜잭션 로그도 함께 저장됩니다. 데이터에 연산을 수행할 때마다 parquet 파일이 추가되며 트랜잭션 로그도 같이 append 됩니다.
출처 : Databricks
이 트랜잭션 로그에는 추가된 데이터 파일의 경로와 현재 버전의 데이터가 생성된 연산 정보가 들어있습니다.
Delta Lake는 데이터 저장 포맷으로 내부에는 parquet를 사용하고 있는 것을 확인할 수 있습니다. parquet 포맷은 컬럼 기반 저장 구조로서 압축률이 높고 디스크 IO가 적다는 장점이 있습니다. parquet 파일 내부를 좀 더 자세히 들여다보면 다음과 같습니다.
Header - parquet 파일임을 명시 (magic number: "PAR1") Data - 컬럼 기반으로 실제 데이터 저장 Footer - 데이터 버전, 스키마, 파일 블록의 메타데이터
이처럼 메타데이터는 데이터와 함께 분산되어 저장되며, 트랜잭션 로그를 통해 버전 관리와 동시접근 통제가 이루어집니다.
ACID
하나의 데이터 저장소에 동시에 여러 작업이 수행될 때 delta lake에서 데이터 일관성을 보장해줍니다.
예를 들어 사용자 A가 읽기 작업을 하면, 최근 트랜잭션 로그를 확인하여 최신 버전의 파일 블록들을 사용자 A에게 전달합니다. 동시에 사용자 B가 쓰기 작업을 하고 있어도 이는 새로운 파일을 append하기 때문에 사용자 A의 읽기에는 영향이 없습니다. 사용자 B의 쓰기가 모두 완료된 시점에서 새로운 트랜잭션 로그가 추가되고 그 이후에 다른 사용자가 데이터를 조회할 때 새로운 버전의 파일이 제공됩니다.
만약 사용자 A와 사용자 B가 동시에 같은 파일 블록에 수정 작업을 하면 어떨까요? Delta Lake는 먼저 작업이 종료된 작업을 트랜잭션 로그에 추가하며, 수정 작업을 뒤늦게 완료하고 트랜잭션 로그를 추가하려는 순간 기존 버전이 변경된 것을 인식하여 새로운 작업을 실패 처리해 줍니다. 분산 스토리지 시스템에서 언제나 일관된 데이터를 보장해주기 때문에 신뢰도가 높은 데이터를 유지 할 수 있습니다.
Spark에서 overwrite를 하는 과정은 아래와 같은 순서대로 수행됩니다.
- 데이터를 저장소에서 메모리로 읽기
- 데이터 메모리에서 수정
- 데이터를 저장소에서 삭제
- 메모리에서 수정된 데이터를 저장소에 저장
여기서 만약 데이터가 저장소에서 삭제된 순간 오류가 발생하여 작업이 종료된다면, 모든 데이터가 유실되게 됩니다. 그러나 Delta Lake를 사용하면, 작업이 중간에 종료된다 해도 기존 버전의 데이터가 계속 유지되기 때문에 안정적으로 overwrite를 사용할 수 있습니다.
Schema enforcement
Delta Lake는 스키마 enforcement 기능도 제공해줍니다. 데이터 소싱 시 데이터를 전달해주는 주체는 대부분 외부입니다. 따라서 오염된 데이터가 입력된다고 해도 데이터 파이프라인 단계의 마지막 단계에서 데이터 정합성 문제가 확인되거나 최악의 경우에는 이러한 오류가 확인되지 못하고 지표에 반영되는 경우도 있을 수 있습니다.
# type mismatch error >>> df.write.format("delta").mode("append").save("/tmp/delta-table") # 실패 # type casting >>> df = df .withColumn("id", col("id").cast("integer")) >>> df.write.format("delta").mode("append").save("/tmp/delta-table") # 성공
Delta Lake에서 schema enforcement를 사용하면 기존과 다른 스키마의 데이터가 append 되는 순간 에러를 발생시키기 때문에 데이터 유입 초기 단계에서 데이터 구조 변화를 확인할 수 있고 빠른 대응이 가능해집니다.
Time travel
데이터가 바뀔 때마다 버저닝을 하는 특징을 활용해서, 데이터를 예전 버전으로 되돌릴 수 있습니다. 데이터를 잘못 처리하거나 실수로 데이터를 지웠어도 간단하게 아래와 같이 예전 데이터를 불러올 수 있습니다.
df = spark.read.format("delta").option("timestampAsOf", "2022-03-07 09:08:45.333").load("/tmp/delta-table")
데이터를 날짜별로 Dataframe에 불러와서 통계 분석 하는 방식으로 time travel을 활용할 수도 있습니다.
데이터 버전의 보존 기간은 직접 수동으로 설정할 수 있으며, vacuum 명령어를 사용하여 명시적으로 오래된 버전의 데이터를 영구 삭제해주는 기능도 제공하고 있습니다.
성능
Delta Lake는 압축, 데이터 스키핑, 캐싱 등을 활용해서 더욱더 빠른 쿼리 성능을 낼 수 있도록 시스템을 발전시키고 있습니다.
비용
AWS의 RDS는 저장 용량이 클수록 비용이 늘어나며, 시스템의 수평 확장이 어렵기 때문에, 데이터 증가에 따른 쿼리 비용 또한 크게 증가합니다. Delta Lake로 전환 시 분산 저장 스토리지인 AWS S3를 사용하였는데, S3는 상대적으로 저장 비용이 저렴하며 네트워크 사용량에 따라 비용을 청구합니다. 따라서 사용자 쿼리 레이어에는 시계열 쿼리에 최적화된 쿼리 엔진에 최종 집계된 데이터를 적재하여 이를 대시보드에 연결하는 방식으로 비용을 절감할 수 있었습니다.
마치며
데이터 기반 의사 결정은 이제 선택이 아니라 필수가 되었습니다. 기업의 운명을 결정할 수도 있는 빅데이터는 올바른 절차의 수집, 정제, 적재 파이프라인 시스템이 필요합니다. 데이터레이크는 보다 유연하고 다양한 분석을 하도록 고안된 데이터 처리 방법이지만, 데이터 처리 과정에서 오염된 데이터가 유입되거나, 일부 데이터가 누락된다면 중요한 의사 결정을 잘못하는 치명적인 문제를 야기할 수 있습니다. 데이터엔지니어는 이러한 책임감을 가지고 안정적으로 데이터를 관리하는 시스템을 설계해야 합니다. 앞으로 Delta Lake와 같은 도구를 활용하여 안정성과 성능이라는 두 마리 토끼를 모두 잡아 볼 수 있을 거라고 생각합니다.
감사합니다.