Mapreduce初析
Mapreduce是一個(gè)計(jì)算框架,既然是做計(jì)算的框架,那么表現(xiàn)形式就是有個(gè)輸入(input),mapreduce操作這個(gè)輸入(input),通過(guò)本身定義好的計(jì)算模型,得到一個(gè)輸出(output),這個(gè)輸出就是我們所需要的結(jié)果。
我們要學(xué)習(xí)的就是這個(gè)計(jì)算模型的運(yùn)行規(guī)則。在運(yùn)行一個(gè)mapreduce計(jì)算任務(wù)時(shí)候,任務(wù)過(guò)程被分為兩個(gè)階段:map階段和reduce階段,每個(gè)階段都是用鍵值對(duì)(key/value)作為輸入(input)和輸出(output)。而程序員要做的就是定義好這兩個(gè)階段的函數(shù):map函數(shù)和reduce函數(shù)。
mapreduce編程實(shí)例
1、數(shù)據(jù)去重
“數(shù)據(jù)去重”主要是為了掌握和利用并行化思想來(lái)對(duì)數(shù)據(jù)進(jìn)行有意義的篩選。統(tǒng)計(jì)大數(shù)據(jù)集上的數(shù)據(jù)種類個(gè)數(shù)、從網(wǎng)站日志中計(jì)算訪問(wèn)地等這些看似龐雜的任務(wù)都會(huì)涉及數(shù)據(jù)去重。下面就進(jìn)入這個(gè)實(shí)例的MapReduce程序設(shè)計(jì)。
1.1 實(shí)例描述
對(duì)數(shù)據(jù)文件中的數(shù)據(jù)進(jìn)行去重。數(shù)據(jù)文件中的每行都是一個(gè)數(shù)據(jù)。
樣例輸入如下所示:
1)file1:
?
2)file2:
?
樣例輸出如下所示:
?
1.2 設(shè)計(jì)思路
數(shù)據(jù)去重的最終目標(biāo)是讓原始數(shù)據(jù)中出現(xiàn)次數(shù)超過(guò)一次的數(shù)據(jù)在輸出文件中只出現(xiàn)一次。我們自然而然會(huì)想到將同一個(gè)數(shù)據(jù)的所有記錄都交給一臺(tái)reduce機(jī)器,無(wú)論這個(gè)數(shù)據(jù)出現(xiàn)多少次,只要在最終結(jié)果中輸出一次就可以了。具體就是reduce的輸入應(yīng)該以數(shù)據(jù)作為key,而對(duì)value-list則沒(méi)有要求。當(dāng)reduce接收到一個(gè)《key,value-list》時(shí)就直接將key復(fù)制到輸出的key中,并將value設(shè)置成空值。
在MapReduce流程中,map的輸出《key,value》經(jīng)過(guò)shuffle過(guò)程聚集成《key,value- list》后會(huì)交給reduce。所以從設(shè)計(jì)好的reduce輸入可以反推出map的輸出key應(yīng)為數(shù)據(jù),value任意。繼續(xù)反推,map輸出數(shù) 據(jù)的key為數(shù)據(jù),而在這個(gè)實(shí)例中每個(gè)數(shù)據(jù)代表輸入文件中的一行內(nèi)容,所以map階段要完成的任務(wù)就是在采用Hadoop默認(rèn)的作業(yè)輸入方式之后,將 value設(shè)置為key,并直接輸出(輸出中的value任意)。map中的結(jié)果經(jīng)過(guò)shuffle過(guò)程之后交給reduce。reduce階段不會(huì)管每 個(gè)key有多少個(gè)value,它直接將輸入的key復(fù)制為輸出的key,并輸出就可以了(輸出中的value被設(shè)置成空了)。
1.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Dedup {
//map將輸入中的value復(fù)制到輸出數(shù)據(jù)的key上,并直接輸出
public static class Map extends Mapper《Object,Text,Text,Text》{
private static Text line=new Text();//每行數(shù)據(jù)
//實(shí)現(xiàn)map函數(shù)
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
line=value;
context.write(line, new Text(“”));
}
}
//reduce將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上,并直接輸出
public static class Reduce extends Reducer《Text,Text,Text,Text》{
//實(shí)現(xiàn)reduce函數(shù)
public void reduce(Text key,Iterable《Text》 values,Context context)
throws IOException,InterruptedException{
context.write(key, new Text(“”));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關(guān)鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs=new String[]{“dedup_in”,“dedup_out”};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Data Deduplication 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Data Deduplication”);
job.setJarByClass(Dedup.class);
//設(shè)置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
//設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//設(shè)置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} }
1.4 代碼結(jié)果
1)準(zhǔn)備測(cè)試數(shù)據(jù)
通過(guò)Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“dedup_in”文件夾(備注:“dedup_out”不需要?jiǎng)?chuàng)建。)如圖1.4-1所示,已經(jīng)成功創(chuàng)建。
?
然后在本地建立兩個(gè)txt文件,通過(guò)Eclipse上傳到“/user/hadoop/dedup_in”文件夾中,兩個(gè)txt文件的內(nèi)容如“實(shí)例描述”那兩個(gè)文件一樣。如圖1.4-2所示,成功上傳之后。
從SecureCRT遠(yuǎn)處查看“Master.Hadoop”的也能證實(shí)我們上傳的兩個(gè)文件。
?
查看兩個(gè)文件的內(nèi)容如圖1.4-3所示:
?
2)查看運(yùn)行結(jié)果
這時(shí)我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進(jìn)行刷新,這時(shí)會(huì)發(fā)現(xiàn)多出一個(gè)“dedup_out”文件夾,且里面有3個(gè)文件,然后打開(kāi)雙 其“part-r-00000”文件,會(huì)在Eclipse中間把內(nèi)容顯示出來(lái)。如圖1.4-4所示。
?
此時(shí),你可以對(duì)比一下和我們之前預(yù)期的結(jié)果是否一致。
#e#
2、數(shù)據(jù)排序
“數(shù)據(jù)排序”是許多實(shí)際任務(wù)執(zhí)行時(shí)要完成的第一項(xiàng)工作,比如學(xué)生成績(jī)?cè)u(píng)比、數(shù)據(jù)建立索引等。這個(gè)實(shí)例和數(shù)據(jù)去重類似,都是先對(duì)原始數(shù)據(jù)進(jìn)行初步處理,為進(jìn)一步的數(shù)據(jù)操作打好基礎(chǔ)。下面進(jìn)入這個(gè)示例。
2.1 實(shí)例描述
對(duì)輸入文件中數(shù)據(jù)進(jìn)行排序。輸入文件中的每行內(nèi)容均為一個(gè)數(shù)字,即一個(gè)數(shù)據(jù)。要求在輸出中每行有兩個(gè)間隔的數(shù)字,其中,第一個(gè)代表原始數(shù)據(jù)在原始數(shù)據(jù)集中的位次,第二個(gè)代表原始數(shù)據(jù)。
樣例輸入:
1)file1:
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
樣例輸出:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
2.2 設(shè)計(jì)思路
這個(gè)實(shí)例僅僅要求對(duì)輸入數(shù)據(jù)進(jìn)行排序,熟悉MapReduce過(guò)程的讀者會(huì)很快想到在MapReduce過(guò)程中就有排序,是否可以利用這個(gè)默認(rèn)的排序,而不需要自己再實(shí)現(xiàn)具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的默認(rèn)排序規(guī)則。它是按照key值進(jìn)行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數(shù)字大小對(duì)key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序?qū)ψ址判颉?/p>
了解了這個(gè)細(xì)節(jié),我們就知道應(yīng)該使用封裝int的IntWritable型數(shù)據(jù)結(jié)構(gòu)了。也就是在map中將讀入的數(shù)據(jù)轉(zhuǎn)化成 IntWritable型,然后作為key值輸出(value任意)。reduce拿到《key,value-list》之后,將輸入的 key作為value輸出,并根據(jù)value-list中元素的個(gè)數(shù)決定輸出的次數(shù)。輸出的key(即代碼中的linenum)是一個(gè)全局變量,它統(tǒng)計(jì)當(dāng)前key的位次。需要注意的是這個(gè)程序中沒(méi)有配置Combiner,也就是在MapReduce過(guò)程中不使用Combiner。這主要是因?yàn)槭褂胢ap和reduce就已經(jīng)能夠完成任務(wù)了。
2.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
//map將輸入中的value化成IntWritable類型,作為輸出的key
public static class Map extends
Mapper《Object,Text,IntWritable,IntWritable》{
private static IntWritable data=new IntWritable();
//實(shí)現(xiàn)map函數(shù)
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
String line=value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
//reduce將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上,
//然后根據(jù)輸入的value-list中元素的個(gè)數(shù)決定key的輸出次數(shù)
//用全局linenum來(lái)代表key的位次
public static class Reduce extends
Reducer《IntWritable,IntWritable,IntWritable,IntWritable》{
private static IntWritable linenum = new IntWritable(1);
//實(shí)現(xiàn)reduce函數(shù)
public void reduce(IntWritable key,Iterable《IntWritable》 values,Context context)
throws IOException,InterruptedException{
for(IntWritable val:values){
context.write(linenum, key);
linenum = new IntWritable(linenum.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關(guān)鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs=new String[]{“sort_in”,“sort_out”};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Data Sort 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Data Sort”);
job.setJarByClass(Sort.class);
//設(shè)置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設(shè)置輸出類型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//設(shè)置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.4 代碼結(jié)果
1)準(zhǔn)備測(cè)試數(shù)據(jù)
通過(guò)Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“sort_in”文件夾(備注:“sort_out”不需要?jiǎng)?chuàng)建。)如圖2.4-1所示,已經(jīng)成功創(chuàng)建。
?
然后在本地建立三個(gè)txt文件,通過(guò)Eclipse上傳到“/user/hadoop/sort_in”文件夾中,三個(gè)txt文件的內(nèi)容如“實(shí)例描述”那三個(gè)文件一樣。如圖2.4-2所示,成功上傳之后。
從SecureCRT遠(yuǎn)處查看“Master.Hadoop”的也能證實(shí)我們上傳的三個(gè)文件。
?
查看兩個(gè)文件的內(nèi)容如圖2.4-3所示:
?
2)查看運(yùn)行結(jié)果
這時(shí)我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進(jìn)行刷新,這時(shí)會(huì)發(fā)現(xiàn)多出一個(gè)“sort_out”文件夾,且里面有3個(gè)文件,然后打開(kāi)雙 其“part-r-00000”文件,會(huì)在Eclipse中間把內(nèi)容顯示出來(lái)。如圖2.4-4所示。
?
#e#
3、平均成績(jī)
“平均成績(jī)”主要目的還是在重溫經(jīng)典“WordCount”例子,可以說(shuō)是在基礎(chǔ)上的微變化版,該實(shí)例主要就是實(shí)現(xiàn)一個(gè)計(jì)算學(xué)生平均成績(jī)的例子。
3.1 實(shí)例描述
對(duì)輸入文件中數(shù)據(jù)進(jìn)行就算學(xué)生平均成績(jī)。輸入文件中的每行內(nèi)容均為一個(gè)學(xué)生的姓名和他相應(yīng)的成績(jī),如果有多門學(xué)科,則每門學(xué)科為一個(gè)文件。要求在輸出中每行有兩個(gè)間隔的數(shù)據(jù),其中,第一個(gè)代表學(xué)生的姓名,第二個(gè)代表其平均成績(jī)。
樣本輸入:
1)math:
張三 88
李四 99
王五 66
趙六 77
2)china:
張三 78
李四 89
王五 96
趙六 67
3)english:
張三 80
李四 82
王五 84
趙六 86
樣本輸出:
張三 82
李四 90
王五 82
趙六 76
3.2 設(shè)計(jì)思路
計(jì)算學(xué)生平均成績(jī)是一個(gè)仿“WordCount”例子,用來(lái)重溫一下開(kāi)發(fā)MapReduce程序的流程。程序包括兩部分的內(nèi)容:Map部分和Reduce部分,分別實(shí)現(xiàn)了map和reduce的功能。
Map處理的是一個(gè)純文本文件, 文件中存放的數(shù)據(jù)時(shí)每一行表示一個(gè)學(xué)生的姓名和他相應(yīng)一科成績(jī)。Mapper處理的數(shù)據(jù)是由InputFormat分解過(guò)的數(shù)據(jù)集,其中 InputFormat的作用是將數(shù)據(jù)集切割成小數(shù)據(jù)集InputSplit,每一個(gè)InputSlit將由一個(gè)Mapper負(fù)責(zé)處理。此 外,InputFormat中還提供了一個(gè)RecordReader的實(shí)現(xiàn),并將一個(gè)InputSplit解析成《key,value》對(duì)提 供給了map函數(shù)。InputFormat的默認(rèn)值是TextInputFormat,它針對(duì)文本文件,按行將文本切割成InputSlit,并用 LineRecordReader將InputSplit解析成《key,value》對(duì),key是行在文本中的位置,value是文件中的 一行。
Map的結(jié)果會(huì)通過(guò)partion分發(fā)到Reducer,Reducer做完Reduce操作后,將通過(guò)以格式OutputFormat輸出。
Mapper最終處理的結(jié)果對(duì)《key,value》,會(huì)送到Reducer中進(jìn)行合并,合并的時(shí)候,有相同key的鍵/值對(duì)則送到同一個(gè) Reducer上。Reducer是所有用戶定制Reducer類地基礎(chǔ),它的輸入是key和這個(gè)key對(duì)應(yīng)的所有value的一個(gè)迭代器,同時(shí)還有 Reducer的上下文。Reduce的結(jié)果由Reducer.Context的write方法輸出到文件中。
3.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Score {
public static class Map extends
Mapper《LongWritable, Text, Text, IntWritable》 {
// 實(shí)現(xiàn)map函數(shù)
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 將輸入的純文本文件的數(shù)據(jù)轉(zhuǎn)化成String
String line = value.toString();
// 將輸入的數(shù)據(jù)首先按行進(jìn)行分割
StringTokenizer tokenizerArticle = new StringTokenizer(line, “\n”);
// 分別對(duì)每一行進(jìn)行處理
while (tokenizerArticle.hasMoreElements()) {
// 每行按空格劃分
StringTokenizer tokenizerLine = newStringTokenizer(tokenizerArticle.nextToken());
String strName = tokenizerLine.nextToken();// 學(xué)生姓名部分
String strScore = tokenizerLine.nextToken();// 成績(jī)部分
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
// 輸出姓名和成績(jī)
context.write(name, new IntWritable(scoreInt));
}
}
}
public static class Reduce extends
Reducer《Text, IntWritable, Text, IntWritable》 {
// 實(shí)現(xiàn)reduce函數(shù)
public void reduce(Text key, Iterable《IntWritable》 values,
Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator《IntWritable》 iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();// 計(jì)算總分
count++;// 統(tǒng)計(jì)總的科目數(shù)
}
int average = (int) sum / count;// 計(jì)算平均成績(jī)
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關(guān)鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs = new String[] { “score_in”, “score_out” };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Score Average 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Score Average”);
job.setJarByClass(Score.class);
// 設(shè)置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// 設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 將輸入的數(shù)據(jù)集分割成小數(shù)據(jù)塊splites,提供一個(gè)RecordReder的實(shí)現(xiàn)
job.setInputFormatClass(TextInputFormat.class);
// 提供一個(gè)RecordWriter的實(shí)現(xiàn),負(fù)責(zé)數(shù)據(jù)輸出
job.setOutputFormatClass(TextOutputFormat.class);
// 設(shè)置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.4 代碼結(jié)果
1)準(zhǔn)備測(cè)試數(shù)據(jù)
通過(guò)Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“score_in”文件夾(備注:“score_out”不需要?jiǎng)?chuàng)建。)如圖3.4-1所示,已經(jīng)成功創(chuàng)建。
?
然后在本地建立三個(gè)txt文件,通過(guò)Eclipse上傳到“/user/hadoop/score_in”文件夾中,三個(gè)txt文件的內(nèi)容如“實(shí)例描述”那三個(gè)文件一樣。如圖3.4-2所示,成功上傳之后。
備注:文本文件的編碼為“UTF-8”,默認(rèn)為“ANSI”,可以另存為時(shí)選擇,不然中文會(huì)出現(xiàn)亂碼。
從SecureCRT遠(yuǎn)處查看“Master.Hadoop”的也能證實(shí)我們上傳的三個(gè)文件。
?
查看三個(gè)文件的內(nèi)容如圖3.4-3所示:
?
2)查看運(yùn)行結(jié)果
這時(shí)我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進(jìn)行刷新,這時(shí)會(huì)發(fā)現(xiàn)多出一個(gè)“score_out”文件夾,且里面有3個(gè)文件,然后打開(kāi)雙 其“part-r-00000”文件,會(huì)在Eclipse中間把內(nèi)容顯示出來(lái)。如圖3.4-4所示。
?
#e#
4、單表關(guān)聯(lián)
前面的實(shí)例都是在數(shù)據(jù)上進(jìn)行一些簡(jiǎn)單的處理,為進(jìn)一步的操作打基礎(chǔ)?!皢伪黻P(guān)聯(lián)”這個(gè)實(shí)例要求從給出的數(shù)據(jù)中尋找所關(guān)心的數(shù)據(jù),它是對(duì)原始數(shù)據(jù)所包含信息的挖掘。下面進(jìn)入這個(gè)實(shí)例。
4.1 實(shí)例描述
實(shí)例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。
樣例輸入如下所示。
file:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
家族樹(shù)狀關(guān)系譜:
樣例輸出如下所示。
file:
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
4.2 設(shè)計(jì)思路
分析這個(gè)實(shí)例,顯然需要進(jìn)行單表連接,連接的是左表的parent列和右表的child列,且左表和右表是同一個(gè)表。
連接結(jié)果中除去連接的兩列就是所需要的結(jié)果——“grandchild--grandparent”表。要用MapReduce解決這個(gè)實(shí)例,首先應(yīng)該考慮如何實(shí)現(xiàn)表的自連接;其次就是連接列的設(shè)置;最后是結(jié)果的整理。
考慮到MapReduce的shuffle過(guò)程會(huì)將相同的key會(huì)連接在一起,所以可以將map結(jié)果的key設(shè)置成待連接的列,然后列中相同的值就自然會(huì)連接在一起了。再與最開(kāi)始的分析聯(lián)系起來(lái):
要連接的是左表的parent列和右表的child列,且左表和右表是同一個(gè)表,所以在map階段將讀入數(shù)據(jù)分割成child和parent之后,會(huì)將parent設(shè)置成key,child設(shè)置成value進(jìn)行輸出,并作為左表;再將同一對(duì)child和parent中的child設(shè)置成key,parent設(shè)置成value進(jìn)行輸出,作為右表。為了區(qū)分輸出中的左右表,需要在輸出的value中再加上左右表的信息,比如在value的String最開(kāi)始處加上字符1表示左表,加上字符2表示右表。這樣在map的結(jié)果中就形成了左表和右表,然后在shuffle過(guò)程中完成連接。reduce接收到連接的結(jié)果,其中每個(gè)key的value-list就包含了“grandchild--grandparent”關(guān)系。取出每個(gè)key的value-list進(jìn)行解析,將左表中的child放入一個(gè)數(shù)組,右表中的parent放入一個(gè)數(shù)組,然后對(duì)兩個(gè)數(shù)組求笛卡爾積就是最后的結(jié)果了。
4.3 程序代碼
程序代碼如下所示。
package com.hebut.mr;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class STjoin {
public static int time = 0;
/*
* map將輸出分割child和parent,然后正序輸出一次作為右表,
* 反序輸出一次作為左表,需要注意的是在輸出的value中必須
* 加上左右表的區(qū)別標(biāo)識(shí)。
*/
public static class Map extends Mapper《Object, Text, Text, Text》 {
// 實(shí)現(xiàn)map函數(shù)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String childname = new String();// 孩子名稱
String parentname = new String();// 父母名稱
String relationtype = new String();// 左右表標(biāo)識(shí)
// 輸入的一行預(yù)處理文本
StringTokenizer itr=new StringTokenizer(value.toString());
String[] values=new String[2];
int i=0;
while(itr.hasMoreTokens()){
values[i]=itr.nextToken();
i++;
}
if (values[0].compareTo(“child”) != 0) {
childname = values[0];
parentname = values[1];
// 輸出左表
relationtype = “1”;
context.write(new Text(values[1]), new Text(relationtype +
“+”+ childname + “+” + parentname));
// 輸出右表
relationtype = “2”;
context.write(new Text(values[0]), new Text(relationtype +
“+”+ childname + “+” + parentname));
}
}
}
public static class Reduce extends Reducer《Text, Text, Text, Text》 {
// 實(shí)現(xiàn)reduce函數(shù)
public void reduce(Text key, Iterable《Text》 values, Context context)
throws IOException, InterruptedException {
// 輸出表頭
if (0 == time) {
context.write(new Text(“grandchild”), new Text(“grandparent”));
time++;
}
int grandchildnum = 0;
String[] grandchild = new String[10];
int grandparentnum = 0;
String[] grandparent = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
// 取得左右表標(biāo)識(shí)
char relationtype = record.charAt(0);
// 定義孩子和父母變量
String childname = new String();
String parentname = new String();
// 獲取value-list中value的child
while (record.charAt(i) != ‘+’) {
childname += record.charAt(i);
i++;
}
i = i + 1;
// 獲取value-list中value的parent
while (i 《 len) {
parentname += record.charAt(i);
i++;
}
// 左表,取出child放入grandchildren
if (‘1’ == relationtype) {
grandchild[grandchildnum] = childname;
grandchildnum++;
}
// 右表,取出parent放入grandparent
if (‘2’ == relationtype) {
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
}
// grandchild和grandparent數(shù)組求笛卡爾兒積
if (0 != grandchildnum && 0 != grandparentnum) {
for (int m = 0; m 《 grandchildnum; m++) {
for (int n = 0; n 《 grandparentnum; n++) {
// 輸出結(jié)果
context.write(new Text(grandchild[m]), newText(grandparent[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關(guān)鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs = new String[] { “STjoin_in”, “STjoin_out” };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Single Table Join 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Single Table Join”);
job.setJarByClass(STjoin.class);
// 設(shè)置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設(shè)置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.4 代碼結(jié)果
1)準(zhǔn)備測(cè)試數(shù)據(jù)
通過(guò)Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“STjoin_in”文件夾(備注:“STjoin_out”不需要?jiǎng)?chuàng)建。)如圖4.4-1所示,已經(jīng)成功創(chuàng)建。
?
然后在本地建立一個(gè)txt文件,通過(guò)Eclipse上傳到“/user/hadoop/STjoin_in”文件夾中,一個(gè)txt文件的內(nèi)容如“實(shí)例描述”那個(gè)文件一樣。如圖4.4-2所示,成功上傳之后。
從SecureCRT遠(yuǎn)處查看“Master.Hadoop”的也能證實(shí)我們上傳的文件,顯示其內(nèi)容如圖4.4-3所示:
?
2)運(yùn)行詳解
?。?)Map處理:
(2)Shuffle處理
在shuffle過(guò)程中完成連接。
?。?)Reduce處理
首先由語(yǔ)句“0 != grandchildnum && 0 != grandparentnum”得知,只要在“value-list”中沒(méi)有左表或者右表,則不會(huì)做處理,可以根據(jù)這條規(guī)則去除無(wú)效的shuffle連接。
然后根據(jù)下面語(yǔ)句進(jìn)一步對(duì)有效的shuffle連接做處理。
// 左表,取出child放入grandchildren
if (‘1’ == relationtype) {
grandchild[grandchildnum] = childname;
grandchildnum++;
}
// 右表,取出parent放入grandparent
if (‘2’ == relationtype) {
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
針對(duì)一條數(shù)據(jù)進(jìn)行分析:
《Jack,1+Tom+Jack,
1+Jone+Jack,
2+Jack+Alice,
2+Jack+Jesse 》
分析結(jié)果:左表用“字符1”表示,右表用“字符2”表示,上面的《key,value-list》中的“key”表示左表與右表的連接鍵。而“value-list”表示以“key”連接的左表與右表的相關(guān)數(shù)據(jù)。
根據(jù)上面針對(duì)左表與右表不同的處理規(guī)則,取得兩個(gè)數(shù)組的數(shù)據(jù)。
然后根據(jù)下面語(yǔ)句進(jìn)行處理。
for (int m = 0; m 《 grandchildnum; m++) {
for (int n = 0; n 《 grandparentnum; n++) {
context.write(new Text(grandchild[m]), new Text(grandparent[n]));
}
}
處理結(jié)果如下面所示:
Tom Jesse
Tom Alice
Jone Jesse
Jone Alice
其他的有效shuffle連接處理都是如此。
3)查看運(yùn)行結(jié)果
這時(shí)我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進(jìn)行刷新,這時(shí)會(huì)發(fā)現(xiàn)多出一個(gè)“STjoin_out”文件夾,且里面有3個(gè)文件,然后打開(kāi)雙 其“part-r-00000”文件,會(huì)在Eclipse中間把內(nèi)容顯示出來(lái)。如圖4.4-4所示。
?
#e#
? 5、多表關(guān)聯(lián)
多表關(guān)聯(lián)和單表關(guān)聯(lián)類似,它也是通過(guò)對(duì)原始數(shù)據(jù)進(jìn)行一定的處理,從其中挖掘出關(guān)心的信息。下面進(jìn)入這個(gè)實(shí)例。
5.1 實(shí)例描述
輸入是兩個(gè)文件,一個(gè)代表工廠表,包含工廠名列和地址編號(hào)列;另一個(gè)代表地址表,包含地址名列和地址編號(hào)列。要求從輸入數(shù)據(jù)中找出工廠名和地址名的對(duì)應(yīng)關(guān)系,輸出“工廠名——地址名”表。
樣例輸入如下所示。
1)factory:
factoryname addressed
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
2)address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
樣例輸出如下所示。
factoryname addressname
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
5.2 設(shè)計(jì)思路
多表關(guān)聯(lián)和單表關(guān)聯(lián)相似,都類似于數(shù)據(jù)庫(kù)中的自然連接。相比單表關(guān)聯(lián),多表關(guān)聯(lián)的左右表和連接列更加清楚。所以可以采用和單表關(guān)聯(lián)的相同的處理方式,map識(shí)別出輸入的行屬于哪個(gè)表之后,對(duì)其進(jìn)行分割,將連接的列值保存在key中,另一列和左右表標(biāo)識(shí)保存在value中,然后輸出。reduce拿到連接結(jié)果之后,解析value內(nèi)容,根據(jù)標(biāo)志將左右表內(nèi)容分開(kāi)存放,然后求笛卡爾積,最后直接輸出。
這個(gè)實(shí)例的具體分析參考單表關(guān)聯(lián)實(shí)例。下面給出代碼。
5.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MTjoin {
public static int time = 0;
/*
* 在map中先區(qū)分輸入行屬于左表還是右表,然后對(duì)兩列值進(jìn)行分割,
* 保存連接列在key值,剩余列和左右表標(biāo)志在value中,最后輸出
*/
public static class Map extends Mapper《Object, Text, Text, Text》 {
// 實(shí)現(xiàn)map函數(shù)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();// 每行文件
String relationtype = new String();// 左右表標(biāo)識(shí)
// 輸入文件首行,不處理
if (line.contains(“factoryname”) == true
|| line.contains(“addressed”) == true) {
return;
}
// 輸入的一行預(yù)處理文本
StringTokenizer itr = new StringTokenizer(line);
String mapkey = new String();
String mapvalue = new String();
int i = 0;
while (itr.hasMoreTokens()) {
// 先讀取一個(gè)單詞
String token = itr.nextToken();
// 判斷該地址ID就把存到“values[0]”
if (token.charAt(0) 》= ‘0’ && token.charAt(0) 《= ‘9’) {
mapkey = token;
if (i 》 0) {
relationtype = “1”;
} else {
relationtype = “2”;
}
continue;
}
// 存工廠名
mapvalue += token + “ ”;
i++;
}
// 輸出左右表
context.write(new Text(mapkey), new Text(relationtype + “+”+ mapvalue));
}
}
/*
* reduce解析map輸出,將value中數(shù)據(jù)按照左右表分別保存,
* 然后求出笛卡爾積,并輸出。
*/
public static class Reduce extends Reducer《Text, Text, Text, Text》 {
// 實(shí)現(xiàn)reduce函數(shù)
public void reduce(Text key, Iterable《Text》 values, Context context)
throws IOException, InterruptedException {
// 輸出表頭
if (0 == time) {
context.write(new Text(“factoryname”), new Text(“addressname”));
time++;
}
int factorynum = 0;
String[] factory = new String[10];
int addressnum = 0;
String[] address = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
// 取得左右表標(biāo)識(shí)
char relationtype = record.charAt(0);
// 左表
if (‘1’ == relationtype) {
factory[factorynum] = record.substring(i);
factorynum++;
}
// 右表
if (‘2’ == relationtype) {
address[addressnum] = record.substring(i);
addressnum++;
}
}
// 求笛卡爾積
if (0 != factorynum && 0 != addressnum) {
for (int m = 0; m 《 factorynum; m++) {
for (int n = 0; n 《 addressnum; n++) {
// 輸出結(jié)果
context.write(new Text(factory[m]),
new Text(address[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關(guān)鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs = new String[] { “MTjoin_in”, “MTjoin_out” };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Multiple Table Join 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Multiple Table Join”);
job.setJarByClass(MTjoin.class);
// 設(shè)置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設(shè)置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5.4 代碼結(jié)果
1)準(zhǔn)備測(cè)試數(shù)據(jù)
通過(guò)Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“MTjoin_in”文件夾(備注:“MTjoin_out”不需要?jiǎng)?chuàng)建。)如圖5.4-1所示,已經(jīng)成功創(chuàng)建。
?
然后在本地建立兩個(gè)txt文件,通過(guò)Eclipse上傳到“/user/hadoop/MTjoin_in”文件夾中,兩個(gè)txt文件的內(nèi)容如“實(shí)例描述”那兩個(gè)文件一樣。 成功上傳之后,從SecureCRT遠(yuǎn)處查看“Master.Hadoop”的也能證實(shí)我們上傳的兩個(gè)文件。
2)查看運(yùn)行結(jié)果
這時(shí)我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進(jìn)行刷新,這時(shí)會(huì)發(fā)現(xiàn)多出一個(gè)“MTjoin_out”文件夾,且里面有3個(gè)文件,然后打開(kāi)雙 其“part-r-00000”文件,會(huì)在Eclipse中間把內(nèi)容顯示出來(lái)。
6、倒排索引
“倒排索引”是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應(yīng)用于全文搜索引擎。它主要是用來(lái)存儲(chǔ)某個(gè)單詞(或詞組)在一個(gè)文檔或一組文檔中的存儲(chǔ)位置的映射,即提供了一種根據(jù)內(nèi)容來(lái)查找文檔的方式。由于不是根據(jù)文檔來(lái)確定文檔所包含的內(nèi)容,而是進(jìn)行相反的操作,因而稱為倒排索引(Inverted Index)。
評(píng)論
查看更多