728x90
Fault Tolerance

구글이 디자인한 MapReduce에서 Fault Tolerance는 어떻게 해결하는가이다.

Hdfs를 기억해보면 datanode가 namenode한테 주기적으로 heartbeat을 보냈다. 구글이 구현한 MapReduce같은 경우에는 master가 주기적으로 ping한다. 분산시스템에서 node의 failure를 detect하는 방법은 주기적인 메시지교환 밖에 없다.

 

master가 어떤 worker가 죽은것을 알았을 때, 그 map task를 reset해서 다른 worker에서 다시 돌린다. 그러면 complete된 map task는 failure가 일어나면 재실행(re execute) 되는건데, 왜그럴까요? 각각의 mapper가 생산해낸 중간결과물(Intermediate key value)은 어디있죠? 그 mapper가 실행되는 그 node의 로컬디스크에 저장되어있다. 당연히 fail했다는 의미는 로컬디스크에 접근이 안된다. 결과적으로 봤을 때 failure를 detect했을 때, 거기에 할당되는 모든 map task들은 다시 실행된다. 

 

반면에, reduce task 끝난 다음에 죽었다. 그러면 얘네들을 굳이 다시 실행할 필요가 없다. 왜냐하면 reduce task 결과는 항상 global file system, hadoop으로 따지면 hdfs 저장되기 때문에 노드에 저장되는 것이 아니라 안정적으로 접근이 가능하다. (이미 노드를 떠난것이다. ) 의문을 가질 있는게 Intermediate key/value pair들도 hdfs 저장해두면 다시 실행할 필요가 없는데, 구글 디자이너들이 생각할 intermediate data까지 global file system 저장하는 것은 아니다고 생각했다. 왜냐하면 intermediate 라는 의미는 어차피 사라질 데이터 이기 때문이다. (reduce 끝나고 나면 중간결과물이 필요없어진다)

 

(Resilient) 

실제로 구글에서 80개의 머신이 죽었는데 살아있는 것들 위주로 작업을 재실행 하면서 진행했다. 결과적으로 전체적인 MapReduce 작업을 무사히 끝마칠 수 있었다. 

failure들이 동시다발적으로 일어난다고 해도 MapReduce자체가 관리해주기 때문에 messy detail들에서 자유로울 있다. 이것이 가장 장점이다.

 

Locality

Network bandwidth자체가 무한대 자원이 아니라 항상 부족할 있는 자원이다. 그러다보니 이걸 조금 아껴 써야하지 않나고 생각하며 디자인 철학이 생겼다. (일반적인 commodity computer 환경 에서) hdfs block단위로 쪼개서 데이터 노드들의 로컬디스크에 각각 저장이 되어있다. 그러면 그냥 로컬디스크에 있는 데이터를 읽게 하는 것이 좋겠다 라고 생각하는 것이죠. 그래서 “Moving computations close to the data”라는 개념이 나왔다. 기존의 슈퍼컴은 데이터를 computing 쪽으로 보내는 모델 이었는데 이렇게 하지 말고 데이터를 이미 분산 시켜놓은 다음에 거기에 computation (MapReduce관점에서 mapper) 보내면 되겠다고 고민했다. 그래서 구글 mapreduce에서 master map task 필요로 하는 input split 데이터를 실제로 소유하고 있는 machine allocation하기 위해 최선을 다한다. 만약에 그것을 실패하게 되면 노드가 바빠서 더이상 작업을 받아들이지 못할수도 있으니까, 그러면 그나마 input data 가까이에 배치하고 싶어한다.

 

가깝다는 개념은 hdfs에서도 다루었다. 

 

Data block과 최대한 가깝게 보낸다고 하는 것은, Node-2를 보면 저기 map task가 원하는 input split(data block)이 같이 있는 것이다. 이것을 data와 computation을 colocation 시킨다라고 표현했다. 이런 경우를 “data local”이라고 표현한다. Data local인 경우가 best case이다. 최적의 상황이다. 왜냐하면 여기서는 network 통신이 일어나지 않기 때문이다.(network bandwidth를 아낄 수 있음) 

만약에 Node-2처럼 이런 환경을 만들어낼 수 없다면, “rack local”도 생각할 수 있다. Node-3의 map task가 원하는 input split이 Node-4에 저장이 되어있다. 이런 경우에는 같은 Rack-1에 있는 다른 노드(node)의 데이터를 copy하는 것이 낫겠다고 생각하게 되는것이죠. 이것이 rack local이다. 그러나 data local, rack lock 둘 다 안되는 Node-5를 보면 다른 rack에서 copy해와서 실행한다.

 

hdfs 블록 배치 방법이 mapreduce 블록배치에도 도움이 되었다. 

 

 

Large-Scale Indexing

구글이 mapreduce를 개발해서 여러 분야에 했었는데, 그 중 대표적인 것이 Large-Scale Indexing이다. 검색엔진을 만들어야 하니까 index를 만들어야 했다. 

 

빅데이터를 다루는데 있어서 분산시스템은 피해갈 없는 숙명같은 것이다. 왜냐하면, 단일노드(단일 컴퓨터)에서는 처리할 없기 때문에, 클라우드 환경에서 빅데이터 분석(big data analytics) 할때는 반드시 분산된 시스템에서 수밖에 없는 것이고, 그랬을 개발자(사용자)입장에서 쉬운 API 통해서 그들이 풀고자 하는 문제에 집중하게 해주고 나머지는 플랫폼(시스템) 처리해주는 이런 모델로 가고있다고 보면된다.

'서버 > 클라우드 컴퓨팅' 카테고리의 다른 글

Hypervisor vs Container  (1) 2024.09.05
Hadoop: MapReduce + HDFS  (0) 2021.12.16
MapReduce: Programming Model  (0) 2021.12.16
MapReduce  (0) 2021.12.16
빅데이터 Parallelization의 문제점  (0) 2021.11.09
728x90
Google MapReduce Overview 1

Google MapReduce가 실제 어떻게 실행이 되는지 시스템 아키텍쳐 측면에서 살펴보자.

 

master가 HDFS의 namenode와 비슷하게 MapReduce 실행을 관장하는 master 역할을 하는 것이고, Hadoop에서는 이것을 jobtracker라고 표현한다. 그리고 worker들은 hadoop에서 tasktracker라고 표현한다. 왼쪽 worker들은 Map 단계를 수행하고, 오른쪽 worker들은 Reduce 단계를 수행한다. 그럼 어떤식으로 흘러가는 것일까 보면, (맨 왼쪽 부터 시작) 입력파일들이 주어지게 된다. 입력파일이 주어지게 되면은 입력파일을 input split으로 쪼개게(partitioning) 된다. worker에서는 split 0, 1두개를 실행시킨다. Input split이라고 하는 것은 HDFS에서 block size랑 같다고 했으니까 대개의 경우에는 64mb 데이터 파일을 읽어들어서 처리하게 된다. (위 그림상) input split이 5개 이니까 5개의 mapper가 뜰건데, 그 mapper안에 정의되어 있는 map함수를 적용하는 것이 바로 worker의 역할이다. 그러면 key/value pair를 parsing해서 각각의 pair를 user-defined(user defined) map function으로 보내주는 것이다. wordcount 같은 경우는 intermediate key/value는 “단어,1”이런식으로 내보냈다. 그러면 worker가 mapper를 통해서 하나의 input split을 처리하면서 나오는 그러한 intermediate key/value pair들은 일단 메모리에 버퍼가 된다. 그런데 메모리에 용량이 부족해질 수 있으니까 주기적으로 버퍼된 pair(buffered pair)들이 로컬 디스크에 R개의 region으로 분할(partitioning)되어서 저장이 된다. intermediate key space를 reducer 개수만큼 partitioning 한다고 했는데 현재 여기서는 reducer가 두개가 돈다고 보면 된다. reducer가 두개일 때 잘 보면 로컬에 저장된것도 조각이 두개이다.(partitioning 되어서 저장된것이다) 그러면 예를 들어서 wordcount 예시를 들어보면 bear와 car는 왼쪽 조각에, deer와 river는 오른쪽 조각에있고 각각 다른 reduce단계의 worker로 갈 수 있다고 생각하면된다. map단계에서 모두 끝나고나면 로컬디스크에 파티셔닝 되어있는 데이터가 있을 것이다. 각각의 데이터가 reduce로 전달되는 것일 뿐이다. map단계가 끝나고 나면 파티셔닝된 데이터가 있을것이고 각각의 데이터가 결과적으로 reduce로 전달되는 것일 뿐인데, 어떻게 전달이 되느냐하면, reduce의 worker가 master로 부터 notification을 받는다. 그러면 remote procedure call을 통해서 퍼버링된 데이터를 map worker의 로컬디스크로 부터 가져가는 것이다.

 

정리하자면 input split 개수만큼 mapper 실행되고 ( 그림에서 현재 mapper 5개가 실행돼야하는데 3개만 보여주고있는상황) mapper 하는일은 input split 있는 logical record 하나씩 읽어서 사용자가 정의한 map함수를 적용하고 결과로 나오는 intermediate key/value pair reduce 개수만큼 partitioning 해놓은 것이다. (Partitioning 함수는 기본적으로 주어져 있다. ) reducer개수가 하나라는 얘기는 partition 자체를 필요가 없다는 얘기이다. Partition data 로컬디스크에 저장하고 있다가 모든 map단계가 끝나면 reducer 각각의 maper에게 연락해서 가져간다. 그러면 결과적으로 reduce 개수만큼 file 생기게된다.

 

Google MapReduce Overview 2

하나의 input split을 받아서 map 함수를 거치면 메모리에 버퍼링되는 intermediate key/value pair들이 나온다. 이것들이 결국 partition하고 sorting해서 저장하게 된다. 결국 디스크에 merge 되어있고 세 칸인거 보니깐 reducer가 세 개 돌고있는 것 같다. 그러면 첫 번째 partition은 첫번째 reduce한테 주고  두번째 partition은 두번째 reduce… 이렇게 준다. 이런식으로 다른 mapper들도 세 개로 쪼갠 partition을 가지고 있을 것이고 첫번째 파티션이 한쪽에 모일 것이다. (오른쪽 그림이 첫번째 파티션에 해당되는 데이터를 처리하는 reduce이기 때문) 내부적으로 merge, sort 하면서 reduce단계로 넘어가서 최종 output을 생산해내는 단계로 가게 된다.

 

Shuffling 단계가 MapReduce실행에 가장 복잡한 단계이다. 또 다르게 말하면 가장 overhead가 큰 단계이다. fetch같은 것들이 많이 일어나니까 network 통신이 많이 일어나기 때문이다. 

 

파티션에 데이터가 없을 있다. 이점은 데이터가 어떻게 구성되느냐에 따라 달라진다.

728x90
MapReduce의 programming model

MapReduce의 programming model은 기본적으로 input key/value pair를 받아들여서 output key/value pair를 생성하는 형태로 pipeline처럼 연결된다. Map이 처음 시작할 때도, 처리해야하는 input data를 key/value형태로 받아들이며 Map이 역시나 key/value 형태로 내보내게 된다. 그러면 내보낸 intermediate key/value pair들이 결국 그룹핑(grouping) 되면서 key/value들의 리스트 형태로 Reduce로 넘어간다고 보면 된다. 결국 Reduce도 마찬가지이지만 Map이라고 하는 함수는 사용자가 작성을 해야한다. 

MapReduce에서 input splite이라는 개념을 쓰는데, input splite 사실상 hdfs block size 같아야한다. 얘기는 하나의 hdfs block 대해서 하나의 Map 함수가 적용된다. mapper 개수만큼 여러개 적용될 있다. mapper들이 사실은 block들이 흩어져 있는 여러 노드에서 동시 다발적으로 실행될 건데, 실행중에 나오는 intermediate key/value pair들은 MapReduce 그루핑(grouping) 하는 것이다. 모든 Map 단계가 끝나게 되면은 intermediate value들을 associated with the same intermediate key 이니깐 intermediate key 똑같은것을 공유하는 모든 value들을 묶어주는 것이다.(group together) 앞에서 말했다 싶이 Map단계를 끝나서 나오는 intermediate key/value pair들은 같은 key 공유하는 value들의 리스트는 반드시 똑같은 reduce한테 몽땅 가게 돼있다. reduce 두개 이상 있기 때문에 이런 문제를 고민해야 한다. reduce 두개 이상 하나의 key, intermediate key 대한 value들의 집합이 쪼개지면 절대 안된다. 왜냐하면 key 대해서 모든 value들을 받아서 aggregation 연산을 해야 정확한 결과가 나오는 것이기때문이다. Reduce함수라고 하는 것은 사용자가 개발해서 intermediate key value들의 리스트가 오게된다. 이렇게 되면 key 해당되는 모든 value들이 딸려온다. 그리고 merge해서 작은 결과값을 생성한다.

 

 

MapReduce를 WordCount로 예시를 들어보면

WordCount의 플로우

입력데이터가 하나의 노드에서 돌리면 상관이 없는데, 이 데이터의 용량이 수백 기가, 혹은 테라바이트면 하나의 노드 에서는 어려울 수 있다. 그렇게 되면은 hdfs에서 정한 64mb 블록사이즈로 쪼개서 저장한 다음에 MapReduce가 블록 단위로 처리하게 된 것이다.  

 

  • Splitting:

이것을 input split이라고 얘기한다. MapReduce관점에서는 처리하고자 하는 입력데이터의 단위이기 때문에 input split이라는 명령어를 쓰는 것이고 hdfs관점에서는 데이터가 저장되는 단위이기 때문에 block이라는 개념을 쓴다. 엄밀하게 말하면 input split과 block이 꼭 같은 사이즈일 필요는 없다. 예를 들어서 데이터가 저장되어 있는 것은 64mb block 단위로 저장되어 있는데 하나의 input split, 하나의 mapper가 처리하고자 하는 데이터양이 두개 이상의 block을 한꺼번에 처리하고 싶다던가, 이런 것도 가능하다. 그런데 이렇게 했을때는 성능상에 이점이 없기 때문에 input split과 hdfs block사이즈를 맞추게 된다. 왜 이렇게 하는지는 나중에 설명이 나온다. 그러니까 한줄 한줄이 input split이다. MapReduce가 입력 데이터를 처리하는 단위이다. 

 

  • Mapping:

mapper가 세 개가 떴다고 생각하면 된다. Mapper가 세 개가 떴으면 mapper는 자기가 담당하는 input split을 읽어들여서 사용자가 정의한 map함수를 적용한다. 그런데 사용자는 어떤 식으로 구현한거냐면 (splitting 단계에 있는 것을) 한줄씩 읽어서 파싱(parsing)한 다음에 단어별로 뽑아낸다. 단어를 key로 잡으면 이것이 intermediate key가 된다. 그리고 value는 1이다. 정리하자면 input split을 logical record 단위로 한줄씩 읽으며 파싱한 다음에 거기에서 나온 단어(word)를 key로 잡고 1을 value로 내보내면 되겠다고 인지할 수 있다. 결론적으로 이 단계에 있는 9개의 key/value들이 intermediate key/value pair들의 집합이다. 중요한 포인트는 mapper들이 intermediate key/value pair들을 모두 생산해 낼 때 까지는 reduce 단계로 넘어가지 않는다. 나중에 오는 것들 중에서 집계가 덜 된 데이터들도 있을 수 있기 때문에 mapping단계가 끝날 때 까지 기다린다. 그 다음에 shuffling 단계로 들어간다.

 

  • Shuffling:

reduce단계로 넘어가기 위해서 shuffling 단계로 넘어간다. 이 단계에서는 기본적으로 intermediate key를 공유하는 value들을 묶이게 되는 것이다. 다시 말해 동일한 intermediate key를 공유하는 value들이 묶이고 이 value들이 리스트 형태로 넘어가게 된다. 그래서 예를 들어 [1,1]에서 1 + 1 이런식으로 해서 나온 값을 넘겨준다.(aggregation 연산을 수행할 수 있게 된다.) 그래서 Reduce단계에서 같은 키를 갖는 value들은 항상 똑같은 reducer로 간다.

 

  • Reducing:

reduce에서 사용자가 정해놓은 함수가 있을것이다. 같은 key를 공유하는 value들의 리스트가 무더기로 넘어올 때 더하는 식으로 만들었다. 그럼 final result를 이제 글로벌하게 입력파일이 있을 때 전체적으로 저렇게 나온다.

 

 

MapReduce의 가장 큰 장점은 이 플로우를 생각해서 maper와 reducer를 개발한다고 치자, input을 어떻게 받을거고 output 내보낼 것이고 reducer단계에서는 어떤식으로 결합할 것이라고 정했다고 생각하면, 입력데이터가 아무리 크다고 하더라도 MapReduce는 소스코드 수정없이 모든 데이터 처리를 성공적으로 수행할 것이다. 결론적으로 인풋의 크기가 전혀 상관이 없다는 얘기이다. 이것이 가능한 이유는 mapper가 실행되는 단계에서는 각 input split별로 동시다발적이면서 독립적으로 실행되는 것이다. 그리고 모든 매핑이 끝난 다음에 셔플링과 리듀싱 단계로 넘어가기 때문이다.

 

즁요한 것은 intermediate key/value pair를 어떻게 잡을 것이냐 이다. 

 

 

메인 프로그램 3개를 작성해서 실행 하면, 입력 데이터가 아무리 크다고 하더라도 MapReduce 알아서 input split별로 mapper 띄어주고 사용자가 mapper안에 구현해놓은 map함수를 logical record 하나하나 적용해주고 사용자가 원하는대로 intermediate key/value pair 생성해주고, 이렇게 배포작업이 끝나고 나면 shuffling단계도 알아서 해주면서 intermediate key 공유하는 단어 배열을 공유하는 애들을 한곳에 몰아서 reduce 던져준다. 그러면 Reduce 함수에서는 value들의 리스트를 더하는 작업을 하고 결과는 결과적으로 hdfs 다시 저장되게 된다.

 

앞에서 봤던 wordcount플로우의 pseudo code이다.

Map 함수에는 key와 value가 파라메터로 들어오는데 value는 document contents일 것이다 라고 하고 value에 있는 각각의 단어에다가 “1”을 붙여서 내보내면 map단계가 끝난다. 

Reduce함수에는 intermediate key 공유하는 value들의 집합이 파라메터로 온다. 그러면 각각의 value들을 for문을 돌면서 “1” 더한다. 

 

'서버 > 클라우드 컴퓨팅' 카테고리의 다른 글

Hadoop: MapReduce + HDFS  (0) 2021.12.16
MapReduce: Fault Tolerance, Locality, Large-Scale Indexing  (0) 2021.12.16
MapReduce  (0) 2021.12.16
빅데이터 Parallelization의 문제점  (0) 2021.11.09
Cloud Computing & Big Data  (0) 2021.11.09

+ Recent posts