jar包

網(wǎng)站的建設(shè)成都創(chuàng)新互聯(lián)公司專(zhuān)注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開(kāi)發(fā).小程序定制開(kāi)發(fā),H5頁(yè)面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為玻璃貼膜等企業(yè)提供專(zhuān)業(yè)服務(wù)。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency>
2.x以后就拆成一些零散的包了,沒(méi)有core包了
代碼:
package org.conan.myhadoop.mr;

import java.io.IOException;

//org.apache.hadoop.mapred 老系統(tǒng)的包
//org.apache.hadoop.mapreduce 新系統(tǒng)的包
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
* ModuleMapReduce Class
* 單純的注釋
*/
/**
 * Reusable MapReduce job template: subclass-style skeleton wiring a Mapper,
 * optional combiner, and Reducer through a ToolRunner-driven driver.
 */
public class ModuleMapReduce extends Configured implements Tool {

    /**
     * Template Mapper. Fill in {@link #map} with job-specific logic.
     * (Javadoc here also shows up on IDE hover, as the original note said.)
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO: emit intermediate key/value pairs for the concrete job.
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }
    }

    /**
     * Template Reducer. Fill in {@link #reduce} with job-specific logic.
     */
    public static class ModuleReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> value,
                Context context) throws IOException, InterruptedException {
            // TODO: aggregate the grouped values for the concrete job.
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }
    }

    /**
     * Driver: configures every stage of the job and submits it.
     *
     * @param args generic options followed by the input and output paths
     * @return 0 on success, 1 on failure or invalid arguments
     */
    // @Override on interface implementations is legal since Java 6; kept off
    // to match the original author's compiler-compatibility concern.
    public int run(String[] args) throws Exception {
        Job job = parseInputAndOutput(this, this.getConf(), args);
        if (job == null) {
            // BUG FIX: the original dereferenced a null Job when the argument
            // check failed; the usage message has already been printed.
            return 1;
        }
        // step 1: input format
        job.setInputFormatClass(TextInputFormat.class);
        // step 3: mapper class
        job.setMapperClass(ModuleMapper.class);
        // step 4: map output key/value classes
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // step 5: shuffle (sort, combiner, group)
        job.setSortComparatorClass(LongWritable.Comparator.class);
        // combiner is optional and must be a Reducer subclass
        job.setCombinerClass(ModuleReducer.class);
        job.setGroupingComparatorClass(LongWritable.Comparator.class);
        // step 6: reducer class
        job.setReducerClass(ModuleReducer.class);
        // step 7: job output key/value classes
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // step 8: output format.
        // BUG FIX: FileOutputFormat is abstract and cannot be instantiated by
        // the framework; use the concrete TextOutputFormat.
        job.setOutputFormatClass(TextOutputFormat.class);
        // step 10: submit the job and wait (primitive boolean, no boxing)
        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Validates the command line and creates the Job with its input and
     * output paths (steps 2 and 9 of the template).
     *
     * @return the configured Job, or {@code null} when the arguments are invalid
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
            throws IOException {
        if (args.length != 2) {
            // %s is replaced by the concrete driver class name
            System.err.printf(
                    "Usage: %s [generic options] <input> <output> \n ", tool
                            .getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        // step: create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ModuleMapReduce.class);
        // step 2: input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // step 9: output path.
        // BUG FIX: the original read args[0] here, making the output directory
        // identical to the input directory; the output path is args[1].
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    public static void main(String[] args) {
        try {
            // ToolRunner parses generic options and invokes run(); the process
            // exit status is run()'s return value (0 = success).
            int status = ToolRunner.run(new ModuleMapReduce(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1); // make failures visible to calling scripts
        }
    }
}
倒排索引代碼
輸入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
說(shuō)明:每一行為一條電話(huà)通話(huà)記錄,左邊的號(hào)碼(記為a)打給右邊的號(hào)碼(記為b號(hào)碼),中間用空格隔開(kāi)
要求:
將以上文件以如下格式輸出:
110 18987655436|15699807656
112 13588888888|2543789|011-678987
13509098987 13678987879
說(shuō)明:左邊為被呼叫的號(hào)碼b,右邊為呼叫b的號(hào)碼a以"|"分割
package org.conan.myhadoop.mr;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Inverted index over call records: each input line is "a b" (number a called
 * number b); the output maps each callee b to all its callers joined by "|".
 */
public class ReverseIndex extends Configured implements Tool {

    enum Counter {
        LINESKIP, // malformed input lines that were skipped
    }

    /**
     * Parses "a b" per line and emits (b, a).
     *
     * BUG FIX: the original extended the raw {@code Mapper} type, so this
     * map() did not override {@code Mapper.map()} and the framework ran the
     * identity mapper instead. The generic parameters and {@code @Override}
     * make the override take effect (and let the compiler verify it).
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // one call record
            try {
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0]; // caller
                String bnum = lineSplit[1]; // callee
                context.write(new Text(bnum), new Text(anum));
            } catch (ArrayIndexOutOfBoundsException e) {
                // Malformed line: count it and keep going.
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    /**
     * Joins all callers of one callee with "|".
     * Raw-type bug fixed as in {@link Map}; see that class's note.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // StringBuilder avoids O(n^2) concatenation; the separator goes
            // only BETWEEN items, fixing the original's trailing "|", which
            // contradicted the documented expected output. The per-record
            // debug println was removed (it flooded task logs).
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                if (out.length() > 0) {
                    out.append('|');
                }
                out.append(value.toString());
            }
            context.write(key, new Text(out.toString()));
        }
    }

    /**
     * Configures and runs the job, then prints its counters.
     *
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // Job.getInstance replaces the deprecated Job(conf, name) constructor
        // and matches the style used elsewhere in this article.
        Job job = Job.getInstance(conf, "ReverseIndex");
        job.setJarByClass(ReverseIndex.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path
        job.setMapperClass(Map.class);
        job.setReducerClass(ReverseIndex.Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        // Report job statistics.
        System.out.println("任務(wù)名稱(chēng):" + job.getJobName());
        System.out.println("任務(wù)成功:" + (job.isSuccessful() ? "是" : "否"));
        // NOTE(review): this counter group name is Hadoop-version dependent
        // (mapred.Task$Counter is the 1.x name) — verify against the cluster.
        System.out.println("輸入行數(shù):"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("輸出行數(shù):"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("跳過(guò)的行:"
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Print usage and exit when the argument count is wrong.
        if (args.length != 2) {
            System.err.println("");
            System.err
                    .println("Usage: ReverseIndex < input path > < output path > ");
            System.err
                    .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");
            System.exit(-1);
        }
        // Record the start time so total duration can be reported.
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任務(wù)開(kāi)始:" + formatter.format(start));
        System.out.println("任務(wù)結(jié)束:" + formatter.format(end));
        System.out.println("任務(wù)耗時(shí):" + String.valueOf(time) + " 分鐘");
        System.exit(res);
    }
}
去重代碼
//Mapper任務(wù)
static class DDMap extends Mapper<LongWritable,Text,Text,Text>{
private static Text line = new Text();
protected void map(LongWritable k1,Text v1,Context context){
line = v1;
Text text = new Text("");
try {
context.write(line,text);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
}
//Reducer任務(wù)
static class DDReduce extends Reducer<Text, Text, Text, Text> {
    // Dedup output carries no value; reuse one empty Text instead of
    // allocating a new one per key.
    private static final Text EMPTY = new Text("");

    /**
     * Each distinct line arrives exactly once as a key after the shuffle;
     * writing the key (and ignoring the grouped values) yields the
     * de-duplicated output.
     *
     * BUG FIX: the original swallowed IOException/InterruptedException with
     * printStackTrace, so a failed write silently lost output. Declaring the
     * checked exceptions (matching {@code Reducer.reduce}'s signature) lets
     * failures propagate and fail the task.
     */
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        context.write(k2, EMPTY);
    }
}
參考文章:
一個(gè)經(jīng)典的MapReduce模板代碼,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
http://www.tuicool.com/articles/emi6Fb
網(wǎng)站欄目:mapreduce模板代碼
URL地址:http://www.chinadenli.net/article18/piicdp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供Google、面包屑導(dǎo)航、搜索引擎優(yōu)化、小程序開(kāi)發(fā)、ChatGPT、云服務(wù)器
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)