Yeon's 개발블로그

지식을 전파하는 개발자가 되고싶습니다.

Cloud Computing

15. MapReduce Program의 구조

Dev.yeon 2020. 12. 9. 18:50

1. MapReduce 프로그램의 구조

public class ExampleClass{ 

//inner class로, 부모클래스의 인스턴스가 없어도 인스턴스 생성이 가능하도록 static으로 정의

	public static vlass MyMapper extends Mapper<K1,V1,K2,V2>{
    	K2 k2= new K2();
        V2 v2=new V2();
        public void map (K1 key, V1 value, Context context){
        	//body
            context.write(k2,v2);
        }
    }
    
    public static class MyReducer extends Reducer<K2,V2,K3,V3>{
    	K3 k3=new K3();
        V3 v3=new V3();
        public void reduce(K2 key, Iterable<V2> values, Context context){
        	//body
            context.write(k3,v3);
        }
    }
    public static void main(String[] args)throws Exception{
    	Configuration conf=new Configuration(); //실행을 위해 job을 framwork에 설명해주는 인터페이스
    	Job job=Job.getInstance(conf,"job name"); //conf를 가지고 job instance생성
        //job instance를 사용하여 각종 초기화 작업수행
        //map, reduce클래스 지정
        //입력파일의 위치지정
        //출력파일이 저장될 위치지정
        //입력, 출력 포맷 지정
        //맵의 출력 key,value 타입지정
        //최종 출력의 key, value 타입지정(다른경우에만)
        //job 실행
    }





}

1) MyMapper 클래스: Mapper 클래스를 상속

Mapper 클래스에 map()메소드가 있으며, 입력(key,value)를 받아 출력(key,value)를 수행한다. K1,V1은 입력이고, K2,V2는 출력이다. Mapper클래스에는 setup(), map(), cleanup(),run()에 대한 메소드가 있다. map()메소드는 하나의 입력쌍에 대해 항상 하나의 출력쌍을 생성하는 것은 아니다. 

 

2)MyReducer 클래스: Reducer 클래스를 상속

Reducer 클래스에 reduce()메소드가 있으며, K2, V2는 입력이고, K3,V3는 출력이다. Reducer클래스에는 setup(), reduce(), cleanup(), run()에 대한 메소드가 있다. map()의 출력 key, value와 reduce()의 입력 key,value의 데이터형은 항상 일치해야한다. reduce()메소드는 항상 하나의 입력쌍에 대해 하나의 출력쌍을 생성하는 것은 아니다. 

 

2. 예제코드: WordCount 프로그램

1) package, import 부분

package wordcount;
import java.io.IOExceptio;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration; //job과 framwork사이 인터페이스
import org.apache.hadoop.fs.Path; //입출력 파일의 path지정
import org.apache.hadoop.io.IntWritable; //MapReduce가 사용하는 데이터타입
import org.apache.hadoop.io.Text; //MapReduce가 사용하는 데이터타입
import org.apache.hadoop.mapreduce.Job; //job클래스
import org.apache.hadoop.mapreduce.Mapper; //mapper클래스
import org.apache.hadoop.mapreduce.Reducer; //Reduce클래스
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; //입력데이터 형식,위치
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //출력데이터 형식,위치

 

2) TekenizerMapper 클래스

public class WordCount{
	public static class TokenizerMapper extends Mapper<object, Text, Text,IntWritable>{
    	private final static IntWritable one=new IntWritable(1);//통신이 가능한 정수 
        private Text word=new Text();//통신이 가능한 string
        
        public void map(Object key, Text value, Context context) throws IOException , InterruptedException{
        	StringTokenizer itr=new StringTokenizer(value.toString()); //text to string
            while(itr.hasMoreToekens()){
            	word.set(itr.nectToken()); //word에 string을 text로 변환하여저장
                context.write(word,one); //map()의 출력 (key,value)쌍은 통신가능타입
        }
    }
}

TokenizerMapper

*하둡 제공 기본데이터타입

타입 설명
Text String에 해당하고, toString()을 사용하여 String형식으로 바꿀 수 있다.
IntWritable int에 해당하고, get()을 호출하여 int로 바꿀 수 있다.
LongWritable long에 해당하고, get()을 호출하여 long으로 바꿀 수 있다.
FloatWritable float에 해당하고, get()을 호출하여 float으로 바꿀 수 있다.
BooleanWritable boolean에 해당하고, get()을 호출하여 boolean으로 바꿀 수 있다. 
ArrayWritable 동일한 형의 여러값을 배열처럼 저장한다.
NullWritable null에 해당한다.

다시 Writable한 형태로 바꾸고 싶다면 set(변수이름)을 호출한다. 

 

3) IntSumReducer 클래스

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
	private IntWritable result=new IntWritable();
    
    public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException{
    	int sum=0;
        for(IntWritable val:values){
        	sum+=val.get(); //int형으로 변환하여 계산
        }
        result.set(sum); //result에 sum을 IntWritable로 변환하여 저장
        context.write(key,result); //출력파일에 저장 
    }
}

IntSumReducer

map()함수의 출력 타입과 같은 형으로 입력을 받아야한다. 

 

4) main()에서의 job세팅

public static void main(String[] args)throws Exception{
	Configuration conf=new Configuration(); //job을 framework에 설명해주는 인터페이스
    Job job=Job.getInstance9conf,"word count"); //comf를 가지고 job instance생성
    job.setJarByClass(WordCount.class); //slavenode에 jar파일을 보내 실행시킬때 찾을수있게
    job.setMapperClass(TokenizerMapper.class); //mapper클래스 지정
    job.setCombinerClass(IntSumReducer.class); //combiner클래스지정(맵이 실행되는 노드에서)
    job.setReducerClass(IntSumReducer.class); //reducer클래스지정
    job.setOutputKeyClass(Text.class); //output key타입지정
    job.setOutputValueClass(IntWritable.class); //output value타입지정
    FileInputFormat.addInputPath(job,new Path(args[0])); //입력파일이 있는 폴더지정
    FileOutputFormat.setOutputPath(job, new Path(args[1])); //출력파일을 저장할 폴더지정
    System.exit(job.waitForCompletion(true)?0:1); //job실행 (성공적이면 0)
}

-Combiner의 역할: map함수가 출력한 key, value쌍들을 대상으로 slave node내에서 실행된다. 그 후에, shuffle& sort가 진행된다.