728x90
MapReduce의 programming model

MapReduce의 programming model은 기본적으로 input key/value pair를 받아들여서 output key/value pair를 생성하는 형태로 pipeline처럼 연결된다. Map이 처음 시작할 때도, 처리해야하는 input data를 key/value형태로 받아들이며 Map이 역시나 key/value 형태로 내보내게 된다. 그러면 내보낸 intermediate key/value pair들이 결국 그룹핑(grouping) 되면서 key/value들의 리스트 형태로 Reduce로 넘어간다고 보면 된다. 결국 Reduce도 마찬가지이지만 Map이라고 하는 함수는 사용자가 작성을 해야한다. 

MapReduce에서 input splite이라는 개념을 쓰는데, input splite 사실상 hdfs block size 같아야한다. 얘기는 하나의 hdfs block 대해서 하나의 Map 함수가 적용된다. mapper 개수만큼 여러개 적용될 있다. mapper들이 사실은 block들이 흩어져 있는 여러 노드에서 동시 다발적으로 실행될 건데, 실행중에 나오는 intermediate key/value pair들은 MapReduce 그루핑(grouping) 하는 것이다. 모든 Map 단계가 끝나게 되면은 intermediate value들을 associated with the same intermediate key 이니깐 intermediate key 똑같은것을 공유하는 모든 value들을 묶어주는 것이다.(group together) 앞에서 말했다 싶이 Map단계를 끝나서 나오는 intermediate key/value pair들은 같은 key 공유하는 value들의 리스트는 반드시 똑같은 reduce한테 몽땅 가게 돼있다. reduce 두개 이상 있기 때문에 이런 문제를 고민해야 한다. reduce 두개 이상 하나의 key, intermediate key 대한 value들의 집합이 쪼개지면 절대 안된다. 왜냐하면 key 대해서 모든 value들을 받아서 aggregation 연산을 해야 정확한 결과가 나오는 것이기때문이다. Reduce함수라고 하는 것은 사용자가 개발해서 intermediate key value들의 리스트가 오게된다. 이렇게 되면 key 해당되는 모든 value들이 딸려온다. 그리고 merge해서 작은 결과값을 생성한다.

 

 

MapReduce를 WordCount로 예시를 들어보면

WordCount의 플로우

입력데이터가 하나의 노드에서 돌리면 상관이 없는데, 이 데이터의 용량이 수백 기가, 혹은 테라바이트면 하나의 노드 에서는 어려울 수 있다. 그렇게 되면은 hdfs에서 정한 64mb 블록사이즈로 쪼개서 저장한 다음에 MapReduce가 블록 단위로 처리하게 된 것이다.  

 

  • Splitting:

이것을 input split이라고 얘기한다. MapReduce관점에서는 처리하고자 하는 입력데이터의 단위이기 때문에 input split이라는 명령어를 쓰는 것이고 hdfs관점에서는 데이터가 저장되는 단위이기 때문에 block이라는 개념을 쓴다. 엄밀하게 말하면 input split과 block이 꼭 같은 사이즈일 필요는 없다. 예를 들어서 데이터가 저장되어 있는 것은 64mb block 단위로 저장되어 있는데 하나의 input split, 하나의 mapper가 처리하고자 하는 데이터양이 두개 이상의 block을 한꺼번에 처리하고 싶다던가, 이런 것도 가능하다. 그런데 이렇게 했을때는 성능상에 이점이 없기 때문에 input split과 hdfs block사이즈를 맞추게 된다. 왜 이렇게 하는지는 나중에 설명이 나온다. 그러니까 한줄 한줄이 input split이다. MapReduce가 입력 데이터를 처리하는 단위이다. 

 

  • Mapping:

mapper가 세 개가 떴다고 생각하면 된다. Mapper가 세 개가 떴으면 mapper는 자기가 담당하는 input split을 읽어들여서 사용자가 정의한 map함수를 적용한다. 그런데 사용자는 어떤 식으로 구현한거냐면 (splitting 단계에 있는 것을) 한줄씩 읽어서 파싱(parsing)한 다음에 단어별로 뽑아낸다. 단어를 key로 잡으면 이것이 intermediate key가 된다. 그리고 value는 1이다. 정리하자면 input split을 logical record 단위로 한줄씩 읽으며 파싱한 다음에 거기에서 나온 단어(word)를 key로 잡고 1을 value로 내보내면 되겠다고 인지할 수 있다. 결론적으로 이 단계에 있는 9개의 key/value들이 intermediate key/value pair들의 집합이다. 중요한 포인트는 mapper들이 intermediate key/value pair들을 모두 생산해 낼 때 까지는 reduce 단계로 넘어가지 않는다. 나중에 오는 것들 중에서 집계가 덜 된 데이터들도 있을 수 있기 때문에 mapping단계가 끝날 때 까지 기다린다. 그 다음에 shuffling 단계로 들어간다.

 

  • Shuffling:

reduce단계로 넘어가기 위해서 shuffling 단계로 넘어간다. 이 단계에서는 기본적으로 intermediate key를 공유하는 value들을 묶이게 되는 것이다. 다시 말해 동일한 intermediate key를 공유하는 value들이 묶이고 이 value들이 리스트 형태로 넘어가게 된다. 그래서 예를 들어 [1,1]에서 1 + 1 이런식으로 해서 나온 값을 넘겨준다.(aggregation 연산을 수행할 수 있게 된다.) 그래서 Reduce단계에서 같은 키를 갖는 value들은 항상 똑같은 reducer로 간다.

 

  • Reducing:

reduce에서 사용자가 정해놓은 함수가 있을것이다. 같은 key를 공유하는 value들의 리스트가 무더기로 넘어올 때 더하는 식으로 만들었다. 그럼 final result를 이제 글로벌하게 입력파일이 있을 때 전체적으로 저렇게 나온다.

 

 

MapReduce의 가장 큰 장점은 이 플로우를 생각해서 maper와 reducer를 개발한다고 치자, input을 어떻게 받을거고 output 내보낼 것이고 reducer단계에서는 어떤식으로 결합할 것이라고 정했다고 생각하면, 입력데이터가 아무리 크다고 하더라도 MapReduce는 소스코드 수정없이 모든 데이터 처리를 성공적으로 수행할 것이다. 결론적으로 인풋의 크기가 전혀 상관이 없다는 얘기이다. 이것이 가능한 이유는 mapper가 실행되는 단계에서는 각 input split별로 동시다발적이면서 독립적으로 실행되는 것이다. 그리고 모든 매핑이 끝난 다음에 셔플링과 리듀싱 단계로 넘어가기 때문이다.

 

즁요한 것은 intermediate key/value pair를 어떻게 잡을 것이냐 이다. 

 

 

메인 프로그램 3개를 작성해서 실행 하면, 입력 데이터가 아무리 크다고 하더라도 MapReduce 알아서 input split별로 mapper 띄어주고 사용자가 mapper안에 구현해놓은 map함수를 logical record 하나하나 적용해주고 사용자가 원하는대로 intermediate key/value pair 생성해주고, 이렇게 배포작업이 끝나고 나면 shuffling단계도 알아서 해주면서 intermediate key 공유하는 단어 배열을 공유하는 애들을 한곳에 몰아서 reduce 던져준다. 그러면 Reduce 함수에서는 value들의 리스트를 더하는 작업을 하고 결과는 결과적으로 hdfs 다시 저장되게 된다.

 

앞에서 봤던 wordcount플로우의 pseudo code이다.

Map 함수에는 key와 value가 파라메터로 들어오는데 value는 document contents일 것이다 라고 하고 value에 있는 각각의 단어에다가 “1”을 붙여서 내보내면 map단계가 끝난다. 

Reduce함수에는 intermediate key 공유하는 value들의 집합이 파라메터로 온다. 그러면 각각의 value들을 for문을 돌면서 “1” 더한다. 

 

'서버 > 클라우드 컴퓨팅' 카테고리의 다른 글

Hadoop: MapReduce + HDFS  (0) 2021.12.16
MapReduce: Fault Tolerance, Locality, Large-Scale Indexing  (0) 2021.12.16
MapReduce  (0) 2021.12.16
빅데이터 Parallelization의 문제점  (0) 2021.11.09
Cloud Computing & Big Data  (0) 2021.11.09

+ Recent posts