Hadoop IO
總結(jié):
?????本章包含了以下內(nèi)容
???? 第一,數(shù)據(jù)完整性,hadoop采用CRC來檢測數(shù)據(jù)是否是完整的,在寫入文件時,hdfs為每個數(shù)據(jù)塊都生成一個crc文件。客戶端讀取數(shù)據(jù)時生成一個crc與數(shù)據(jù)節(jié)點存儲的crc做比對,如果不匹配則說明數(shù)據(jù)已經(jīng)損壞了。數(shù)據(jù)節(jié)點在后臺運行一個程序定期檢測數(shù)據(jù),防止物理存儲介質(zhì)中位衰減而造成的數(shù)據(jù)損壞。
???? 第二,壓縮和解壓,在mapreduce中使用壓縮可以減少存儲空間和加速在數(shù)據(jù)在網(wǎng)絡(luò)上的傳輸。
???? 第三,序列化,hadoop使用自己的序列化機(jī)制,實現(xiàn)Writable接口
???? 第四,基本文件類型,SequenceFile和MapFile的讀寫。
?
?
1 數(shù)據(jù)完整性
??? 由于每個磁盤或者網(wǎng)絡(luò)上的I/O操作可能會對正在讀寫的數(shù)據(jù)不慎引入錯誤,如果通過的數(shù)據(jù)流量非常大,數(shù)據(jù)發(fā)生損壞的幾率很高。
??? 檢查損壞數(shù)據(jù)的常用方法是在第一次進(jìn)入系統(tǒng)時計算數(shù)據(jù)的校驗和,然后只要數(shù)據(jù)不是在一個可靠的通道上傳輸,就可能會發(fā)生損壞。如果新生成的校驗和不完全匹配原始的校驗和,那么數(shù)據(jù)就會被認(rèn)為是損壞的。
??? 一個常用的錯誤檢測代碼是CRC-32(cyclic redundancy check,循環(huán)冗余檢查),計算一個32位的任何大小輸入的整數(shù)校驗和。
1.1 HDFS的數(shù)據(jù)完整性
????HDFS以透明方式校驗所有寫入它的數(shù)據(jù),并在默認(rèn)設(shè)置下,會在讀取數(shù)據(jù)時驗證校驗和。針對數(shù)據(jù)的每個io.bytes.per.checksum(默認(rèn)512字節(jié))字節(jié),都會創(chuàng)建一個單獨的校驗和。
?
???數(shù)據(jù)節(jié)點負(fù)責(zé)在存儲數(shù)據(jù)及其校驗和之前驗證它們收到的數(shù)據(jù)。 從客戶端和其它數(shù)據(jù)節(jié)點復(fù)制過來的數(shù)據(jù)??蛻舳藢懭霐?shù)據(jù)并且將它發(fā)送到一個數(shù)據(jù)節(jié)點管線中,在管線的最后一個數(shù)據(jù)節(jié)點驗證校驗和。
?? 客戶端讀取數(shù)據(jù)節(jié)點上的數(shù)據(jù)時,會驗證校驗和,將其與數(shù)據(jù)節(jié)點上存儲的校驗和進(jìn)行對比。每個數(shù)據(jù)節(jié)點維護(hù)一個連續(xù)的校驗和驗證日志,因此它知道每個數(shù)據(jù)塊最后驗證的時間。
?
??? 每個數(shù)據(jù)節(jié)點還會在后臺線程運行一個DataBlockScanner(數(shù)據(jù)塊檢測程序),定期驗證存儲在數(shù)據(jù)節(jié)點上的所有塊,為了防止物理存儲介質(zhì)中位衰減鎖造成的數(shù)據(jù)損壞。
??
??? HDFS通過復(fù)制完整的副本來產(chǎn)生一個新的,無錯的副本來“治愈”哪些出錯的數(shù)據(jù)塊。工作方式:如果客戶端讀取數(shù)據(jù)塊時檢測到錯誤,拋出Checksum Exception前報告該壞塊以及它試圖從名稱節(jié)點中藥讀取的數(shù)據(jù)節(jié)點。名稱節(jié)點將這個塊標(biāo)記為損壞的,不會直接復(fù)制給客戶端或復(fù)制該副本到另一個數(shù)據(jù)節(jié)點。它會從其他副本復(fù)制一個新的副本。
?
??? 使用Open方法來讀取文件前,通過setVerifyChecksum()方法來禁用校驗和驗證。
1.2 本地文件系統(tǒng)
??? Hadoop的本地文件系統(tǒng)執(zhí)行客戶端校驗。意味著,在寫一個名為filename的文件時,文件系統(tǒng)的客戶端以透明的方式創(chuàng)建一個隱藏的文件.filename.crc。在同一個文件夾下,包含每個文件塊的校驗和。
??? 數(shù)據(jù)塊大小由io.bytes.per.checksum屬性控制,塊的大小作為元數(shù)據(jù)存儲在.crc文件中。
???
??? 也可能禁用校驗和:底層文件系統(tǒng)原生支持校驗和。這里通過RawLocalFileSystem來替代LocalFileSystem完成。要在一個應(yīng)用中全局使用,只需要設(shè)置fs.file.impl值為org.apache.hadoop.fs.RawLocalFileSystem來重新map執(zhí)行文件的URL。或者只想對某些讀取禁用校驗和校驗。
??? Configuration conf = ...?
??? FileSystem fs = new RawLocalFileSystem();
??? fs.initialize(null, conf);
???
1.2.3 ChecksumFileSystem
????LocalFileSystem使用ChecksumFileSystem(校驗和文件系統(tǒng))為自己工作,這個類可以很容易添加校驗和功能到其他文件系統(tǒng)中。因為ChecksumFileSystem也包含于文件系統(tǒng)中。
?? FileSystem rawFs = ...?
???FileSystem checksummedFs = new ChecksumFileSystem(rawFs);
?
?
?
2 壓縮
??? 文件壓縮兩大好處:減少存儲文件所需要的空間且加快了數(shù)據(jù)在網(wǎng)絡(luò)上或從磁盤上或到磁盤上的傳輸速度。???
??????????????????????????????????????????????????????????????????????????????????? 壓縮格式
2.1 編碼和解碼
??? 編碼和解碼器用以執(zhí)行壓縮解壓算法。在Hadoop中,編碼和解碼是通過一個壓縮解碼器接口實現(xiàn)的。
?
???
?
????CompressionCodec對流進(jìn)行壓縮和解壓縮
?
??? CompressionCodec有兩個方法輕松地壓縮和解壓數(shù)據(jù)。使用use the? createOutputStream(OutputStream out)創(chuàng)建一個CompressionOutputStream,將其以壓縮格式寫入底層的流。使用createInputStream(InputStream in) 獲取一個CompressionInputStream,從底層的流讀取未壓縮的數(shù)據(jù)。
?
???
01?package com.laos.hadoop;?02?
03?import org.apache.hadoop.conf.Configuration;?
04?import org.apache.hadoop.io.IOUtils;?
05?import org.apache.hadoop.io.compress.CompressionCodec;?
06?import org.apache.hadoop.io.compress.CompressionOutputStream;?
07?import org.apache.hadoop.util.ReflectionUtils;?
08?
09?public class StreamCompressor {?
10?????public static void main(String[] args) throws Exception {?
11?????????String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";?
12?????????Class<?>?codecClass = Class.forName(codecClassname);?
13?????????Configuration conf = new Configuration();?
14?????????CompressionCodec codec = (CompressionCodec) ReflectionUtils?
15?????????????????.newInstance(codecClass, conf);?
16????????? //將讀入數(shù)據(jù)壓縮至System.out
17?????????CompressionOutputStream out = codec.createOutputStream(System.out);?
18?????????IOUtils.copyBytes(System.in, out, 4096, false);?
19?????????out.finish();?
20?????}?
21?
22?}
?
$ echo "Test"|hadoop jar hadoop-itest.jar com.laos.hadoop.StreamCompressor|gunzip
?
????使用CompressionCodecFactory方法來推斷CompressionCodec
?
????在閱讀一個壓縮文件時,我們可以從擴(kuò)展名來推斷出它的編碼/解碼器。以.gz結(jié)尾的文件可以用GzipCodec來閱讀。CompressionCodecFactory提供了getCodec()方法,從而將文件擴(kuò)展名映射到相應(yīng)的CompressionCodec。
?
?????
01?package com.laos.hadoop;?02?
03?import java.io.InputStream;?
04?import java.io.OutputStream;?
05?import java.net.URI;?
06?
07?import org.apache.hadoop.conf.Configuration;?
08?import org.apache.hadoop.fs.FileSystem;?
09?import org.apache.hadoop.fs.Path;?
10?import org.apache.hadoop.io.IOUtils;?
11?import org.apache.hadoop.io.compress.CompressionCodec;?
12?import org.apache.hadoop.io.compress.CompressionCodecFactory;?
13?
14?public class FileDecompressor {?
15?????public static void main(String[] args) throws Exception {?
16?????????String uri = args[0];?
17?????????Configuration conf = new Configuration();?
18?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
19?
20?????????Path inputPath = new Path(uri);?
21?????????CompressionCodecFactory factory = new CompressionCodecFactory(conf);?
22?????????CompressionCodec codec = factory.getCodec(inputPath);?
23?????????if (codec == null) {?
24?????????????System.err.println("No codec found for " + uri);?
25?????????????System.exit(1);?
26?????????}?
27?????????String outputUri = CompressionCodecFactory.removeSuffix(uri, codec?
28?????????????????.getDefaultExtension());?
29?????????InputStream in = null;?
30?????????OutputStream out = null;?
31?????????try {?
32?????????????in = codec.createInputStream(fs.open(inputPath));?
33?????????????out = fs.create(new Path(outputUri));?
34?????????????IOUtils.copyBytes(in, out, conf);?
35?????????} finally {?
36?????????????IOUtils.closeStream(in);?
37?????????????IOUtils.closeStream(out);?
38?????????}?
39?????}?
40?}
??? CompressionCodecFactory從io.compression.codecs配置屬性定義的列表中找到編碼和解碼器,默認(rèn)情況下,Hadoop給出所有的編碼和解碼器。每個編碼/加碼器都知道默認(rèn)文件擴(kuò)展名。
?
???
?????????????????
2.2 壓縮和輸入分隔
??? 考慮如何壓縮哪些將由MapReduce處理的數(shù)據(jù)時,考慮壓縮格式是否支持分隔很重要。
??? 例如,gzip格式使用default來存儲壓縮過的數(shù)據(jù),default將數(shù)據(jù)作為一系列壓縮過的塊存儲,但是每塊的開始沒有指定用戶在數(shù)據(jù)流中的任意點定位到下一個塊的起始位置,而是自身與數(shù)據(jù)同步,所以gzip不支持分隔機(jī)制。
?
2.3 在MapReduce中使用壓縮
????如果要壓縮MapReduce作業(yè)的輸出,設(shè)置mapred.output.compress為true,mapred.output.compression.codec屬性指定編碼解碼器。
???如果輸入的文件時壓縮過的,MapReduce讀取時,它們會自動解壓,根據(jù)文件擴(kuò)展名來決定使用那一個壓縮解碼器。
?
??
01?package com.laos.hadoop;?02?
03?import java.io.IOException;?
04?
05?import org.apache.hadoop.fs.Path;?
06?import org.apache.hadoop.io.IntWritable;?
07?import org.apache.hadoop.io.Text;?
08?import org.apache.hadoop.io.compress.CompressionCodec;?
09?import org.apache.hadoop.io.compress.GzipCodec;?
10?import org.apache.hadoop.mapred.FileInputFormat;?
11?import org.apache.hadoop.mapred.FileOutputFormat;?
12?import org.apache.hadoop.mapred.JobClient;?
13?import org.apache.hadoop.mapred.JobConf;?
14?
15?
16?public class MaxTemperatureWithCompression {?
17????? public static void main(String[] args) throws IOException {?
18????????? if (args.length != 2) {?
19????????? System.err.println("Usage: MaxTemperatureWithCompression?<input?path>?" +?
20????????? "<output?path>");?
21????????? System.exit(-1);?
22????????? }?
23??????????
24????????? JobConf conf = new JobConf(MaxTemperatureWithCompression.class); conf.setJobName("Max temperature with output compression");?
25????????? FileInputFormat.addInputPath(conf, new Path(args[0]));?
26????????? FileOutputFormat.setOutputPath(conf, new Path(args[1]));?
27??????????
28????????? conf.setOutputKeyClass(Text.class);?
29????????? conf.setOutputValueClass(IntWritable.class);?
30??????????
31????????? conf.setBoolean("mapred.output.compress", true);?
32????????? conf.setClass("mapred.output.compression.codec", GzipCodec.class,?
33????????? CompressionCodec.class);?????????
34????????? JobClient.runJob(conf);?
35????????? }?
36?}
?
?
?
?
3 序列化
??? 序列化:將結(jié)構(gòu)化對象轉(zhuǎn)換為字節(jié)流以便于通過網(wǎng)絡(luò)進(jìn)行傳輸或?qū)懭氪鎯Φ倪^程。
??? 反序列化:將字節(jié)流轉(zhuǎn)為一系列結(jié)構(gòu)化對象的過程。
?
??? 序列化用在兩個地方:進(jìn)程間通信和持久存儲。
?
??? 在Hadoop中,節(jié)點之間的進(jìn)程間通信是用遠(yuǎn)程過程調(diào)用(RPC)。RPC協(xié)議將使用序列化將消息編碼為二進(jìn)制流(發(fā)送到遠(yuǎn)程節(jié)點),此后在接收端二進(jìn)制流被反序列化為消息。
?
??? Hadoop使用自己的序列化格式Writables。
?
3.1 Writable接口
???
package org.apache.hadoop.io;?import java.io.DataOutput;?
import java.io.DataInput;?
import java.io.IOException;?
public interface Writable {?
????void write(DataOutput out) throws IOException;?//將狀態(tài)寫入二進(jìn)制格式的流
????void readFields(DataInput in) throws IOException; //從二進(jìn)制格式的流讀出其狀態(tài)
}
?
WritableComparable和Comparator
??? IntWritable實現(xiàn)了WritableComparable接口。而WritableComparable繼承了Writable和Comparable。
???
??? 類型的比較對MapReduce而言至關(guān)重要,鍵和鍵之間的比較是在排序階段完成的。Hadoop提供餓了優(yōu)化方法:
???
package org.apache.hadoop.io;?
import java.util.Comparator;?
public interface?RawComparator<T>?extends Comparator<T>?{?
??? public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);?
}
該接口允許執(zhí)行者比較從流中讀取的未被反序列化為對象的記錄,省去了創(chuàng)建對象的所有開銷。例如,IntWritable使用原始的compare()方法從每個字節(jié)數(shù)組的指定開始位置(S1和S2)和長度(L1和L2)讀取整數(shù)直接比較。
?
????WritableComparator是RawComparator對WritableComparable類的一個通用實現(xiàn)。第一,它提供一個默認(rèn)的對原始compare()函數(shù)調(diào)用,對要比較的對象進(jìn)行反序列化,然后調(diào)用對象的compare()方法。?第二,充當(dāng)RawComparator實例的一個工廠方法。
??? RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class)
?
3.2 Writable類
???
?
?
?
3.2.1 Writable的java基本類封裝
????Java基本類型????????????????????????? Writable使用????????????????????? 序列化大小(字節(jié))
?? 布爾類型????????????????????????????????BooleanWritable??????????????? 1
???字節(jié)型?????????????????????????????????? ByteWritable????????????????????1
?? 整型?????????????????????????????????????IntWritable?????????????????????? 4
???????????????????????????????????????????? VIntWritable?????????????????????1-5
?? 浮點行?????????????????????????????????? FloatWritable??????????????????? 4
???長整型?????????????????????????????????? LongWritable??????????????????? 8
???????????????????????????????????????????? VLongWritable?????????????????? 1-9
?? 雙精度浮點型?????????????????????????? DoubleWritable????????????????? 8
?
?
4 基于文件的數(shù)據(jù)結(jié)構(gòu)
?
4.1 SequenceFile類
????SequenceFile為二進(jìn)制鍵值對對提供一個持久化的數(shù)據(jù)結(jié)構(gòu)。
?
4.1.1 寫SequenceFile類
?? 創(chuàng)建SequenceFile類:SequenceFile.createWriter(....)
??
package com.laos.hadoop;?import java.io.IOException;?
import java.net.URI;?
import org.apache.hadoop.conf.Configuration;?
import org.apache.hadoop.fs.FileSystem;?
import org.apache.hadoop.fs.Path;?
import org.apache.hadoop.io.IOUtils;?
import org.apache.hadoop.io.IntWritable;?
import org.apache.hadoop.io.SequenceFile;?
import org.apache.hadoop.io.Text;?
public class SequenceFileWriteDemo {?
????private static final String[] DATA = { "One, two, buckle my shoe",?
????????????"Three, four, shut the door", "Five, six, pick up sticks",?
????????????"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };?
????public static void main(String[] args) throws IOException {?
????????String uri = args[0];?
????????Configuration conf = new Configuration();?
????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
????????Path path = new Path(uri);?
????????IntWritable key = new IntWritable();?
????????Text value = new Text();?
????????SequenceFile.Writer writer = null;?
????????try {?
????????????writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),?
????????????????????value.getClass());?//參數(shù):文件系統(tǒng),configuration,路徑,鍵的類型和值的類型
????????????for (int i = 0; i < 100; i++) {?
????????????????key.set(100 - i);?
????????????????value.set(DATA[i % DATA.length]);?
????????????????System.out.printf("[%s]/t%s/t%s/n", writer.getLength(), key,?
????????????????????????value);?
????????????????writer.append(key, value);?
????????????}?
????????} finally {?
????????????IOUtils.closeStream(writer);?
????????}?
????}?
}
?
4.1.2 讀取SequenceFile類
?
?
01?package com.laos.hadoop;?02?
03?import java.io.IOException;?
04?import java.net.URI;?
05?
06?import org.apache.hadoop.conf.Configuration;?
07?import org.apache.hadoop.fs.FileSystem;?
08?import org.apache.hadoop.fs.Path;?
09?import org.apache.hadoop.io.IOUtils;?
10?import org.apache.hadoop.io.SequenceFile;?
11?import org.apache.hadoop.io.Writable;?
12?import org.apache.hadoop.util.ReflectionUtils;?
13?
14?public class SequenceFileReadDemo {?
15?????public static void main(String[] args) throws IOException {?
16?????????String uri = args[0];?
17?????????Configuration conf = new Configuration();?
18?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
19?????????Path path = new Path(uri);?
20?????????SequenceFile.Reader reader = null;?
21?????????try {?
22?????????????reader = new SequenceFile.Reader(fs, path, conf);//創(chuàng)建reader?
23?????????????Writable key = (Writable) ReflectionUtils.newInstance(reader?
24?????????????????????.getKeyClass(), conf); //獲取key的類型
25?????????????Writable value = (Writable) ReflectionUtils.newInstance(reader?
26?????????????????????.getValueClass(), conf);//獲取value的類型?
27?????????????long position = reader.getPosition();//獲取位置?
28?????????????while (reader.next(key, value)) {//遍歷?
29?????????????????String syncSeen = reader.syncSeen() ? "*" : "";?
30?????????????????System.out.printf("[%s%s]/t%s/t%s/n", position, syncSeen, key,?
31?????????????????????????value);?
32?????????????????position = reader.getPosition(); // beginning of next record?
33?????????????}?
34?????????} finally {?
35?????????????IOUtils.closeStream(reader);?
36?????????}?
37?????}?
38?}
???
??? 兩種方法查找文件中指定的位置,第一種是seak()方法。如果文件中指定位置不是記錄邊界,reader會在調(diào)用next方法是失敗。
??? 第二種是SequenceFile.Reader.sync(long pposition)把reader定位到下一個同步點
?
4.1.3 用命令行接口顯示序列文件
?????使用-text選項顯示文本格式的序列文件。
??? % hadoop fs -text number.seq
4.1.4 序列文件的格式
????
??? SequeceFile是Hadoop API提供的一種二進(jìn)制文件支持。這種二進(jìn)制文件直接將<key, value>對序列化到文件中。一般對小文件可以使用這種文件合并,即將文件名作為key,文件內(nèi)容作為value序列化到大文件中。這種文件格式有以下好處:
??? 1)支持壓縮,且可定制為基于Record或Block壓縮(Block級壓縮性能較優(yōu))?
??? 2)本地化任務(wù)支持:因為文件可以被切分,因此MapReduce任務(wù)時數(shù)據(jù)的本地化情況應(yīng)該是非常好的。?
??? 3)難度低:因為是Hadoop框架提供的API,業(yè)務(wù)邏輯側(cè)的修改比較簡單。
???
??? SequenceFile 是一個由二進(jìn)制序列化過的key/value的字節(jié)流組成的文本存儲文件,它可以在map/reduce過程中的input/output 的format時被使用。在map/reduce過程中,map處理文件的臨時輸出就是使用SequenceFile處理過的。
??? SequenceFile分別提供了讀、寫、排序的操作類。
??? SequenceFile的操作中有三種處理方式:
??? 1)?不壓縮數(shù)據(jù)直接存儲。 //enum.NONE
??? 2) 壓縮value值不壓縮key值存儲的存儲方式。//enum.RECORD
??? 3)key/value值都壓縮的方式存儲。//enum.BLOCK
?
???
沒有壓縮和記錄壓縮的序列文件的內(nèi)部結(jié)構(gòu):未壓縮和記錄壓縮的結(jié)構(gòu)是一樣的,record由記錄長度、鍵長度、鍵和值(或壓縮過的值)構(gòu)成。
?
?
塊壓縮的序列文件的內(nèi)部結(jié)構(gòu):一個同步點內(nèi)記錄筆數(shù)、壓縮鍵的長度、壓縮過的鍵值、壓縮過值的長度和壓縮值。壓縮塊一次壓縮多個記錄。塊的最小大小由屬性:io.seqfile.compress.blocksize定義。
?
?
?
?
4.2 MapFile
??? MapFile是經(jīng)過排序的帶索引的SequenceFile,可以根據(jù)鍵值進(jìn)行查找。
4.2.1 寫MapFile
????
01?package com.laos.hadoop;?02?
03?import java.io.IOException;?
04?import java.net.URI;?
05?
06?import org.apache.hadoop.conf.Configuration;?
07?import org.apache.hadoop.fs.FileSystem;?
08?import org.apache.hadoop.io.IOUtils;?
09?import org.apache.hadoop.io.IntWritable;?
10?import org.apache.hadoop.io.MapFile;?
11?import org.apache.hadoop.io.Text;?
12?
13?public class MapFileWriteDemo {?
14?????private static final String[] DATA = { "One, two, buckle my shoe",?
15?????????????"Three, four, shut the door", "Five, six, pick up sticks",?
16?????????????"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };?
17?
18?????public static void main(String[] args) throws IOException {?
19?????????String uri = args[0];?
20?????????Configuration conf = new Configuration();?
21?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
22?????????IntWritable key = new IntWritable();?
23?????????Text value = new Text();?
24?????????MapFile.Writer writer = null;?
25?????????try {?
26?????????????writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value?
27?????????????????????.getClass()); //創(chuàng)建MapFile?? Writer的實例
28?
29?????????????for (int i = 0; i < 1024; i++) {?
30?????????????????key.set(i + 1);?
31?????????????????value.set(DATA[i % DATA.length]);?
32?????????????????writer.append(key, value);?
33?????????????}?
34?????????} finally {?
35?????????????IOUtils.closeStream(writer);?
36?????????}?
37?????}?
38?}
??? % hadoop MapFileWriteDemo numbers.map
??? numbers.map確實是一個目錄,包含data和index兩個文件。數(shù)據(jù)文件包括所有的輸入,index文件包含一小部分鍵和鍵到data文件中偏移量的映射。索引中鍵的個數(shù)由io.map.index.interval屬性設(shè)置。
?
4.2.2 讀MapFile
??? 順序遍歷MapFile過程和讀取SequenceFile過程相似:創(chuàng)建一個MapFile Reader,調(diào)用next函數(shù)直到返回false。
??? public boolean next(WritableComparable key, Writable val) throws IOException
???
??? 隨機(jī)訪問:
??? public Writable get(WritableComparable key, Writable val) throws IOException
?
4.2.3 將SequenceFile轉(zhuǎn)換成MapFile
?
?????關(guān)鍵是給SequenceFile重建索引:使用MapFile的靜態(tài)方法fix()。
???
01?package com.laos.hadoop;?02?
03?import java.net.URI;?
04?
05?import org.apache.hadoop.conf.Configuration;?
06?import org.apache.hadoop.fs.FileSystem;?
07?import org.apache.hadoop.fs.Path;?
08?import org.apache.hadoop.io.MapFile;?
09?import org.apache.hadoop.io.SequenceFile;?
10?
11?public class MapFileFixer {?
12?????public static void main(String[] args) throws Exception {?
13????? String mapUri = args[0];?
14??????
15????? Configuration conf = new Configuration();?
16??????
17????? FileSystem fs = FileSystem.get(URI.create(mapUri), conf);?
18????? Path map = new Path(mapUri);?
19????? Path mapData = new Path(map, MapFile.DATA_FILE_NAME);?
20??????
21????? // Get key and value types from data sequence file?
22????? SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);?
23????? Class keyClass = reader.getKeyClass();?
24????? Class valueClass = reader.getValueClass();?
25????? reader.close();?
26??????
27????? // Create the map file index file?
28????? long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);?
29????? System.out.printf("Created MapFile %s with %d entries/n", map, entries);?
30????? }?
31?}
?
???
???
???
?
?
總結(jié)
- 上一篇: hadoop MapReduce实例解析
- 下一篇: 人民大学云计算编程的网上评估平台--解题