Yeon's 개발블로그

지식을 전파하는 개발자가 되고싶습니다.

Cloud Computing

16. MapReduce Framework

Dev.yeon 2020. 12. 9. 23:51

++Mapper Class++

map 실행과정

1. map method

1-1) void run(Mapper.Context context): mapper클래스의 전체 구동함수에 해당하며 이 메소드를 override할일은 거의없다. MapReduce 프레임워크에서 Mapper클래스의 객체를 만들고 주어진 입력파일을 레코드의 집합으로 만든다음, 각 레코드들을 Mapper클래스 객체의 run메소드의 입력으로 넣어준다. 

void run(Mapper.Context context) throws IOException, InterruptedException{
	setup(context);
    while(context.nextkeyValue()){
    	map(context.getCurrentkey(),context.getCurrentValue(),context);
    }
    cleanup(context);
}

 

1-2) protected void setup(Mapper.Context context): map 테스크가 시작될 때 단 한번 실행된다. map메소드에서 필요한 리소스를 할당하거나 map에서 필요한 선행작업을 여기에서 할 수 있다. default는 아무 일도 하지 않는 것으로 되어있으며, 이를 override해서 쓸 수 있다.

1-3) protected void cleanup(Mapper.Context context): map함수의 호출이 완료되면, 마지막으로 한번 호출된다. setup에서 할당된 리소스가 있다면 여기에서 해제하고, default는 아무일도 하지 않는 것으로 되어있으며, 이를 override해서 쓸 수 있다. 응용에 따라 map()이 key,value 쌍을 출력하도록 하지 않고, setup()에서 리소스를 할당하고, map()에서 그 리소스를 사용하여 정보를 쌓아놓은 다음에, cleanup()에서 총체적인 처리를 하도록 할 수도있다. 

 

2. map 입력

2-1) TextInputFormat: 기본적인 입력 포맷으로, 텍스트 라인 하나가 하나의 레코드가 된다. FileInputFormat 클래스를 상속하고, 텍스트파일이나 .gz로 압축된것도 처리가능하다. 

key=해당라인의 파일 오프셋(타입: LongWritable)

value=해당라인전체( 타입: Text)

2-2) KeyValueTextInputFormat: 한줄에 key와 value가 함께 있는 포멧으로, tab과 같은 분리자로 구분한다. 분리자 설정은 key.value.separator.in.input.line의 프로퍼티 값을 다른 분리자로 변경하여 쓸 수 있다. key와 value타입은 모두 Text이다. 

2-3) SequenceFileInputFormat: Hadoop고유의 파일 포멧으로 바이너리 파일이고, 내부 작업시 사용하는 포멧이다. FileInputFormat 클래스를 상속하고, key와 value는 어떠한 타입도 가능하지만 앞서 파일이 생성될 때 사용한 타입을 사용해야한다.  MapFile을 읽을 때도 사용할 수 있다. 

*SequenceFile: 하둡 고유의 바이너리 key, value 기반의 압축지원 파일포맷이다. 하둡에서 여러 job을 chaining하여 작업할 때 사용하는 입력포멧이다. 압축을 포멧일부로 지원하고, 큰 파일을 쪼개는 것이 가능하고, 바이너리 key, value를 지원하고, 효율적인 직렬화 프레임워크를 사용할 수도 있다. 

-> 만일 TextInputFormat이 아닌 다른 입력 포멧을 사용하려면 job.setInputFormatClass(KeyValueTextInputFormat.class)메소드를 호출하여 변경해주어야한다. 입력파일들이 여러 폴더에 분산되어 저장되어 있다면 FileInputFormat.addInputPath(job,new Path(args[0])); 메소드를 여러번 호출하거나, addInputPaths를 호출해서 다수의 입력파일 경로를 한번에 지정할 수도 있다. 다만, SequenceFileInputFormat의 경우에는 다른방법을 사용한다.  

 

3. map 출력

-맵의 출력 레코드들의 타입이 전체 하둡 job의 출력타입들(reducer의 출력타입)과 다르다면 setMapOutputKeyClass(Text.class), job.setMapOutputValueClass(IntWritable.class) 메소드를 호출하여 프레임워크에 알려주어야한다. 

-만약 맵출력만 필요하고 reduce는 아예필요없다면 reduce의 테스크 수를 0으로 지정할 수 있다. 이경우에는 정해놓은 하둡 job의 출력 디렉토리에 맵의 출력물들이 바로 저장된다. 출력파일의 이름은 리듀스한 경우에는 part-r- 이지만, 리듀스가 없는 경우에는 part-m- 으로 시작한다. part-m 파일의 수는 맵 태스크의 수와 동일하다. 

-Identity Mapper: 맵이 필요없는 경우에 사용하는 것으로, 주어진 입력레코드를 그대로 출력 레코드에 내보낸다. 이 경우에는 별도로 맵클래스를 구현할 필요없이 base클래스인 Mapper 클래스 job.setMapperClass(Mapper.class)를 그대로 이용하면된다. 

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, IntteruptedException{
context.write((KEYOUT) key, (VALUEOUT) value);
}

 

4. MapReduce에서 사용되는 변수타입

어떤 타입이 map이나 reduce에서 key로 사용되기 위해서는 WritableComparable이라는 인터페이스를 구현해야하고, value로 사용되기 위해서는 Writable이란 인터페이스를 구현해야한다. WritableComparable은 Writable인터페이스를 포함한다. Serialization(직렬화)는 어떤 객체나 자료구조가 네트워크를 타고 전송되거나 디스크에 저장될 수 있도록 변환해주는 과정이고, Deserialization(역직렬화)는 직렬화되어 전송된 데이터를 다시 원래 자료구조로 복구하는 과정을 말한다.

-Writable 인터페이스: 직렬화, 역직렬화를 구현하는데 사용되는 메소드들을 가지고있다. 하둡의 특성상 key, value레코드가 디스크에 저장되거나 네트워크를 타고 전달되어야 하기 때문에 이런 인터페이스가 필요하다. 하둡은 RPC을 이용해서 클러스터 내의 노드들 간에 통신을 하는데, 이때에도 Writable 인터페이스를 사용한다. Writable 인터페이스가 가지고있는 메소드는 write(직렬화할때 호출)과 readFields(역직렬화할때 호출)이 있다.

-WritableComparable 인터페이스: Writable에서 제공되는 메소드들을 포함하고, 객체들간의 비교를 해주는 자바의 Comparable인터페이스가 추가된 인터페이스이다. key들은 sorting이 가능해야하므로 이러한 인터페이스가 필요하다. WritableComparable인터페이스에는 compareTo라는 메소드 하나가 존재하며, 이는 지금 객체와 인자로 들어온 객체를 비교하여 둘 사이의 순서를 정해주는 역할을 한다.

 

5. map task수의 결정방식 (일반파일)

맵테스크는 입력파일의 개수보다 작을 수는 없다. 작은 크기의 입력파일이 너무 많으면 맴 테스크의 수도 많아져서 실행효율이 떨어진다. 입력파일이 크면 데이터 블록(128~256MB)로 나뉘고, 맵 테스크는 블록마다 1개씩 할당된다. 압축되는 등 분할이 어려운 경우에는 블록 수에 무관하게 전체 파일에 맵 테스크 하나가 지정된다. InputFormat 클래스에 isSplitable 메소드를 사용하여 블록 단위로 나눌 수 있는 것인지 없는 것인지 지정해 줄수있다. 

 

word count example

++ Combiner ++

컴바이너는 mini reducer또는 local reducer라고 불린다. 맵테스크의 출력에 reduce 코드를 먼저 적용하여 reduce로 데이터가 이동하기 전에 데이터의 크기와 통신량을 줄인다. 그렇다고 모든 job에 컴바이너를 적용할 수 있는 것은 아니다. 교환법칙과 결합법칙이 만족되는 job에 컴바이너를 적용할 수 있다. (예제: 합, 최솟값, 최댓값) 평균값을 구하는 일에는 컴바이너를 쓸 수 없다. job.setCombinerClass(MyReducer.class)를 그대로 써서 활용하는게 좋다. 

 

++ Shuffling & Sorting ++

Shuffling & Sorting

reduce 테스크의 개수는 job클래스의 numReduceTasks메소드로 지정한다. Patitioner는 맵 테스크의 결과 레코드의 key값을 해싱해서 그 해싱값을 reduce 테스크의 수로 나누어 그 레코드가 어떤 reduce테스크로 갈지 정해주는 클래스이다. 기본으로 사용되는 클래스는 HashPartitioner이나, 자신만의 파티셔너를 구현한다면 job클래스의 setPartitionerClass 메소드로 지정해주면 된다. 

ex) M개의 맵테스크와 N개의 리듀스 테스크가 있다고 하자. 맵 테스크가 종료된 후, 각 리듀스 테스크는 M개의 맵테스크들과 각각 연결은 맺고 자신에 해당하는 파티션 데이터를 읽어간다. 이를 Shuffling이라고 한다. M*N개의 네트워크 커넥션이 맺어지고, 데이터가 네트워크를 통해 복사된다. 이때 전송되는 데이터의 양이 크다면, 네트워크 대역폭이 병목 지점이 된다. 리듀스 테스크는 모든 맵 테스크에서 데이터를 가져온 후, 이들을 하나로 병합한다. 합병된 데이터에 대해 key를 기준으로 sorting을 한다. sorting이 끝나면 레코드들을 전체적으로 스캔하면서 레코드들을 그룹핑한다. 즉, 같은 키를 갖는 레코드들을 하나로 묶어서 하나의 reduce 입력 레코드를 만든다. 

-맵출력버퍼링: 맵에서 출력된 레코드들은 파티셔너 클래스를 통해 파티션 번호가 배정된다. 이들은 메모리 버퍼에 쓰여졌다가 버퍼가 어느정도 다차면 그 때 디스크 파일로 저장된다. 맵테스크가 종료될때까지 이과정을 반복하고, 종료시에는 디스크로 저장되있었던 파일들을 모두 모아서 하나의 파일로 합병한다. 그 다음에 reduce 테스크들이 각기 자신에게 해당하는 파티션의 데이터를 읽어가게된다. 

-맵출력버퍼링 단계: 맵테스크가 초기화될때 기본 100MB크기의 버퍼가 메모리에 생긴다. 맵에서 출력레코드가 생길때마다 파티셔너 클래스를 통해 그 레코드의 파티션 번호가 정해진다. (파티션번호, KEY, VALUE)정보를 메모리 버퍼에 쓴다. 버퍼가 80%차면 버퍼의 내용을 디스크에 쓰는데, 이때 파일이름은 spill이다. spill이 만들어질때 파티션 번호 순서로 레코드들이 정렬된다. 맵 출력이 더 없을때까지 이 과정을 반복한다. spill의 개수가 너무 많아지지 않도록 spill 파일의 최댓값을 가지고 있으며, spill의 수가 이 수를 넘어가면 디스크기반 merge sort를 한다. 맵출력이 끝나면, 모든 spill파일들과 메모리 버퍼의 남은 것들을 모두 하나의 디스크 파일로 merge sort한다. 컴바이너는 spill이 발생할때, 또는 spill간의 합병이 될때 여러번 실행된다. 

 

++ Reducer Class ++

public void run(Context context) throws IOException, InterruptedException{
	setup(context);
    while(context.nextkeyValue()){
    	reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
}

-reduce task: 리듀스 테스크 개수는 프레임워크에서 정하지 않고 사용자가 준값을 그대로 사용한다. job클래스의 setNumReduceTasks 메소드를 호출하여 그 숫자를 결정하면된다. 호출이 없다면 default는 1이다. 맵 테스크 출력 레코드들이 shuffling &sorting을 거쳐 리듀스 테스크의 입력이된다. 리듀스의 출력레코드는 HDFS상에 저장된다. 출력포맷을 정할 수 있으며 기본적으로는 TextOutputFormat이다. 출력파일들의 위치지정은 FileOutputFormat클래스의 setOutputPath메소드를 통해 가능하다. reduce메소드의 인자인 value 리스트(Iterative <ValueType>)는 한번만 스캔이 가능하다. 입력레코드들이 메모리에 저장되기에는 너무 클 수 있기 때문이다. 

-출력포멧: job.setOutputFormatClass 메소드로 지정한다. 

TextOutputFormat: 텍스트 파일생성, 출력레코드가 하나의 라인으로 key와 value사이에 TAB문자, setCompressOutput, setOutputCompressorClass를 사용하여 압축이 가능하다.

SequenceFileOutputFormat: 시퀀스 파일포멧으로 출력, 여러 하둡 job을 이어서 실행할 경우 사용해야한다. SequenceFileOutputFormat 클래스의 setOutputCompressionType메소드를 통해 압축방식을 지정한다. (BLOCK: 블록내의 레코드들을 함께 압축, NONE:압축하지 않음, RECORD: 레코드별로압축)

MapFileOutputFormat: 출력을 앱 파일 형태로 만들어주는 출력포멧, 이런파일은 SequenceFileInputFormat으로 읽을 수 있다. 맵 출력이 임시 저장되는 파일들은 사실 맵 파일 형태로 유지된다. 

-Identity Reducer: job.setReducerClass(Reducer.class);를 사용하여 입력레코드를 그대로 출력한다.

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException{
	for (VALUEIN value: values){
		context.write(KEYOUT) key, (VALUEOUT)value);
	}
}

맵테스크가 출력한 것을 그대로 출력하는 셈이지만, 셔플링과 소팅때문에 맵테스크들에서의 원래 출력과 순서가 달라진다. 주로 이미지 변환이나 문서 분류기 등에 사용된다. 단순한 작업을 여러 서버에서 동시에 실행하고 싶을 때 사용된다. 리듀스 테스크의 수를 0으로 하는 경우에는 맵테스크의 출력이 그대로 job 전체의 최종출력이 되고, 최종 출력물의 개수는 맵테스크의 개수와 같다. 이경우에 파일이름은 part-m-으로 시작한다. 하지만 identity reducer를 쓰는 경우에는 레코드 순서가 달라지고 최종 출력물의 개수는 리듀스 테스크의 개수가 된다. 이 경우에 파일이름은 part-r-로 시작한다.