Hadoop入门(五)IO操作
一、HadoopIO操作意義
Hadoop自帶一套用于I/O的原子性的操作
(不會被線程調度機制打斷,一直到結束,中間不會有任何context switch)
特點
基于保障海量數據集的完整性和壓縮性?
Hadoop提供了一些用于開發分布式系統的API(一些序列化操作+基于磁盤的底層數據結構)
?
二、HDFS數據完整性
用戶希望儲存和處理數據的時候,不會有任何損失或者損壞。
Hadoop提供兩種校驗
1校驗和(常用循環冗余校驗CRC-32)
2運行后臺進程來檢測數據塊
(1)校驗和
(2)寫入數據節點驗證
Hdfs會對寫入的所有數據計算校驗和,并在讀取數據時驗證校驗和。
元數據節點負責在驗證收到的數據后,儲存數據及其校驗和。在收到客戶端數據或復制其他datanode的數據時執行。
正在寫數據的客戶端將數據及其校驗和發送到一系列數據節點組成的管線,管線的最后一個數據節點負責驗證校驗和
(3)讀取數據節點驗證
客戶端讀取數據節點數據也會驗證校驗和,將它們與數據節點中儲存的校驗和進行比較。
每個數據節點都持久化一個用于驗證的校驗和日志。
客戶端成功驗證一個數據塊后,會告訴這個數據節點,數據節點由此更新日志。
(4)恢復數據
由于hdfs儲存著每個數據塊的備份,它可以通過復制完好的數據備份來修復損壞的數據塊來恢復數據。
(5)Localfilesystem類
Hadoop的LocalFileSystem類是用來執行客戶端的校驗和驗證。當寫入一個名為filename的文件時文件系統客戶端會在包含文件塊校驗和的同一目錄內建立一個名為Filename.crc的隱藏文件。
(6)ChecksumfileSystem類
LocalFileSystem類通過ChecksumFileSystem類來完成自己的任務
FileSystem rawFs;
FileSystem checksummedFs=new ChecksumFileSystem(rawFS);
可以通過CheckFileSystem的getRawFileSystem()方法獲取源文件系統。
當檢測到錯誤,CheckFileSystem類會調用reportCheckSumFailure()方法報告錯誤,然后LocalFileSystem將這個出錯的文件和校驗和移到名為bad_files的文件夾內,管理員可以定期檢查這個文件夾。
(7)DatablockScanner
數據節點后臺有一個進程DataBlockScanner,定期驗證儲存在這個數據節點上的所有數據項,該項措施是為解決物理儲存媒介上的損壞。DataBlockScanner是作為數據節點的一個后臺線程工作的,跟著數據節點同時啟動
它的工作流程如圖
由于對數據節點上的每一個數據塊掃描一遍要消耗較多系統資源,因此掃描周期的值一般比較大,
這就帶來另一個問題,就是在一個掃描周期內可能出現數據節點重啟的情況,所以為了提高系統性能,避免數據節點在啟動后對還沒有過期的數據塊又掃描一遍,
DataBlockScanner在其內部使用了日志記錄器來持久化保存每一個數據塊上一次掃描的時間
這樣的話,數據節點可以在啟動之后通過日志文件來恢復之前所有的數據塊的有效時間。
?
三、基于文件的數據結構
HDFS和MR主要針對大數據文件來設計,在小文件處理上效率低.解決方法是選擇一個容器,將這些小文件包裝起來,將整個文件作為一條記錄,可以獲取更高效率的儲存和處理,避免多次打開關閉流耗費計算資源.hdfs提供了兩種類型的容器 SequenceFile和MapFile
小文件問題解決方案
在原有HDFS基礎上添加一個小文件處理模塊,具體操作流程如下:
?
四、Sequence file
Sequence file由一系列的二進制key/value組成,如果key為小文件名,value為文件內容,則可以將大批小文件合并成一個大文件。Hadoop-0.21.0版本開始中提供了SequenceFile,包括Writer,Reader和SequenceFileSorter類進行寫,讀和排序操作。該方案對于小文件的存取都比較自由,不限制用戶和文件的多少,支持Append追加寫入,支持三級文檔壓縮(不壓縮、文件級、塊級別)。其存儲結構如下圖所示:
(1)SequenceFile儲存
文件中每條記錄是可序列化,可持久化的鍵值對,提供相應的讀寫器和排序器,寫操作根據壓縮的類型分為3種
- Write 無壓縮寫數據
- RecordCompressWriter記錄級壓縮文件,只壓縮值
- BlockCompressWrite塊級壓縮文件,鍵值采用獨立壓縮方式
在儲存結構上,sequenceFile主要由一個Header后跟多條Record組成,如圖
? ? 前三個字節是一個Bytes SEQ代表著版本號,同時header也包括key的名稱,value class , 壓縮細節,metadata,以及Sync markers。Sync markers的作用在于可以讀取任意位置的數據。
在recourds中,又分為是否壓縮格式。當沒有被壓縮時,key與value使用Serialization序列化寫入SequenceFile。當選擇壓縮格式時,record的壓縮格式與沒有壓縮其實不盡相同,除了value的bytes被壓縮,key是不被壓縮的。
? ? 當保存的記錄很多時候,可以把一串記錄組織到一起同一壓縮成一塊。
在Block中,它使所有的信息進行壓縮,壓縮的最小大小由配置文件中,io.seqfile.compress.blocksize配置項決定。
(2)SequenceFile寫操作
通過createWrite創建SequenceFile對象,返回Write實例,指定待寫入的數據流如FSDataOutputStream或FileSystem對象和Path對象。還需指定Configuration對象和鍵值類型(都需要能序列化)。
SequenceFile通過API來完成新記錄的添加操作 fileWriter.append(key,value);
private?static?void?writeTest(FileSystem?fs,?int?count,?int?seed,?Path?file,??CompressionType?compressionType,?CompressionCodec?codec)??throws?IOException?{??fs.delete(file,?true);??LOG.info("creating?"?+?count?+?"?records?with?"?+?compressionType?+??"?compression");??//指明壓縮方式??SequenceFile.Writer?writer?=??SequenceFile.createWriter(fs,?conf,?file,??RandomDatum.class,?RandomDatum.class,?compressionType,?codec);??RandomDatum.Generator?generator?=?new?RandomDatum.Generator(seed);??for?(int?i?=?0;?i?<?count;?i++)?{??generator.next();???? //keyh??RandomDatum?key?=?generator.getKey();???? //value??RandomDatum?value?=?generator.getValue();?? //追加寫入??writer.append(key,?value);??}??writer.close();?? }?? public class SequenceFileWriteDemo {private static final String[] DATA = {"One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen"};public static void main(String[] args) throws IOException {String uri = =“hdfs: //master:8020/number.seq";Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);Path path = new Path(uri);IntWritable key = new IntWritable();Text value = new Text();SequenceFile.Writer writer = null;try {writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());for (int i = 0; i < 100; i++) {key.set(100 - i);value.set(DATA[i % DATA.length]);System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);writer.append(key, value);}} finally {IOUtils.closeStream(writer);}} }(3)讀取SequenceFile
public class SequenceFileReadDemo {public static void main(String[] args) throws IOException {String uri = =“hdfs://master:8020/number.seq";Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);Path path = new Path(uri);SequenceFile.Reader reader = null;try {reader = new SequenceFile.Reader(fs, path, conf);Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);long position = reader.getPosition();while (reader.next(key, value)) {//同步記錄的邊界String syncSeen = reader.syncSeen() ? "*" : "";System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);position = reader.getPosition(); // beginning of next record}} finally {IOUtils.closeStream(reader);}} }示例2
寫入了100條(key,value)的信息,其中以LongWriable為key,以Text作為value
Configuration config = new Configuration(); FileSystem fs = FileSystem.get(conf); int i = 0; Path path = new Path("/home/lake/hello.xml"); SequenceFile.Writer writer = null; SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path); //定義keySequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class); //定義value SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class); writer = SequenceFile.createWriter(conf, optPath, optKey, optVal); //寫入的數據測試 String value = "hello world"; while(i < 100){ writer.append(new LongWritable(i),new Text(value)); i ++; } writer.close();讀取的代碼
Configuration config = new Configuration();FileSystem fs = FileSystem.get(conf); Path path = new Path("/home/lake/hello.xml"); SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path)); List<Object> sampleValues = new ArrayList<Object>(); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); int count = 0; String keyName = "Key"; String valueName = "Value"; //change data to json format while (reader.next(key, value) && count < 12){ sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}"); count++; }?
五、MapFile
一個MapFile可以通過SequenceFile的地址,進行分類查找的格式。使用這個格式的優點在于,首先會將SequenceFile中的地址都加載入內存,并且進行了key值排序,從而提供更快的數據查找。
與SequenceFile只生成一個文件不同,MapFile生成一個文件夾。
索引模型按128個鍵建立的,可以通過io.map.index.interval來修改
缺點
1.文件不支持復寫操作,不能向已存在的SequenceFile(MapFile)追加存儲記錄2.當write流不關閉的時候,沒有辦法構造read流。也就是在執行文件寫操作的時候,該文件是不可讀取的
排序后的SequeneceFile,并且它會額外生成一個索引文件提供按鍵的查找.讀寫mapFile與讀寫SequenceFile非常類似,只需要換成MapFile.Reader和MapFile.Writer就可以了。
在命令行顯示mapFile的文件內容同樣要用? -text
(1)MapFile寫操作
public class MapFileWriteFile {private static final String[] myValue = {"hello world", "bye world", "hello hadoop", "bye hadoop"};public static void main(String[] args) {String uri = "hdfs://master:8020/number.map";Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);IntWritable key = new IntWritable();Text value = new Text();MapFile.Writer writer = null;try {writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());for (int i = 0; i < 500; i) {key.set(i);value.set(myValue[i % myValue.length]);writer.append(key, value);}}finally{IOUtils.closeStream(writer);}}} }MapFile會生成2個文件 1個名data,1個名index 查看前10條data+index $ hdfs –fs –text /number.map/data | head
(2)讀取MapFile
public class MapFileReadFile {public static void main(String[] args) {String uri ="hdfs://master:8020/number.map";Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);MapFile.Reader reader = null;try {reader = new MapFile.Reader(fs, uri, conf);WritableComparable key = (WritableComparable) ReflectionUtils.newInstance(reader.getValueClass(), conf);while (reader.next(key, value)) {System.out.printf("%s\t%s\n", key, value);}reader.get(new IntWritable(7), value);System.out.printf("%s\n", value);} finally {IOUtils.closeStream(reader);}} }SequenceFile文件是用來存儲key-value數據的,但它并不保證這些存儲的key-value是有序的, 而MapFile文件則可以看做是存儲有序key-value的SequenceFile文件。 MapFile文件保證key-value的有序(基于key)是通過每一次寫入key-value時的檢查機制,這種檢查機制其實很簡單,就是保證當前正要寫入的key-value與上一個剛寫入的key-value符合設定的順序, 但是,這種有序是由用戶來保證的,一旦寫入的key-value不符合key的非遞減順序,則會直接報錯而不是自動的去對輸入的key-value排序
(3)SequenceFile轉換為MapFile
mapFile既然是排序和索引后的SequenceFile那么自然可以把SequenceFile轉換為MapFile使用mapFile.fix()方法把一個SequenceFile文件轉換成MapFile
?
六、壓縮
減少儲存文件所需空間,還可以降低其在網絡上傳輸的時間。 壓縮算法對比
Bzip2支持切分 splitting.hdfs上文件1GB,如按照默認塊64MB,那么這個文件被分為16個塊。如果把這個塊放入MR任務 ,將有16個map任務輸入。如果算法不支持切分,后果是MR把這個文件作為一個Map輸入。這樣任務減少了,降低了數據的本地性。
(1)CodeC
實現了一種壓縮解壓算法。Hadoop中壓縮解壓類實現CompressionCodec接口createOutputStream來創建一個CompressionOutputStream,將其壓縮格式寫入底層的流?
演示HDFS上一個1.bzip2算法壓縮的文件解壓,然后把解壓的文件壓縮成2.gz
(2)本地庫
Hadoop使用java開發,但是有些需求和操作并不適合java,所以引入了本地庫 native。可以高效執行某些操作。如使用gzip壓縮解壓時,使用本地庫比使用java時間要縮短大約10%,解壓達到50%。在hadoop_home/lib/native下
在hadoop配置文件core-site.xml可以設置是否使用native
默認是啟用本地庫,如果頻繁使用原生庫做壓解壓任務,可以使用codecpool,通過CodecPool的getCompressor方法獲得Compressor對象,需要傳入Codec 。這樣可以節省創建Codec對象開銷 ,允許反復使用。
(3)如何選擇壓縮格式
- Gzip 優點是壓縮率高,速度快。Hadoop支持與直接處理文本一樣。缺點不支持split,當文件壓縮在128m內,都可以用gzip
- Izo ? 優點壓縮速度快 合理的壓縮率;支持split,是最流行的壓縮格式。支持native庫;缺點 比gzip壓縮率低,hadoop本身不支持,需要安裝;在應用中對lzo格式文件需要處理如 指定inputformat為lzo格式
- Snappy壓縮 高速壓縮率合理支持本地庫。不支持split,hadoop不支持 要安裝linux沒有對應命令;當MR輸出數據較大,作為到reduce數據壓縮格式?
- Bzip2 支持split,很高的壓縮率,比gzip高,hadoop支持但不支持native,linux自帶命令使用方便。缺點壓縮解壓速度慢
使用哪種壓縮和具體應用有關,對于巨大,沒有儲存邊界的文件如日志 可以考慮
?
七、序列化
為什么Hadoop基本類型還要定義序列化?
Hadoop定義了兩個序列化相關接口
Writable和Comparable
WritableComparable接口相當于繼承了上述兩個接口的新接口?
Public interface WritableComparable<T>
?extends Writable,Comparable<T>
(1)Writable接口
基于DataInput與DatOutput的簡單高效可序列化接口,就是org.apache.hadoop.io.Writable接口
幾乎所有的hadoop可序列化對象都必須實現這個接口有2個方法
Write,readFiles
以IntWritable為例,它把java的int類型封裝成了Writable序列化格式
可以通過set()設置它的值 ?
new IntWritable().set(100);?
new IntWritable(100);
(2)WritableComparable接口
類似java的Comparable接口,用于類型的比較。MR其中一個階段叫排序,默認使用Key來排序。Hadoop提供了一個優化接口RawComparator
?
Public interface RawComparator<T> extends Comparator<T>{
?? ?Public int compare(byte[] ?b1,int s1,int l1,byte[] b2,int s2,int l2);
}
可以比較b1和b2,允許執行者直接比較數據流記錄,而無須先把數據流反序列化成對象,這樣可以避免新建對象的開銷 。
?
八、Writable類
ArrayWritable
TwoDArrayWritable
MapWritable
SortedMapWritable
BooleanWritable
ByteWritable
IntWritable
VIntWritable
FloatWritable
LongWritable
VLongWritable
DoubleWritable
NullWritable
Text
BytesWritable
MD5Hash
ObjectWrtiable
GenericWritable
| Java?primitive | Writable?Implementation | Serialized?size(bytes) |
| boolean | BooleanWritable | 1 |
| byte | ByteWritable | 1 |
| short | ShortWritable | 2 |
| int | IntWritable | 4 |
| ? | VIntWritable | 1-5 |
| float | FloatWritable | 4 |
| long | LongWritable | 8 |
| VLongWritable | 1-9 | |
| double | DoubleWritable | 8 |
(1)Text ?
存儲的數據按照UTF-8,類似String,它提供了序列化,反序列化和字節級別比較的方法。Text類替換了UTF8類。
1.unicode編碼是一個很大的集合,可以容納100多萬個符號。具體的符號對應表可以查詢unicode.org 它只規定了符號的二進制代碼,沒有規定如何存儲,而utf-8就是unicode的實現還有utf16等。對于單個字符字節第一位為0,后面7位為這個符號的unicode碼。因此對于英語字母,utf-8編碼和ASCII碼是相同的。所有\u0001~\u007f會以單字節儲存。\u0080~\u07ff的unicode會以雙字節儲存,\u0800~\uFFFF的會以3字節存儲。
2.例子 Text的幾個方法 一旦使用多字節編碼Text和String區別就明顯了
public void testText() throws UnsupportedEncodingException {Text T = new Text("你好天地");String S ="你好天地";assertEquals(t.getLength(), 12);assertEquals(s.getBytes("utf-8").length, 12);assertEquals(s.length(), 4);assertEquals(t.find("天"), 6);assertEquals(s.indexOf("天"), 3); }Text.find()方法返回的是字節偏移量,String.indexOf返回單個編碼字符的索引位置,
String.codeprintAt()和Text.charAt類似,前者通過字節偏移量來索引
Text對字符串沒有String方法豐富 大多數情況下通過toString轉換成String來操作
(2)BytesWritable
相當于二進制數據數組的包裝。以字節數組{1,2,3,4}它的序列化格式是4字節表示字節數 ,每2個字節表示一個數據即 “0000 0004 0102 0304” 和Text一樣BytesWritable也是可變的 ,可以通過set來修改
(3)NullWritable
NullWritable是writable類型的特殊類型,序列化長度為0,它充當占位符但不真在數據流中讀寫。NullWritable是單實例類型,通過NullWriable.get()方法獲取
(4)ObjectWritable和GenericWritable
ObjectWritable是對java基本類型的和這些類型的數組類型的通用封裝 ,使用RPC來封送
(5)自定義Writable類型
Hadoop基本滿足大部分需求,但有些情況下可以根據業務需要構造新的實現,為了提高MR作業的性能,因為Writable是MR的核心。 例子:假如 要處理一組姓名字段,不能單獨處理名和姓。 下面表示一對字符串TextPair的基本實現
public class TextPair implements WritableComparable<TextPair> {private Text first, second;public TextPair() {set(new Text(), new Text());}public TextPair(String first, String second) {set(new Text(first), new Text(second));}public TextPair(Text first, Text second) {set(first, second);}public void set(Text first, Text second) {this.first = first;this.second = second;}public Text getFirst() {return first;}public Text getSecond() {return second;}public void readFields(DataInput in) {First.readFields(in);Second.readFields(in);}public void write(DataOutput out) {first.write(out);second.write(out);}public int compareTo(TextPair tp) {int cmp = first.compareTo(tp.first);if (cmp != 0) return cmp;return second.compareTo(tp.second);}public int hashCode() {return first.hashCode() * 163 + second.hashCode();}public Boolean equals(Object o) {if (o instanceof TextPair) {TextPair tp = (TextPair) o;return first.equals(tp.first) && second.equals(tp.second);}return false;}public String toString() {return first +"\t"+second;} }示例說明:
TextPair類的write()方法將first和 second兩個字段序列化到輸出流中,
反之 readFields方法對來自輸入流的字節進行反序列化處理。
DataOutput和DataInput接口提供了底層的序列化和反 序列化方法。
所以可以完全控制Writable對象的數據傳輸格式。
與java值對象一樣,必須重寫object的hashCode,equals和toString()方法。hashCode給后面的MR進行 reduce分區使用
最后 這個類 繼承了WritableComparable接口,所以必須提供CompareTo方法的實現,該 方法 按first排序 ,如相同按second排序。
?
九、小結
HDFS以CRC校驗來檢測數據是否為完整的,并在默認設置下,會讀取數據時驗證校驗和,保證其數據的完整性,其所有序列化數據結構都是針對大數據處理的。
Hadoop對大數據的壓縮和解壓機制,可以減少儲存 空間和加速數據在網絡上的傳輸。
在hadoop中通過序列化將消息編碼為二進制流發送到遠程節點,此后在接收端接收的二進制流被反序列化為消息。Hadoop沒有采用java的序列化,實現了自己的writable接口。
總結
以上是生活随笔為你收集整理的Hadoop入门(五)IO操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 泰拉瑞亚绿藻锭怎么合成
- 下一篇: 怎么选择进口电脑椅怎么选择进口电脑椅子