상세 컨텐츠

본문 제목

Chapter 10. Batch Processing(일괄 처리)

Log.Develop/DDIA

by bluayer 2022. 6. 10. 10:32

본문

소개

본 글은 데이터 중심 어플리케이션(마틴 클레프만)으로 스터디하며 해당 책의 내용을 요약 정리한 내용입니다.

https://github.com/ddia-study/ddia-study

 

GitHub - ddia-study/ddia-study: 데이터 중심 어플리케이션 설계

데이터 중심 어플리케이션 설계. Contribute to ddia-study/ddia-study development by creating an account on GitHub.

github.com

 

서론

  • 서비스(온라인 시스템)
  • 일괄 처리 시스템(오프라인 시스템) : 처리량이 대표적인 지표
  • 스트림 처리 시스템 : near-real-time processing / nearline processing

 

유닉스 도구로 일괄 처리하기

단순 로그 분석

실제로 많은 데이터 분석이 수 분 내에 awk, sed, grep, sort, uniq, xargs의 조합으로 결과를 얻을 수 있고 놀라울 정도로 잘 수행된다.

 

정렬 대 인메모리 집계

  • URL 해시 테이블을 메모리에 유지
  • 유닉스 연쇄 명령

 

중소 규모의 웹사이트 대부분은 고유 URL과 해당 URL 카운트를 대략 1GB 메모리에 담을 수 있다.

반면 허용 메모리보다 작업 세트가 크다면 정렬 접근법을 사용하는 것이 좋다.

먼저 데이터 청크를 메모리에 정렬하고 청크를 세그먼트 파일로 디스크에 저장한다.

그다음 각각 정렬된 세그먼트 파일 여러 개를 한 개의 큰 정렬 파일로 병합한다.

 

유닉스 철학

유닉스 파이프 : 다른 방법으로 데이터 처리가 필요할 때 정원 호스와 같이 여러 다른 프로그램을 연결하는 방법이 필요하다. 이것은 I/O 방식이기도 하다.

 

동일 인터페이스

특정 프로그램이 다른 어떤 프로그램과도 연결 가능하려면 프로그램 모두가 같은 입출력 인터페이스를 사용해야 한다는 의미다.

유닉스에서 인터페이스는 파일이다. 파일은 단지 순서대로 정렬된 바이트의 연속이다.

 

로직의 연결과 분리

파이프는 한 프로세스의 stdout을 다른 프로세스의 stdin과 연결한다. 이때 중간 데이터를 디스크에 쓰지 않고 작은 인메모리 버퍼를 사용해 프로세스 간 데이터를 전송한다.

 

투명성과 실험

  • 입력 파일은 일반적으로 불변
  • 파이프라인을 중단하고 출력을 파이프를 통해 less로 보내 원하는 형태의 출력이 나오는지 확인 가능
  • 특정 파이프라인의 출력을 파일에 쓰고 그 파일을 다음 단계의 입력으로 사용할 수 있다.

 

유닉스 도구를 사용하는 데 가장 큰 제약은 단일 장비에서만 실행된다는 점이다.

바로 이 점이 하둡 같은 도구가 필요한 이유다.

 

맵리듀스와 분산 파일 시스템

맵리듀스는 유닉스 도구와 비슷한 면이 있지만 수천 대의 장비로 분산해서 실행ㅎ이 가능하다.

맵리듀스 작업은 분산 파일 시스템 상의 파일을 입력과 출력으로 사용한다.

HDFS = 비공유 원칙

NAS, SAN에서 사용하는 공유 디스크와는 반대

NameNode라는 중앙 서버는 특정 파일 블록이 어떤 장비에 저장됐는지 추적한다.

장비가 죽거나 디스크가 실패하는 경우를 대비하기 위해 파일 블록은 여러 장비에 복제된다. (단순 복제 혹은 삭제 코딩)

RAID랑 비슷하지만 파일의 접근과 복제가 특별한 하드웨어 장치 없이 평범한 데이터 센터 네트워크 상에서 이뤄진다는 점이다.

 

맵리듀스 작업 실행하기

  1. 입력 파일을 읽는다. 레코드로 쪼갠다.
  2. 각 입력 레코드마다 매퍼 함수를 호출해 키와 값을 추출한다.(Map)
  3. 키를 기준으로 키-값 쌍을 모두 정렬한다.
  4. 정렬된 키-값 쌍 전체를 대상으로 리듀스 함수를 호출한다. (Reduce)

 

2, 4단계는 사용자가 직접 작성해야 한다.

  • Mapper : 매퍼는 모든 입력 레코드마다 한 번씩만 호출하며, 레코드로부터 키와 값을 추출하는 작업이다. 각 레코드는 독립 처리된다.
  • Reducer : 매퍼가 생산한 키-값 쌍을 받아 같은 키를 가진 레코드를 모으고 출력 레코드를 생산한다. 예시로 동일 URL의 출현 횟수가 있다.

 

맵리듀스의 분산 실행

유닉스 명령어 파이프라인과의 가장 큰 차이점은 맵리듀스가 병렬로 수행하는 코드를 직접 작성하지 않고도 여러 장비에서 동시에 처리가 가능하다는 점이다.

작업 입력으로 HDFS 상의 디렉터리를 사용하는 것이 일반적이고, 입력 디렉터리 내 각 파일 또는 파일 블록을 독립된 맵 태스크에서 처리할 독립 파티션으로 간주한다.

데이터 가까이서 연산하기 : 각 매퍼 입력 파일의 복제본이 있는 장비에 RAM, CPU가 충분하면 해당 작업을 수행. 네트워크 부하 감소, 지역성 증가의 이점.

리듀서 측 연산도 파티셔닝 되며, 리듀서 태스크 수는 사용자가 설정한다.

같은 키면 같은 리듀서에서 처리됨을 보장하는데, 특정 키-값 쌍이 어느 리듀스 태스크에서 수행될지 결정하기 위해 키의 해시값을 사용한다.

리듀서를 기준으로 파티셔닝 하고 정렬한 뒤 매퍼로부터 데이터 파티션을 복사하는 과정을 "셔플"이라고 한다.

 

Q. 셔플 작업은 매퍼에서 하는 건가? 아님 프레임워크?

셔플은 논리적인 과정을 일컫는 말이다. 따라서 주체라고 할 곳이 없다.

 

맵리듀스 워크플로

맵리듀스 작업 하나로 해결할 수 있는 문제의 범위는 제한적이다.

따라서 맵리듀스 작업을 연결해 workflow로 구성하는 방식은 꽤 일반적이다.

맵리듀스 작업 하나의 출력을 다른 맵리듀스 작업의 입력으로 사용하는 식이다.

선행 작업이 완전히 끝나야만 다음 작업을 시작할 수 있다.

(스케줄러 : Oozie, Azkaban, Luigi, Airflow, Pinball)

(하둡용 고수준 도구 : Pig, Hive, Cascading, Crunch, FlumeJava)

 

리듀스 사이드 조인과 그룹화

한 레코드가 다른 레코드와 연관이 있는 것은 일반적 : FK, Document Reference

조인을 할 때 여러 색인을 확인해야 할 수도 있음.

하지만 맵 리듀스에는 색인 개념이 없다.

맵리듀스에 파일 집합이 입력으로 주어졌다면, Full table scan 함.

 

사용자 활동 이벤트 분석 예제

일괄 처리에서 처리량을 높이기 위해서는 가능한 한 장비 내에서 연산을 수행해야 한다.

더 좋은 방법은 사용자 데이터베이스의 사본을 가져와 사용자 활동 이벤트 로그가 저장된 분산 파일 시스템에 넣는 방법이다.

 

정렬 병합 조인 (sort-merge join)

맵리듀스 프레임워크에서 키로 매퍼의 출력을 파티셔닝해 키-값 쌍으로 정렬한다면 같은 사용자의 활동 이벤트와 사용자 레코드는 리듀서의 입력으로 서로 인접해서 들어간다.

보조 정렬 : 리듀서가 항상 사용자 DB를 먼저 보고 활동 이벤트를 시간 순으로 보게 하는 식으로 맵리듀스에서 작업 레코드를 재배열하기도 한다.

리듀서는 특정 사용자 ID의 모든 레코드를 한 번에 처리하므로 한 번에 사용자 한 명의 레코드만 메모리에 유지하면 되고 네트워크로 아무 요청도 보낼 필요가 없다.

 

같은 곳으로 연관된 데이터 가져오기

병합 정렬 조인 중 매퍼와 정렬 프로세스는 특정 사용자 ID로 조인 연산을 할 때 필요한 모든 데이터를 한 곳으로 모은다.

그래서 사용자 ID 별로 리듀서를 한 번만 호출한다.

맵리듀스는 모든 네트워크 통신을 직접 관리하기 때문에 특정 장비가 죽어도 고민할 필요가 없다.

 

그룹화

맵리듀스로 그룹화 연산을 구현하는 가장 간단한 방법은 매퍼가 키-값 쌍을 생성할 떄 그룹화할 대상을 키로 하는 것이다.

맵리듀스 위에서 그룹화와 조인의 구현은 상당히 유사하다.

 

쏠림 다루기

키 하나에 너무 많은 데이터가 연관된다면 "같은 키를 가지는 모든 레코드를 같은 장소로 모으는" 패턴은 제대로 작동하지 않는다.

이런 레코드를 linchpin object 또는 hot key라고 한다.

핫스팟 완화

Pig : skewed join(핫키 판단을 위해 샘플링하고, 조인 수행 시 임의로 선택한 리듀서로 보냄. 핫 키를 여러 리듀서에 퍼뜨려서 처리하게 하는 방법)

Hive : 테이블 메타데이터에 핫 키를 명시적으로 지정하고 관련 레코드를 따로 저장한다. 해당 테이블에서 조인할 때 핫 키를 가지는 레코드는 map-side join을 사용해 처리한다.

 

map-side join

reduce-side join : 조인 로직을 리듀서에서 수행. 입력 데이터에 대한 가정이 필요 없고, 리듀서 입력을 병합해야 함.

map-side-join : 입력 데이터에 대한 특정 가정이 필요. 조인을 더 빠르게 수행

 

브로드캐스트 해시 조인

맵사이드 조인은 "각 매퍼 메모리에 적재 가능한 정도로" 작은 데이터셋과 매우 큰 데이터셋을 조인하는 경우에 가장 간단하게 적용해볼 수 있다.

작은 입력을 큰 입력의 모든 파티션에 "브로드캐스트"한다.

 

파티션 해시 조인(partitioned hash join/bucketed map join)

조인의 입력을 파티셔닝 한다면 조인을 각 파티션에 독립적으로 적용할 수 있다.

파티셔닝을 제대로 했다면 조인할 레코드 모두가 같은 번호의 파티션에 위치하게 된다.

그래서 각 매퍼는 각 입력 데이터셋 중 파티션 한 개만 읽어도 충분하다.

 

맵 사이드 병합 조인

입력 데이터셋이 같은 방식으로 파티션이 됐을 뿐 아니라 같은 키를 기준으로 정렬됐다면 변형된 맵사이드 조인을 적용할 수 있다.

리듀서에서 일반적으로 수행하는 것과 동일한 병합 연산을 수행할 수 있다.

 

일괄 처리 워크플로의 출력

검색 색인 구축

검색 엔진 색인 구축을 위해 구글에서 맵리듀스 사용.

정해진 문서 집합을 대상으로 전문 검색이 필요하다면 일괄 처리가 색인을 구축하는 데 매우 효율적이다.

매퍼는 필요에 따라 문서 집합을 파티셔닝 하고 각 리듀서가 해당 파티션에 대한 색인을 구축한다.

 

Q. 색인 추출에 가깝게 처리한다는 건가?

그렇다. 도큐먼트들을 입력 파일로 넣고 맵리듀스를 통해 색인 추출을 해내는 것.

 

일괄 처리의 출력으로 키-값을 저장

분류기 같은 머신러닝 시스템(스팸 필터, 이상 검출, 이미지 인식 등)을 구축하거나 추천 시스템을 구축할 수도 있다.

일괄 처리 값을 저장해야 하는데 이를 DB에 태우면 각종 문제가 생길 수 있다. (DB 과부하, 네트워크 요청으로 인한 성능 저하 등등)

그래서 내부에 일괄 처리 작업 내부에 데이터베이스 파일을 생성해서 저장한다.
(Voldmort, Terrapin, ElephantDB, HBase Bulk loading)

 

일괄 처리 출력에 관한 철학

  • 코드에 버그가 있어 출력이 잘못되거나 오염됐다면 코드를 이전 버전으로 돌리고 작업을 재수행해 간단하게 출력을 고칠 수 있다. (인적 내결함성)
  • minimizing irreversibility
  • 연결 작업과 로직을 분리.

 

하둡 vs 분산 데이터베이스

MPP(Massively Parallel Processing, 분석 SQL을 병렬 질의에 초점) vs MapReduce & Distributed File System(아무 프로그램이나 실행할 수 있는 운영체제 같은 속성)

 

저장소의 다양성

HDFS는 데이터가 어떤 형태여도 상관없음(Data Lake, Datawarehouse와 연관)

  • sushi principle
  • ETL

 

처리 모델의 다양성

머신러닝, 추천 시스템, 전문 검색 색인 등 범용적인 데이터 처리 모델이 필요한 경우가 있다.

맵리듀스를 이용하면 자신이 작성한 코드를 대용량 데이터셋 상에서 쉽게 실행할 수 있다.

 

빈번하게 발생하는 결함을 줄이는 설계

맵리듀스는 실패를 견딜 수 있고, 개별 태스크 수준에서 작업을 재수행한다.

또한 데이터를 되도록 디스크에 기록하려 한다.

일괄 처리 작업은 우선순위가 낮을 수 있는데, 온라인 서비스에서 쓰고 남은 자원을 모아 연산을 수행할 수 있게 되어 있다.

 

맵리듀스를 넘어

일괄 처리 방법의 대안?

 

중간 상태 구체화

단순히 한 작업에서 다른 작업으로 데이터를 옮기는 "중간 상태(Intermediate state)"가 존재한다.

중간 상태를 파일로 쓰는 과정을 구체화(mateerialization)라고 하는데, 완전히 구체화하는 mapreduce는 단점이 여러 개 있다.

  • 모든 선행 작업 태스크가 종료될 때까지 기다려야 해서 느려질 수 있음
  • 매퍼가 종종 중복될 수 있음
  • 임시 데이터가 여러 장비에 복제될 수 있음

 

데이터플로 엔진

Spark, Flink, Tez

전체 워크플로를 독립된 하위 작업으로 나누지 않고 작업 하나로서 다룬다.

여러 처리 단계를 통해 데이터 흐름을 명시적으로 모델링하기 때문에 데이터 플로 엔진이라 부른다.

맵과 리듀스에 기반해서 만든 더 유연한 연산자를 쓸 수 있다.

  • 정렬은 필요할 때만
  • 필요 없는 맵 태스크 없음
  • 지역성 최적화 가능
  • 중간 상태를 잘 활용해서 I/O가 훨씬 적게 듦
  • 선행 단계 끝나기를 기다릴 필요 없음
  • 각 태스크마다 JVM 구동 필요 없음

 

내결함성

데이터플로 엔진은 매번 파일 쓰기가 아니라 유효한 데이터로부터 계산을 다시함.

주어진 데이터가 어떻게 연산됐는지를 추적. (RDD 추상화, 체크포인트 등)

그러나 재연산시 다른 결괏값이 나올 수 있어서, 비결정적 원인을 제거해야 함(매번 다른 난수 등)

Trade-off를 잘 고려하자.

 

그래프와 반복 처리

그래프 분석 알고리즘 : PageRank

그래프를 저장하기도 한다. => 정점, 간선 목록 포함된 파일

그래프를 맵리듀스 방식으로 처리하려면 계속 반복적으로 따라갈 간선이 없는지 등등 체크해야 함. => 비효율적

프리글 처리모델(Pregel, BSP)

BSP(Bulk Synchronous parallel, Pregel 모델) : 일괄 처리 그래프의 최적화 방법

한 정점은 다른 정점으로 "메시지를 보낼" 수 있다.

따라서 개별 정점에서 함수를 호출해 모든 메시지를 전달한다.

 

내결함성

메시지 전달로 통신하기 때문에 일괄처리가 가능하다.

반복이 끝나는 시점에 모든 정점의 상태를 주기적으로 체크포인트로 저장한다.

 

병렬 실행

병렬로 처리하게 되면 인접 정점 간 네트워크 통신이 빈번하게 일어날 수 있다.

따라서 대부분 단일 머신에서 처리하는 게 훨씬 성능이 높다.

 

고수준 API와 언어

도 지원한다 이제! (in 데이터플로 엔진)

 

선언형 질의 언어로 전환

하이브, 스파크, 플링크는 조인 알고리즘을 알아서 최적화해줌(질의 최적화기)

선언적인 방법으로 조인을 지정하면, 질의 최적화기가 최적의 수행 방법을 결정.

데이터 플로 엔진(선언형 질의 + 질의 최적화기) = MPP와 비슷하며 성능도 비슷

 

다양한 분야를 지원하기 위한 전문화

  • 머신러닝
  • k-nearest neighbor(공간 알고리즘)
  • 유전자 분석 알고리즘

관련글 더보기

댓글 영역