이제 Hadoop의 MapReduce는 어떻게 되어있을까 살펴보자.
MapReduce와 HDFS가 어떤식으로 결합해서 실행되는지 살펴보자. 특히 Hadoop 1.xx 버전에서는 이렇게 실행된다. 앞에 구글 MapReduce에서 master라고 했던 부분은 JobTracker라는 master processor로 Hadoop에서 실행되고, 구글 MapReduce의 Worker는 Hadoop에서 TaskTracker라는 slave processor로 실행된다. 기억을 더듬어 보면 HDFS에서 namenode라고 하는 master가 있어서 얘가 메타데이터(meta data)를 관리했고 datanode가 각각의 slave node에서 실행되면서 local Linux file system에 실제 애플리케이션 데이터를 관리하는 형태로 실행됐었다. 비슷하게 MapReduce가 실행되기 위해서는 Master processor인 JobTracker, 그러니까 job을 제출하는 job submission node에서 JobTracker가 실행돼야하고 각각의 slave node에서 tasktracker보다는 slave process가 실행되어야 한다. 그러니까 namenode, datanode가 hdfs에서 cluster를 구성하는 것이고 JobTracker와 tasktracker가 MapReduce 프레임워크를 구성하고 있다고 보면 된다. namenode와 job submission node를 하나로 묶는 경우도 있고 기계가 남아돌면 분리 하는 경우도 있다.
Slave node를 물리적인 노드를 의미한다고 생각하면 된다. 각각의 물리노드에 hdfs를 위한 datanode daemon과 MapReduce를 위한 tasktracker를 모두 실행되고 있는 것을 볼 수 있다. 왜 이렇게 실행 되어야 하냐면 만약 둘 중 하나가 빠져있으면 문제가 생긴다. 정확히 말하면 MapReduce를 처리할 때 문제가 생길 수 있는데, 예를 들어서 한 slave node에 tasktracker가 없다고 치고, datanode는 다 돌고 있다고 하자. 그러면 job submission node에 있는 JobTracker가 tasktracker가 없는 노드로는 map task나 reduce task를 보낼 수 없다. Task tracker가 실제로 JobTracker의 명령을 받아서 map 또는 reduce task를 실행하는 주체이기 때문에 tasktracker가 없다고 하는것은 task를 실행할 수 없는 것이기 때문에 Mapeduce 프레임워크 입장에서 봤을때는 해당 slave node가 없는 것과 같은 맥락이다.
반대로 tasktracker가 있는데 datanode가 안돌고 있다고 하면 어떻게 될까? 그렇게 되면 해당 slave node는 namenode에게 눈에 보이지 않는 것이다. 즉 해당 slave node는 없는 것과 마찬가지이다. 그래서 datanode와 tasktracker는 항상 같은 노드에서 실행이 되어야 한다.
ps. Hadoop yarn에서도 같은 맥락에서 동작한다.
Hadoop job processing step
MapReduce job processing의 step을 그림으로 살펴보자.
- Run job : client가 job을 실행해 달라고 JobClient 클래스에 요청을 한다.
- Get new job ID : 새로운 job ID를 JobTracker에게 요청해서 job ID를 받게된다.
- Copy job resources : job에 관련된 resource들을 shared file system에 업로드한다. (Job resource라고 하는 것은 보통은 mapper와 reduce 클래스 다 포함하고 있는 jar 파일 같은 것이다. Jar 파일 같은것들을 hdfs에 올려놓아야 나중에 MapTask나 ReduceTask를 실행할 수 있게 된다. 왜냐하면 MapReduce job을 submission 하면은 실제 Map과 Reduce가 어디서 실행 되냐면은 전적으로 JobTracker가 알아서 한다. 이 얘기는 MapReduce job의 map task와 reduce task가 Hadoop cluster를 구성하는 어느 node에서 실행될지 모른다. 사전에 알면 그 노드에 필요한 파일들을 갖다놓을것이다. 그러나 안된다. 어디서나 접근이 가능한, 모든 노드에서 접근을 할 수 있는 file system이 대표적으로 hdfs이기 때문이다.)
- Submit job : job을 제출한다.
- Initialize job : JobTracker가 initialize 한다.
- retrieve input splits : input split은 MapReduce job이 처리하고싶은 단위이다. 그러니까 64mb씩 쪼개서 각각의 input split에 대해서 어느 tasktracker에 있는지 찾아내야한다. 그래야 각각에 대한 input split에 mapper를 띄울 때 최대한 그 데이터에 가깝게 스케쥴링 할 수 있기 때문이다.
- Heartbeat : input split을 기반으로 해서 tasktracker에게 작업을 할당하게 되면 주기적으로 heartbeat을 보내게된다.
- Retrieve job : tasktracker는 child JVM을 launch시켜서 Map 또는 Reduce task를 실행하게 되는데 그 전에 해야할게 HDFS에 접근해서 job resource(jar 파일 같은거)를 받아와야한다. jar파일 같은게 로컬하게 있어야 map/reduce task를 실행시킬수있다.
- Launch : tasktracker가 childJVM을 launch시키고
- Run : 사용자가 정의하는 map/reduce함수를 적용하는 과정이다.
정리하면, 기본적으로 사용자가 개발한 MapReduce 프로그램 실행파일 jar가 있어야 하고, 그 jar파일이 hdfs에 올라가 있고 처리에 필요한 input data도 hdfs에 있다라고 가정을 하는 것이다. 그런 과정에서 JobTracker는 각각의 input split에 대해서 hdfs의 namenode에게 물어봐서 각각이 어디에 있는지 찾은 다음에 거기 최대한 가깝게 map task를 실행한다. 실제 실행이라고 하는 것은 tasktracker가 childJVM을 통해서 사용자가 정의한 map/reduce 함수를 적용하는 과정이다.
JobTracker
JobTracker를 자세히 살펴보도록 하겠다.
JobTracker가 하는 가장 중요한 일은 기본적으로 cluster에 있는 특정 노드들의 MapReduce task들을 경작(farm out) 하는 것이다. (task는 job을 이루는 구성요소) (MapReduce task는 map task + reduce task를 말한다) input data를 갖고있는 그 노드에 map task를 가져다 주거나 적어도 같은 rack에 있는 노드에서 map task를 실행해서 같은 rack에 있는 데이터 노드를 복사해올 수 있게한다. 이것은 data locality에 해당된다.
Data locality를 구현하는 주체는 JobTracker이다.
그런데 JobTracker라고 하는 것은 single point of failure이다. (단일고장점) 만약 JobTracker가 죽게되면 모든 실행중인 MapReduce job은 멈추게(halting) 될 수 밖에 없다.
많은 사람들의 job을 관리해야 하니까 JobTracker가 가지는 overhead는 클 수 밖에 없다. 수많은 task에서 state정보를 메모리에서 계속 관리하고 있기 때문에 너무 자주업데이트 된다. 회사로 예를 들면 부서별로 JobTracker가 따로돌면 관리부담도 줄어들고 좋다. 이런 부담을 도입해서 Hadoop2에서는 yarn이라는 운영체제가 나오면서 apllication master라는 대체개념이 나온다.
MapReduce job processing
- Client apllication이 MapReduce job을 JobTracker한테 제출한다.
- 그 이후 JobTracker가 최대한 가깝게 스케쥴링을 해야하니까 기본적으로 input data가 있는 위치를 물어보기 위해 namenode랑 컨택한다. (JobTracker는 프로세싱만 하고 데이터관리는 안한다)
- namenode로 부터 받아서 그 노드들 중에 최대한 input data랑 가깝게 있는 tasktracker node를 찾는데 (가용한 slot이 있는 경우)
- TaskTracker를 찾으면 TaskTracker node들 한테 일을준다.
- TaskTracker node들이 기본적으로 모니터링된다. Data node와 namenode처럼 heartbeat을 주고받는다. (분산시스템에서 서로의 생사를 확인할 수 있는 방법은 구글처럼 master가 ping을 하던지, 하둡에서 slave가 주기적으로 heartbeat을 보내주던지 이 두개밖에 없다) 만약에 heartbeat을 받지 않으면 다른 그 작업을 다른 tasktracker한테 스케쥴링 하게 된다.
- 자기가 실행하고 있는 task가 에러가 나면서 죽으면 JobTracker에게 알려준다. Jobtracker가 fail난 task를 다시 다른곳에 resubmission 한다. 이것은 혹시라도 그 tasktracker가 실행되는 node의 환경이 맞지 않아서 task가 죽은 것일수도 있으니까 다른곳에서도 해보는 것이다.(계속 죽으면 블랙리스트에 넣는다)
- 전체적인 작업이 끝났다. 이 얘기는 MapReduce작업이 끝났다는 얘기이다.
- 그러면 client apllication이 JobTracker한테 관련된 정보를 가져올 수 있게되는 형태로 진행된다.
MapReduce의 progress는 기본적으로 JobTracker가 담당한다고 보면 된다.
TaskTracker
TaskTracker라고 하는 것은 기본적으로 map, reduce, shuffling과정이 있었는데 이러한 task들을 jobtracker한테 받아서 실행하는 역할을 수행한다. 그런데 shuffling task라는 것은 reduce task가 remote procedure call을 통해서 map task가 실행됐던 그 node의 데이터를 복사해 가는거니까 조금 다른데 map과 reduce같은 경우에는 slot이 정해져 있다. 각각의 TaskTracker 같은 경우는 slot이 정적으로 설정되어 있다. 동적으로 실행시간이 변하는게 아니다. 각각의 TaskTracker가 받아들일 수 있는 mapper 또는 reduce의 개수가 사전에 구성이 되어있다 라고 이해하면된다. Static configuration의 문제는 뭐냐면 결과적으로 Hadoop cluster가 homogenous하다고 가정하는 것이다. node의 하드웨어 스펙이 비슷하니까 각자 균등하게 분할해서 똑같은 숫자의 Mapper와 똑같은 숫자의 reduce를 처리하면 되겠네 라고 하는 naive한 방식으로 시작한 것이다. hadoop이 탄생할 때 이런 모습이고 목적이었는데, 시간이 지날수록 하드웨어 자원이 발전하니까 각각의 노드의 컴퓨팅 파워가 달라지기 시작하고, 문제가 뭐냐면 hadoop 생태계가 커지면서 MapReduce말고도 다른 형태의 프레임워크가 들어오기 시작했다. 이런것들 때문에 고정적인 slot기반의 자원관리가 좋지 않다. (slot의 단점)
각각의 TaskTracker는 separate JVM process를 낳아서(spawn해서) 실제 일(actual work)을 한다. Separate JVM을 launch하는 이유는 map이나 reduce task가 실행되다가 죽더라도 tasktacker자체는 별로 영향을 미치지 않게 하기 위한 것이다.
그래서 spawning된 프로세서를 TaskTracker는 꾸준히 모니터링 하면서 output과 exit코드를 캡쳐 해서 jobtracker한테 보고하게 된다.
프로세서가 끝나게 되면은(성공적으로 끝났던, 중간에 에러나서 죽었던) 이 결과를 기본적으로 jobtracker한테 보고(notification)하고 다음 지침을 받는다. 그리고 중요한 것은 각각의 TaskTracker들은 jobtracker한테 주기적으로 heartbeat message를 보낸다. (TaskTracker가 살아있음을 계속 알림)
각각의 TaskTracker가 주기적으로 heartbeat을 보내면서 현재 가용한 slot의 개수를 알려준다. 그러면 jobtracker는 이 cluster를 구성하는 전체 TaskTracker의 가용한 slot들을 다 알고 있는 것이된다. 그러면 새로운 MapReduce job이 들어와서 task assign을 할 때 그 slot들을 보면서 결정하는 것이다. 고려해야 할 것이 두 가지이다.
첫 번째는 task를 실행하려면 slot이 있어야 한다. 두 번째는 data locality를 고려해야 한다. jobtracker는 최대한 data에 가깝게 보내고 싶어한다. 그래야 전체적인 cluster 활용율이 올라가기 때문이다.
'서버 > 클라우드 컴퓨팅' 카테고리의 다른 글
Hypervisor vs Container (1) | 2024.09.05 |
---|---|
MapReduce: Fault Tolerance, Locality, Large-Scale Indexing (0) | 2021.12.16 |
MapReduce: Programming Model (0) | 2021.12.16 |
MapReduce (0) | 2021.12.16 |
빅데이터 Parallelization의 문제점 (0) | 2021.11.09 |