用Google cloud跑一個(gè)Hadoop MapReduce 的 job, 想建一個(gè) inverted index。
輸入文件5個(gè),第一個(gè)詞是文件的ID,后面就是文件內(nèi)容,ID 和內(nèi)容之間用 tab 隔開,內(nèi)容詞與此之間用空格隔開,如下:
我期望的結(jié)果是找出每個(gè)詞出現(xiàn)在哪些文件中,以及出現(xiàn)的頻率。map 結(jié)束后,輸出應(yīng)該是這樣:
Reduce 完成后應(yīng)該得到這樣的結(jié)果:
我的代碼如下:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import java.lang.StringBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class BuildIndex {
public static class IndexMapper extends Mapper<Object, Text, Text, Text>{
private Text word = new Text();
private Text docID = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] pair = value.toString().split("\t");
docID.set(pair[0]);
StringTokenizer itr = new StringTokenizer(pair[1]);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, docID);
}
}
}
public static class IndexReducer extends Reducer<Text,Text,Text,Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String, Integer> map = new HashMap<>();
for (Text val : values) {
Integer counter = map.get(val.toString());
if (counter == null) {
counter = 1;
} else {
counter += 1;
}
map.put(val.toString(), counter);
}
StringBuilder sb = new StringBuilder();
for (String s : map.keySet()) {
sb.append(s + ":" + map.get(s) + " ");
}
result.set(sb.toString());
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "build inverted index");
job.setJarByClass(BuildIndex.class);
job.setMapperClass(IndexMapper.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
但是我的輸出結(jié)果卻是這樣:
感覺reducer完全沒有工作,請(qǐng)問這是為什么?
你現(xiàn)在想要實(shí)現(xiàn)的是同級(jí)單詞在a文件和b文件...中各出現(xiàn)的次數(shù).
假設(shè)有兩個(gè)文件a.txt和b.txt.
a文件里面的數(shù)據(jù)是{tom jack tom jack rose tom}
b文件里面的數(shù)據(jù)是{google apple tom google rose}
把編號(hào)換成了文件名.
map的輸出就是這樣<tom:a.txt,1> <jack:a.txt,1> <tom:a.txt,1><jack:a.txt,1> <rose:a.txt,1>
<google:b.txt,1><apple:b.txt,1><tom:b.txt,1>....
這樣的數(shù)據(jù)給到reducer,reducer統(tǒng)計(jì)不了.因?yàn)閗ey不相同.key有的是<tom:a.txt,1><tom:b.txt,1>.
為了解決這個(gè)問題,map輸入的內(nèi)容不要直接到reducer中,中間加一層combiner來處理匯總數(shù)據(jù)
combiner接收<tom:a.txt,1><tom:b.txt,1>
combiner把key做一下切割 .切割成<tom, a.txt:1 ><tom , b.txt:2> ,這樣key相同了.就可以統(tǒng)計(jì)了
下面把代碼貼上 注[我用的是文件的名稱,不是文件里的開頭編號(hào),要用的話還得把文件名換成編號(hào),這樣做有寫問題,你可以下去試一試.我找到解決辦法在補(bǔ)充.]
mapper類
public class WCMapper extends Mapper<LongWritable,Text,Text,Text>{
Text text = new Text();
Text val = new Text( "1" );
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String [] strings = line.split( " " ); //根據(jù)空格切割
FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行數(shù)據(jù)所在的文件切片
String fileName = fileSplit.getPath().getName();// 根據(jù)文件切片得到文件名
for (String s : strings){
text.set(s + ":" + fileName);
context.write(text,val);
}
}
}
combiner類
public class WCCombiner extends Reducer<Text,Text,Text,Text> {
Text text = new Text( );
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//map傳進(jìn)來的是 <apple:2 , 1> <google:2 ,1>
int sum = 0; //統(tǒng)計(jì)詞頻
for (Text val : values){
sum += Integer.parseInt(val.toString());
}
//切割key
int index = key.toString().indexOf( ":" );
text.set(key.toString().substring( index + 1 ) + ":" + sum); // value ---> 2:1
key.set( key.toString().substring( 0,index )); // key --> apple
context.write( key,text );
}
}
reducer類:
public class WCReduce extends Reducer<Text,Text,Text,Text>{
Text result = new Text( );
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String file = new String();
for (Text t : values){
file += t.toString();
}
result.set( file );
context.write( key,result);
}
}
注[本地鏈接的linux環(huán)境hadoop] ,要在本機(jī)的/etc/hosts文件中添加 export HADOOP_USER_NAME=hdfs
a.txt和b.txt都是單詞,以空格分割,你可以做假數(shù)據(jù)測(cè)試一下.
這是測(cè)試結(jié)果.
北大青鳥APTECH成立于1999年。依托北京大學(xué)優(yōu)質(zhì)雄厚的教育資源和背景,秉承“教育改變生活”的發(fā)展理念,致力于培養(yǎng)中國(guó)IT技能型緊缺人才,是大數(shù)據(jù)專業(yè)的國(guó)家
達(dá)內(nèi)教育集團(tuán)成立于2002年,是一家由留學(xué)海歸創(chuàng)辦的高端職業(yè)教育培訓(xùn)機(jī)構(gòu),是中國(guó)一站式人才培養(yǎng)平臺(tái)、一站式人才輸送平臺(tái)。2014年4月3日在美國(guó)成功上市,融資1
北大課工場(chǎng)是北京大學(xué)校辦產(chǎn)業(yè)為響應(yīng)國(guó)家深化產(chǎn)教融合/校企合作的政策,積極推進(jìn)“中國(guó)制造2025”,實(shí)現(xiàn)中華民族偉大復(fù)興的升級(jí)產(chǎn)業(yè)鏈。利用北京大學(xué)優(yōu)質(zhì)教育資源及背
博為峰,中國(guó)職業(yè)人才培訓(xùn)領(lǐng)域的先行者
曾工作于聯(lián)想擔(dān)任系統(tǒng)開發(fā)工程師,曾在博彥科技股份有限公司擔(dān)任項(xiàng)目經(jīng)理從事移動(dòng)互聯(lián)網(wǎng)管理及研發(fā)工作,曾創(chuàng)辦藍(lán)懿科技有限責(zé)任公司從事總經(jīng)理職務(wù)負(fù)責(zé)iOS教學(xué)及管理工作。
浪潮集團(tuán)項(xiàng)目經(jīng)理。精通Java與.NET 技術(shù), 熟練的跨平臺(tái)面向?qū)ο箝_發(fā)經(jīng)驗(yàn),技術(shù)功底深厚。 授課風(fēng)格 授課風(fēng)格清新自然、條理清晰、主次分明、重點(diǎn)難點(diǎn)突出、引人入勝。
精通HTML5和CSS3;Javascript及主流js庫,具有快速界面開發(fā)的能力,對(duì)瀏覽器兼容性、前端性能優(yōu)化等有深入理解。精通網(wǎng)頁制作和網(wǎng)頁游戲開發(fā)。
具有10 年的Java 企業(yè)應(yīng)用開發(fā)經(jīng)驗(yàn)。曾經(jīng)歷任德國(guó)Software AG 技術(shù)顧問,美國(guó)Dachieve 系統(tǒng)架構(gòu)師,美國(guó)AngelEngineers Inc. 系統(tǒng)架構(gòu)師。