Hadoop MapReduce examples

http://blog.csdn.net/jediael_lu/article/details/37649609

hadoop fs works with any supported file system
hadoop dfs is specific to HDFS
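For example (a sketch; node1:9900 is the NameNode address used later in this post, and hadoop fs accepts any supported FileSystem URI):
hadoop fs -ls file:///tmp
hadoop fs -ls hdfs://node1:9900/
hadoop dfs -ls /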

The official documentation opens with a MapReduce example, which leaves many beginners completely lost.

Map/reduce can be thought of as merge-and-aggregate functions over data, much like count(), GROUP BY and ORDER BY in SQL: they distill the result we want out of raw data. map is like PHP's array_map, which runs every array element through a function; reduce feeds the result computed from neighbouring elements into the next step, e.g. 5! computed as 1x2x3x4x5. For a more detailed explanation, see this JavaScript-based introduction to map/reduce:

http://www.liaoxuefeng.com/wiki/001434446689867b27157e896e74d51a89c25cc8b43bdb3000/001435119854495d29b9b3d7028477a96ed74db95032675000
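To make the analogy concrete, here is a minimal sketch of the same two ideas in plain Java (Java 8 streams, not the Hadoop API); the class name MapReduceIdea is made up for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MapReduceIdea {
    public static void main(String[] args) {
        // "map": run every element through a function, like PHP's array_map
        List<String> words = Arrays.asList("hello", "php", "world");
        List<Integer> lengths = words.stream()
                .map(String::length)
                .collect(Collectors.toList());   // [5, 3, 5]

        // "reduce": fold neighbouring results into one value, e.g. 5! = 1x2x3x4x5
        int factorial = IntStream.rangeClosed(1, 5)
                .reduce(1, (a, b) -> a * b);     // 120

        System.out.println(lengths + " " + factorial);
    }
}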

Hadoop for beginners: MapReduce and the WordCount example explained with diagrams

http://www.cnblogs.com/hehaiyang/p/4484442.html

/usr/local/hadoop/bin/hadoop dfs -ls
/usr/local/hadoop/bin/hadoop dfs -mkdir /test
Delete a directory:
/usr/local/hadoop/bin/hadoop dfs -rmr /test

/usr/local/hadoop/bin/hadoop dfs -ls /output

// Invoke the wordcount class in the jar: [input dir] [output dir]
You can list the classes inside the jar with the command below, or create a Java project in Eclipse, import the jar, and browse it in the package explorer.
jar -tvf /usr/local/hadoop-1.0.4/hadoop-examples-1.0.4.jar

bin/hadoop jar hadoop-examples-1.0.4.jar wordcount /testinput/conf/hdfs-site.xml /output/wordcount
bin/hadoop dfs -cat /output/wordcount/*
Running WordCount.java against the cluster from a local Java client reported a permission error:

// input path
String input = "hdfs://node1:9900/testinput";
// output path; it must not exist yet (even an empty directory will fail)
String output = "hdfs://node1:9900/output2";

Permission problems when a local Java client connects to Hadoop on another machine:

http://f.dataguru.cn/thread-281774-1-1.html

Problem:
Running an HDFS program from Windows/Eclipse fails with: org.apache.hadoop.security.AccessControlException: Permission denied: user=sunqw, access=WRITE, inode="":hadoop:supergroup:rwxr-xr-x.
or
Running an HDFS program from Windows/Eclipse fails with: org.apache.hadoop.security.AccessControlException: Access denied for user sunqw. Superuser privilege is required.

Solutions:
Option 1:
Add a HADOOP_USER_NAME environment variable to the system with the value root,
or set it dynamically from the Java program:
System.setProperty("HADOOP_USER_NAME", "root");
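For example, the environment-variable route on Linux or macOS would be (a sketch; on Windows set it through the system environment-variables dialog instead):
export HADOOP_USER_NAME=root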

Option 2:
Running Eclipse on a Mac naturally submits as the local user chenlong, who can only access /user/chenlong in DFS. Open up the permissions of the Hadoop directory, for example: $ hadoop fs -chmod 777 /testinput

When Eclipse writes into HDFS as a user other than the one running Hadoop, the exception occurs because that user (sunqw above) has no write permission on the "/" directory. The fix is to open up the permissions of the HDFS directory: hadoop fs -chmod 777 /

Option 3:
Edit the Hadoop configuration file conf/hdfs-site.xml and add or change dfs.permissions to false (newer versions renamed the property to dfs.permissions.enabled); see the snippet below.
Deprecated property reference: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
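On Hadoop 1.x that is a property block in conf/hdfs-site.xml like the minimal sketch below (restart the NameNode after changing it):

<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>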

Option 4:
Rename the user on the Eclipse machine to root, i.e. the same user that runs Hadoop on the server.

In the source of hadoop-core-1.0.4.jar (under hadoop-1.0.4/src/),
org.apache.hadoop.security.UserGroupInformation reads the HADOOP_USER_NAME environment variable at around line 115.
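A quick way to check which user the client will act as is the sketch below, written against the Hadoop 1.x API (the class name WhoAmI is made up for illustration):

import org.apache.hadoop.security.UserGroupInformation;

public class WhoAmI {
    public static void main(String[] args) throws Exception {
        // same trick as above: HADOOP_USER_NAME overrides the local OS user
        System.setProperty("HADOOP_USER_NAME", "root");
        System.out.println(UserGroupInformation.getCurrentUser().getUserName());
    }
}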

Paths without a leading slash are created under the current user's home directory; e.g. input is equivalent to /user/root/input.
bin/hadoop fs -mkdir input

vim ~/hadoop.txt
chenlong hello world
hello php world
xiaoqian hello
wocao nima
ceshi test
java php

Put the file into HDFS:
bin/hadoop fs -put ~/hadoop.txt input

Use the grep class in the example jar to find lines containing php:
bin/hadoop jar hadoop-examples-1.0.4.jar grep input/hadoop.txt /user/root/output/grep2 'php'

View the result; php appears twice:
bin/hadoop fs -cat output/grep2/*
2 php
cat: File does not exist: /user/root/output/grep2/_logs

Use the wordcount class in the jar to count word occurrences:
bin/hadoop jar hadoop-examples-1.0.4.jar wordcount input/hadoop.txt /user/root/output/wordcount
[root@node1 hadoop]# bin/hadoop fs -cat output/wordcount/*
ceshi 1
chenlong 1
hello 3
java 1
nima 1
php 2
test 1
wocao 1
world 2
xiaoqian 1
cat: File does not exist: /user/root/output/wordcount/_logs

The example jar contains other classes as well; I have not tested them all, and some failed because I am not familiar with their parameters.

bin/hadoop jar hadoop-examples-1.0.4.jar sort -inFormat org.apache.hadoop.mapred.TextInputFormat input/hadoop.txt /user/root/output/sort4

When starting a job you may see the message
WARN snappy.LoadSnappy: Snappy native library not loaded
which means the Snappy compression library was not loaded.
Install snappy,
then check the path pointed to by the $HADOOP_COMMON_LIB_NATIVE_DIR environment variable.

http://my.oschina.net/qiangzigege/blog/290647

ll /usr/local/lib/libsnappy.so
lrwxrwxrwx 1 root root 18 Sep 14 21:42 /usr/local/lib/libsnappy.so -> libsnappy.so.1.1.4
Libraries have been installed in:
/usr/local/lib
cp /usr/local/lib/libsnappy.so.1.1.4 /usr/local/hadoop/lib/native/Linux-amd64-64/
Also copy libsnappy.so.1.1.4 to libsnappy.so and libsnappy.so.1 (mirroring how libhadoop.so is laid out), to avoid odd problems; see the commands below.
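For example (a sketch; adjust the paths to wherever snappy actually landed on your machine):
cd /usr/local/hadoop/lib/native/Linux-amd64-64/
cp libsnappy.so.1.1.4 libsnappy.so
cp libsnappy.so.1.1.4 libsnappy.so.1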
Running the job again then prints:
WARN snappy.LoadSnappy: Snappy native library is available
INFO snappy.LoadSnappy: Snappy native library loaded

An example of running a MapReduce job from a Java program in Eclipse, adapted from the bundled examples and from the web:

The jars needed on the build path are the ones in the Hadoop root directory and under lib/; to attach source, download the src distribution and point Eclipse at its src folder, e.g.

hadoop-1.0.4/src/core/org/apache/hadoop/conf/Configuration.java

WordCount.java

package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        /*
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        */
        System.setProperty("HADOOP_USER_NAME", "root");

        Job job = new Job(conf, "word count2");

        // needed if you package this as a jar and submit it with bin/hadoop jar
        job.setJarByClass(WordCount.class);

        // map
        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        // reduce
        job.setReducerClass(IntSumReducer.class);

        // output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // input path
        String input = "hdfs://node1:9900/testinput/conf/capacity-scheduler.xml";
        // output path; it must not exist yet (even an empty directory will fail)
        String output = "hdfs://node1:9900/output/wordcount_java3";
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        //FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

/usr/local/hadoop/bin/hadoop dfs -cat /output/wordcount_java3/*
(maximum-system-jobs 2
* 2
, 2
–> 8
-1 1
.... many more lines follow, since the input was just an arbitrary configuration file.

There is also an example from the web that computes the maximum temperature per year.
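The input under /user/root/tempinput has one record per line: an 8-digit date in yyyyMMdd format immediately followed by the temperature, which is why the mapper below takes substring(0, 4) as the year and substring(8) as the temperature. The first records (also visible in the mapper log further down) look like:

2014010114
2014010216
2014010317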

Temperature.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Temperature {
    /**
     * The four type parameters are:
     * KeyIn    - the key of the Mapper's input, here the byte offset of each line (0, 11, ...)
     * ValueIn  - the value of the Mapper's input, here the line of text
     * KeyOut   - the key of the Mapper's output, here the "year" parsed from the line
     * ValueOut - the value of the Mapper's output, here the "temperature" parsed from the line
     */
    static class TempMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // sample output: Before Mapper: 0, 2000010115
            System.out.print("Before Mapper: " + key + ", " + value);
            String line = value.toString();
            String year = line.substring(0, 4);
            int temperature = Integer.parseInt(line.substring(8));
            context.write(new Text(year), new IntWritable(temperature));
            // sample output: After Mapper:2000, 15
            System.out.println(
                    "======" +
                    "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
        }
    }

    /**
     * The four type parameters are:
     * KeyIn    - the key of the Reducer's input, here the "year"
     * ValueIn  - the value of the Reducer's input, here the "temperature"
     * KeyOut   - the key of the Reducer's output, here each distinct "year"
     * ValueOut - the value of the Reducer's output, here the maximum temperature of that year
     */
    static class TempReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            StringBuffer sb = new StringBuffer();
            // take the maximum of the values
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
                sb.append(value).append(", ");
            }
            // sample output: Before Reduce: 2000, 15, 23, 99, 12, 22,
            System.out.print("Before Reduce: " + key + ", " + sb.toString());
            context.write(key, new IntWritable(maxValue));
            // sample output: After Reduce: 2000, 99
            System.out.println(
                    "======" +
                    "After Reduce: " + key + ", " + maxValue);
        }
    }

    public static void main(String[] args) throws Exception {
        // input path
        String dst = "hdfs://node1:9900/user/root/tempinput";
        // output path; it must not exist yet (even an empty directory will fail)
        String dstOut = "hdfs://node1:9900/user/root/output/temperature";
        Configuration hadoopConfig = new Configuration();

        hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );

        System.setProperty("HADOOP_USER_NAME", "root");
        Job job = new Job(hadoopConfig);

        // needed if you package this as a jar and submit it with bin/hadoop jar
        //job.setJarByClass(NewMaxTemperature.class);

        // input and output paths for the job
        FileInputFormat.addInputPath(job, new Path(dst));
        FileOutputFormat.setOutputPath(job, new Path(dstOut));

        // the custom Mapper and Reducer that handle the two phases
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReducer.class);

        // key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // run the job and wait for it to finish
        job.waitForCompletion(true);
        System.out.println("Finished");
    }
}

Console output:
15/09/15 00:16:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
15/09/15 00:16:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/09/15 00:16:06 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/09/15 00:16:06 INFO input.FileInputFormat: Total input paths to process : 1
15/09/15 00:16:06 WARN snappy.LoadSnappy: Snappy native library not loaded
15/09/15 00:16:06 INFO mapred.JobClient: Running job: job_local_0001
15/09/15 00:16:06 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/09/15 00:16:06 INFO mapred.MapTask: io.sort.mb = 100
15/09/15 00:16:07 INFO mapred.MapTask: data buffer = 79691776/99614720
15/09/15 00:16:07 INFO mapred.MapTask: record buffer = 262144/327680
Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 11, 2014010216======After Mapper:2014, 16
Before Mapper: 22, 2014010317======After Mapper:2014, 17
Before Mapper: 33, 2014010410======After Mapper:2014, 10
Before Mapper: 44, 2014010506======After Mapper:2014, 6
Before Mapper: 55, 2012010609======After Mapper:2012, 9
Before Mapper: 66, 2012010732======After Mapper:2012, 32
Before Mapper: 77, 2012010812======After Mapper:2012, 12
Before Mapper: 88, 2012010919======After Mapper:2012, 19
Before Mapper: 99, 2012011023======After Mapper:2012, 23
Before Mapper: 110, 2001010116======After Mapper:2001, 16
Before Mapper: 121, 2001010212======After Mapper:2001, 12
Before Mapper: 132, 2001010310======After Mapper:2001, 10
Before Mapper: 143, 2001010411======After Mapper:2001, 11
Before Mapper: 154, 2001010529======After Mapper:2001, 29
Before Mapper: 165, 2013010619======After Mapper:2013, 19
Before Mapper: 176, 2013010722======After Mapper:2013, 22
Before Mapper: 187, 2013010812======After Mapper:2013, 12
Before Mapper: 198, 2013010929======After Mapper:2013, 29
Before Mapper: 209, 2013011023======After Mapper:2013, 23
Before Mapper: 220, 2008010105======After Mapper:2008, 5
Before Mapper: 231, 2008010216======After Mapper:2008, 16
Before Mapper: 242, 2008010337======After Mapper:2008, 37
Before Mapper: 253, 2008010414======After Mapper:2008, 14
Before Mapper: 264, 2008010516======After Mapper:2008, 16
Before Mapper: 275, 2007010619======After Mapper:2007, 19
Before Mapper: 286, 2007010712======After Mapper:2007, 12
Before Mapper: 297, 2007010812======After Mapper:2007, 12
Before Mapper: 308, 2007010999======After Mapper:2007, 99
Before Mapper: 319, 2007011023======After Mapper:2007, 23
Before Mapper: 330, 2010010114======After Mapper:2010, 14
Before Mapper: 341, 2010010216======After Mapper:2010, 16
Before Mapper: 352, 2010010317======After Mapper:2010, 17
Before Mapper: 363, 2010010410======After Mapper:2010, 10
Before Mapper: 374, 2010010506======After Mapper:2010, 6
Before Mapper: 385, 2015010649======After Mapper:2015, 49
Before Mapper: 396, 2015010722======After Mapper:2015, 22
Before Mapper: 407, 2015010812======After Mapper:2015, 12
Before Mapper: 418, 2015010999======After Mapper:2015, 99
Before Mapper: 429, 2015011023======After Mapper:2015, 23
15/09/15 00:16:07 INFO mapred.MapTask: Starting flush of map output
15/09/15 00:16:07 INFO mapred.MapTask: Finished spill 0
15/09/15 00:16:07 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/09/15 00:16:07 INFO mapred.JobClient: map 0% reduce 0%
15/09/15 00:16:09 INFO mapred.LocalJobRunner:
15/09/15 00:16:09 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
15/09/15 00:16:09 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/09/15 00:16:09 INFO mapred.LocalJobRunner:
15/09/15 00:16:09 INFO mapred.Merger: Merging 1 sorted segments
15/09/15 00:16:09 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 442 bytes
15/09/15 00:16:09 INFO mapred.LocalJobRunner:
Before Reduce: 2001, 11, 10, 29, 12, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 99, 12, 19, 12, ======After Reduce: 2007, 99
Before Reduce: 2008, 5, 16, 37, 14, 16, ======After Reduce: 2008, 37
Before Reduce: 2010, 14, 16, 17, 10, 6, ======After Reduce: 2010, 17
Before Reduce: 2012, 23, 19, 12, 32, 9, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 16, 17, 10, 6, ======After Reduce: 2014, 17
Before Reduce: 2015, 49, 22, 12, 99, 23, ======After Reduce: 2015, 99
15/09/15 00:16:10 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/09/15 00:16:10 INFO mapred.LocalJobRunner:
15/09/15 00:16:10 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/09/15 00:16:10 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://node1:9900/user/root/output/temperature
15/09/15 00:16:10 INFO mapred.JobClient: map 100% reduce 0%
15/09/15 00:16:12 INFO mapred.LocalJobRunner: reduce > reduce
15/09/15 00:16:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
15/09/15 00:16:13 INFO mapred.JobClient: map 100% reduce 100%
15/09/15 00:16:13 INFO mapred.JobClient: Job complete: job_local_0001
15/09/15 00:16:13 INFO mapred.JobClient: Counters: 19
15/09/15 00:16:13 INFO mapred.JobClient: File Output Format Counters
15/09/15 00:16:13 INFO mapred.JobClient: Bytes Written=64
15/09/15 00:16:13 INFO mapred.JobClient: FileSystemCounters
15/09/15 00:16:13 INFO mapred.JobClient: FILE_BYTES_READ=754
15/09/15 00:16:13 INFO mapred.JobClient: HDFS_BYTES_READ=880
15/09/15 00:16:13 INFO mapred.JobClient: FILE_BYTES_WRITTEN=80826
15/09/15 00:16:13 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=64
15/09/15 00:16:13 INFO mapred.JobClient: File Input Format Counters
15/09/15 00:16:13 INFO mapred.JobClient: Bytes Read=440
15/09/15 00:16:13 INFO mapred.JobClient: Map-Reduce Framework
15/09/15 00:16:13 INFO mapred.JobClient: Reduce input groups=8
15/09/15 00:16:13 INFO mapred.JobClient: Map output materialized bytes=446
15/09/15 00:16:13 INFO mapred.JobClient: Combine output records=0
15/09/15 00:16:13 INFO mapred.JobClient: Map input records=40
15/09/15 00:16:13 INFO mapred.JobClient: Reduce shuffle bytes=0
15/09/15 00:16:13 INFO mapred.JobClient: Reduce output records=8
15/09/15 00:16:13 INFO mapred.JobClient: Spilled Records=80
15/09/15 00:16:13 INFO mapred.JobClient: Map output bytes=360
15/09/15 00:16:13 INFO mapred.JobClient: Total committed heap usage (bytes)=458227712
15/09/15 00:16:13 INFO mapred.JobClient: Combine input records=0
15/09/15 00:16:13 INFO mapred.JobClient: Map output records=40
15/09/15 00:16:13 INFO mapred.JobClient: SPLIT_RAW_BYTES=102
15/09/15 00:16:13 INFO mapred.JobClient: Reduce input records=40
Finished

View the result on the server:
[root@node1 hadoop]# hadoop fs -cat output/temperature/*
2001 29
2007 99
2008 37
2010 17
2012 32
2013 29
2014 17
2015 99

Copy it out of HDFS to the local file system to view it:
/usr/local/hadoop/bin/hadoop fs -get output/temperature ~
[root@node1 ~]# cat temperature/_SUCCESS
[root@node1 ~]# cat temperature/part-r-00000
2001 29
2007 99
2008 37
2010 17
2012 32
2013 29
2014 17
2015 99
