Hadoop Cluster Setup and MapReduce Application
I. Hadoop Cluster Setup and Configuration
1. Node Preparation
Cluster plan:
Hostname     IP             Installed software       Running processes
weekend01    192.168.1.60   jdk, hadoop              NameNode, DFSZKFailoverController
weekend02    192.168.1.61   jdk, hadoop              NameNode, DFSZKFailoverController
weekend03    192.168.1.62   jdk, hadoop              ResourceManager
weekend04    192.168.1.63   jdk, hadoop              ResourceManager
weekend05    192.168.1.64   jdk, hadoop, zookeeper   DataNode, NodeManager, JournalNode, QuorumPeerMain
weekend06    192.168.1.65   jdk, hadoop, zookeeper   DataNode, NodeManager, JournalNode, QuorumPeerMain
weekend07    192.168.1.66   jdk, hadoop, zookeeper   DataNode, NodeManager, JournalNode, QuorumPeerMain
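For the hostnames above to resolve, every node is assumed to carry matching name-to-IP entries in /etc/hosts (or DNS). A minimal sketch of the entries, taken from the cluster plan, to be appended on each node:
192.168.1.60 weekend01
192.168.1.61 weekend02
192.168.1.62 weekend03
192.168.1.63 weekend04
192.168.1.64 weekend05
192.168.1.65 weekend06
192.168.1.66 weekend07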
2. Java Environment Installation
This section describes installing the JDK and configuring the environment.
Upload the JDK package and the Hadoop-related packages and extract them, as shown below:
[hadoop@weekend01 src]$ pwd
/usr/local/src
[hadoop@weekend01 src]$ ls
hadoop-2.6.4 jdk1.8.0_92 zookeeper-3.4.6
hbase-1.2.1 mongodb-linux-x86_64-rhel62-3.2.6 zookeeper-3.4.6.tar.gz
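If the packages arrive as tarballs, they can be extracted roughly as follows (the exact archive file names are illustrative; only zookeeper-3.4.6.tar.gz appears in the listing above):
cd /usr/local/src
tar -zxvf jdk-8u92-linux-x64.tar.gz      # produces jdk1.8.0_92
tar -zxvf hadoop-2.6.4.tar.gz            # produces hadoop-2.6.4
tar -zxvf zookeeper-3.4.6.tar.gz         # produces zookeeper-3.4.6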
Grant the hadoop user permission to edit the /etc/profile file (this step is optional).
Add the following to /etc/profile:
export JAVA_HOME=/usr/local/src/jdk1.8.0_92
export HADOOP_HOME=/usr/local/src/hadoop-2.6.4
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH
export HBASE_HOME=/usr/local/src/hbase-1.2.1
export ZK_HOME=/usr/local/src/zookeeper-3.4.6
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$PATH:$ZK_HOME/bin
Save and exit, then run [hadoop@weekend01 ~]$ source /etc/profile (if the changes do not take effect, a reboot also works). Then verify the Java environment and the hadoop command:
[hadoop@weekend01 ~]$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
[hadoop@weekend01 ~]$ hadoop version
Hadoop 2.6.4
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled by jenkins on 2016-02-12T09:45Z
Compiled with protoc 2.5.0
From source with checksum 8dee2286ecdbbbc930a6c87b65cbc010
This command was run using /usr/local/src/hadoop-2.6.4/share/hadoop/common/hadoop-common-2.6.4.jar
At this point the JDK and Hadoop environments are configured.
3. SSH Configuration
The Hadoop control scripts rely on SSH to run operations across the whole cluster, so passwordless SSH must be set up for them to work seamlessly. This section describes the SSH configuration.
[hadoop@weekend01 ~]$ ssh-keygen
Press Enter at every prompt to accept the defaults.
Then copy the public key to this node itself and to the other six nodes:
[hadoop@weekend01 ~]$ ssh-copy-id weekend01
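To push the key to every node in one pass, a loop like the following can be used (a sketch, assuming the hostnames resolve and the hadoop user exists on each node):
for host in weekend01 weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do
    ssh-copy-id hadoop@$host    # prompts for the hadoop password on each node once
done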
Test:
[hadoop@weekend01 ~]$ ssh weekend01
Last login: Tue May 31 16:30:47 2016 from slave2
[hadoop@weekend01 ~]$ exit
logout
Connection to weekend01 closed.
[hadoop@weekend01 ~]$ ssh weekend02
Last login: Tue Nov 8 11:33:42 2016 from master
[hadoop@weekend02 ~]$ exit
logout
Connection to weekend02 closed.
4. Hadoop Configuration
This section describes the changes to the individual Hadoop configuration files.
[hadoop@weekend01 hadoop]$ cat hdfs-site.xml
<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/weekend01/namenode/dir</value>
    </property>
    <property>
        <name>dfs.blocksize</name>
        <value>268435456</value>
    </property>
    <property>
        <name>dfs.namenode.handler.count</name>
        <value>100</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/weekend01/datanode/dir</value>
    </property>
</configuration>
[hadoop@weekend01 hadoop]$ cat -n hadoop-env.sh
26 export JAVA_HOME=/usr/local/src/jdk1.8.0_92
[hadoop@weekend01 hadoop]$ cat core-site.xml
<configuration>
    <!-- Default HDFS filesystem (the NameNode on weekend01) -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.1.60:9000</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131072</value>
    </property>
    <!-- Hadoop temporary directory -->
    <!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/app/hadoop-2.4.1/tmp</value>
    </property>
    -->
    <!-- ZooKeeper quorum address -->
    <!--<property>
        <name>ha.zookeeper.quorum</name>
        <value>weekend05:2181,weekend06:2181,weekend07:2181</value>
    </property>-->
</configuration>
[hadoop@weekend01 hadoop]$ cat mapred-site.xml
<configuration>
    <!-- Run MapReduce on the YARN framework -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
[hadoop@weekend01 hadoop]$ cat slaves
weekend05
weekend06
weekend07
[hadoop@weekend01 hadoop]$ cat yarn-site.xml
<configuration>
    <!-- Site specific YARN configuration properties -->
    <property>
        <name>yarn.acl.enable</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.admin.acl</name>
        <value>*</value>
    </property>
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>192.168.1.60:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>192.168.1.60:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>192.168.1.60:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>192.168.1.60:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>192.168.1.60:8088</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>weekend01</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
</configuration>
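The same configuration must be present on every node. One way to distribute it (a sketch, assuming Hadoop is installed at /usr/local/src/hadoop-2.6.4 on all nodes) is to push the config directory from weekend01:
for host in weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do
    scp -r /usr/local/src/hadoop-2.6.4/etc/hadoop hadoop@$host:/usr/local/src/hadoop-2.6.4/etc/
done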
5. Hadoop Testing
This section tests the Hadoop cluster.
Format the HDFS filesystem on weekend01:
[hadoop@weekend01 ~]$ hdfs namenode -format
…..
16/11/08 11:00:37 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted   # this line indicates that the format succeeded
…….
Then start the ZooKeeper ensemble and the JournalNode process on weekend05, weekend06, and weekend07:
[hadoop@weekend05 ~]$ zkServer.sh start
JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@weekend05 ~]$ hadoop-daemon.sh start journalnode
starting journalnode, logging to /usr/local/src/hadoop-2.6.4/logs/hadoop-hadoop-journalnode-weekend05.out
[hadoop@weekend05 ~]$ jps   # seeing these three processes indicates a successful start
4833 JournalNode
4881 Jps
4779 QuorumPeerMain
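The ZooKeeper ensemble on weekend05-07 is assumed to be configured before this step. A minimal zoo.cfg sketch (the dataDir path is illustrative; 2181 matches the client port referenced in core-site.xml), plus a distinct myid on each node:
# /usr/local/src/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/src/zookeeper-3.4.6/data
clientPort=2181
server.1=weekend05:2888:3888
server.2=weekend06:2888:3888
server.3=weekend07:2888:3888
# and on each node write its own id, e.g. on weekend05:
echo 1 > /usr/local/src/zookeeper-3.4.6/data/myid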
Then format ZKFC on weekend01:
[hadoop@weekend01 ~]$ hdfs zkfc -formatZK
[hadoop@weekend01 ~]$ jps
4420 SecondaryNameNode
5148 Jps
4141 NameNode
Start YARN on weekend03:
[hadoop@weekend03 ~]$ start-yarn.sh
[hadoop@weekend03 ~]$ jps
4837 ResourceManager
4902 Jps
Start HDFS on weekend01, then check the processes on the slave nodes:
[hadoop@weekend01 ~]$ start-dfs.sh
[hadoop@weekend05 ~]$ jps
4833 JournalNode
4948 DataNode
5530 Jps
4779 QuorumPeerMain
5403 NodeManager
[hadoop@weekend06 ~]$ jps
2464 QuorumPeerMain
5448 NodeManager
2520 JournalNode
4953 DataNode
5581 Jps
[hadoop@weekend07 ~]$ jps
4864 QuorumPeerMain
5713 Jps
5581 NodeManager
5086 DataNode
4974 JournalNode
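To check all daemons from one terminal instead of logging into each node, a quick loop can be used (a sketch, relying on the passwordless SSH configured earlier; jps must be on the remote non-interactive PATH):
for host in weekend01 weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do
    echo "== $host =="
    ssh $host 'jps'    # lists the JVM processes running on that node
done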
II. MapReduce Application
1. Application Description
Use Hadoop to run a word-count job over the data and deduplicate repeated words. Because the cluster is configured for high availability, it avoids the NameNode single point of failure and the cluster-wide outage such a failure would cause.
2. Data Preparation
Open a terminal and run:
bin/hadoop fs -mkdir hdfsInput
If this command reports a safe-mode related error, run
bin/hadoop dfsadmin -safemode leave
to leave safe mode.
This creates an input directory on HDFS; the files we want to process must be uploaded into this directory before the job can run.
Then run the following commands in the terminal, in order:
cd hadoop-1.2.1
bin/hadoop fs -put file/myTest*.txt hdfsInput
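The myTest*.txt files referenced above are assumed to already exist locally under file/. For a quick test they could be created and the upload verified roughly like this (the file contents are purely illustrative):
mkdir -p file
echo "Hello World Bye World"       > file/myTest1.txt
echo "Hello Hadoop Goodbye Hadoop" > file/myTest2.txt
bin/hadoop fs -put file/myTest*.txt hdfsInput
bin/hadoop fs -ls hdfsInput    # confirm the files landed in the input directory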
3. Design
1) Split the files into splits. Since the test files are small, each file becomes one split, and each split is divided by line into <key,value> pairs, where the key is the byte offset of the line (line terminators are counted in the offset) and the value is the text of the line. This step is done automatically by the MapReduce framework.
2) The split <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs.
3) After the map output is produced, the Mapper sorts the pairs by key and runs the Combine step, which adds up the values of identical keys to form the Mapper's final output.
4) The Reducer sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces new <key,value> pairs that form the final WordCount output. A small worked example follows.
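As a concrete illustration of the steps above, take the hypothetical input line "Hello Hadoop Bye Hadoop"; the intermediate and final pairs would look roughly like this:
input line:     "Hello Hadoop Bye Hadoop"
map output:     <Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
after combine:  <Bye,1> <Hadoop,2> <Hello,1>      (sorted by key, equal keys summed)
reduce output:  Bye 1, Hadoop 2, Hello 1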
4. Program Code
package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 *
 * Description: WordCount explained by Felix
 * @author Hadoop Dev Group
 */
public class WordCount
{
    /**
     * MapReduceBase: a base class that implements the Mapper and Reducer interfaces
     * (its methods only satisfy the interfaces and do nothing by themselves).
     * Mapper interface:
     * WritableComparable: classes that implement WritableComparable can be compared
     * with each other; every class used as a key should implement it.
     * Reporter can be used to report the progress of the whole application; it is not
     * used in this example.
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        /**
         * LongWritable, IntWritable and Text are Hadoop classes that wrap Java data types.
         * They implement WritableComparable, so they can be serialized for data exchange
         * in a distributed environment; think of them as replacements for long, int and String.
         */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /**
         * The map method of the Mapper interface:
         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
         * maps a single input k/v pair to intermediate k/v pairs.
         * The output pairs need not have the same types as the input pair, and an input
         * pair may map to zero or more output pairs.
         * OutputCollector collects the <k,v> pairs emitted by the Mapper and Reducer;
         * its collect(k, v) method adds a (k, v) pair to the output.
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception
    {
        /**
         * JobConf: the map/reduce job configuration class; it describes the map-reduce
         * work to be executed to the Hadoop framework.
         * Constructors include JobConf(), JobConf(Class exampleClass), JobConf(Configuration conf), etc.
         */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");                 // set a user-defined job name
        conf.setOutputKeyClass(Text.class);           // set the key class for the job output
        conf.setOutputValueClass(IntWritable.class);  // set the value class for the job output
        conf.setMapperClass(Map.class);               // set the Mapper class for the job
        conf.setCombinerClass(Reduce.class);          // set the Combiner class for the job
        conf.setReducerClass(Reduce.class);           // set the Reducer class for the job
        conf.setInputFormat(TextInputFormat.class);   // set the InputFormat implementation for the job
        conf.setOutputFormat(TextOutputFormat.class); // set the OutputFormat implementation for the job
        /**
         * InputFormat describes the input of a map-reduce job.
         * FileInputFormat.setInputPaths(): set an array of paths as the job input.
         * FileOutputFormat.setOutputPath(): set the job output path.
         */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);                       // run the job
    }
}
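The listing above uses the old org.apache.hadoop.mapred API, which still runs on Hadoop 2.x; the listing that follows implements the same WordCount with the newer org.apache.hadoop.mapreduce API. To try the old-API version by hand, it can be compiled and packaged roughly as follows (a sketch; the file and jar names are illustrative):
mkdir -p wordcount_classes
javac -classpath $(hadoop classpath) -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
hadoop jar wordcount.jar com.felix.WordCount hdfsInput hdfsOutput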
package com.hadoop.sample;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
    // Extends the Mapper class; the map input types are <Object, Text>
    // and the output types are <Text, IntWritable>.
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        // one represents a single occurrence of a word
        private static IntWritable one = new IntWritable(1);
        // word holds the token split off from the line
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // tokenize the input line
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken()); // store the token in word
                context.write(word, one);
            }
        }
    }

    // Extends the Reducer class; the reduce input types are <Text, IntWritable>
    // and the output types are <Text, IntWritable>.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // result holds the word count
        private static IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // sum the values of the received <key, value-list>
            for (IntWritable val : values) {
                sum += val.get();
            }
            // store the count in result
            result.set(sum);
            // emit the result
            context.write(key, result);
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WordCount <in> <out>");
            System.exit(2);
        }
        // configure the job name
        Job job = new Job(conf, "word count");
        // configure the job classes
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
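The new-API version reads its input and output paths through GenericOptionsParser, so it is run the same way (a sketch, assuming it has been packaged into a jar named wordcount2.jar with main class com.hadoop.sample.WordCount):
hadoop jar wordcount2.jar com.hadoop.sample.WordCount hdfsInput hdfsOutput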
5. Running the Job and Viewing the Results
Run the following command in the terminal:
bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput
Note that the example jar here is from version 1.2.1; the version may differ from machine to machine, in which case use the * wildcard in place of the version number:
bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput
You should see a job run record similar to the following (console screenshot omitted here):
The hadoop command starts a JVM to run the MapReduce program, automatically picks up the Hadoop configuration, and adds the class path (and its dependencies) to the Hadoop libraries. From the job run record you can see that the job was assigned the ID job_201202292213_0002, that there were two input files (Total input paths to process : 2), and you can also read the map input/output record and byte counts as well as the reduce input/output records.
View the contents of the hdfsOutput directory on HDFS by running:
bin/hadoop fs -ls hdfsOutput
The listing shows that three files were generated; our result is in "part-r-00000".
View the output file contents with:
bin/hadoop fs -cat hdfsOutput/part-r-00000
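The result file contains one word per line followed by a tab and its count. With the illustrative input files sketched in the data-preparation step above, it would look roughly like:
Bye     1
Goodbye 1
Hadoop  2
Hello   2
World   2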
Note: some of the figures in this article are taken from other blog posts; this article is intended only as lab notes. If this infringes your rights, please email 1255560195@qq.com. Do not reproduce without permission.
Reposted from: https://www.cnblogs.com/zd520pyx1314/p/7246491.html