Reading and Writing ORC Files with MapReduce
Blog: http://www.fanlegefan.com
Original article: http://www.fanlegefan.com/archives/mapreduceorc/
Reading ORC Files with MapReduce
創(chuàng)建orc格式hive表
```sql
create table test_orc(name string, age int) stored as orc;
```

View the Hive table structure:
```sql
show create table test_orc;
```

```sql
CREATE TABLE `test_orc`(
  `name` string,
  `age` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://localhost:9000/user/work/warehouse/test_orc'
TBLPROPERTIES (
  'transient_lastDdlTime'='1502868725')
```

Insert test data:
```sql
insert into table test_orc select name, age from test limit 10;
```

Jar dependencies:
```xml
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
```

The MapReduce code for reading ORC files is as follows:
```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;

import java.io.IOException;

public class OrcReaderMR {

    public static class OrcMap extends Mapper<NullWritable, OrcStruct, Text, IntWritable> {

        // The ORC files under test_orc have the type struct<name:string,age:int>
        public void map(NullWritable key, OrcStruct value, Context output)
                throws IOException, InterruptedException {
            // Emit the first field (name) as the key and the second field (age) as the value
            output.write((Text) value.getFieldValue(0),
                    (IntWritable) value.getFieldValue(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcReaderMR.class);
        job.setJobName("OrcReaderMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test_orc";
        String out = "hdfs://localhost:9000/test/orc";

        job.setMapperClass(OrcMap.class);
        OrcInputFormat.addInputPath(job, new Path(in));
        job.setInputFormatClass(OrcInputFormat.class);

        // Map-only job: dump the records as plain text
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.waitForCompletion(true);
    }
}
```
View the generated output:

```bash
hadoop dfs -cat /test/orc/part-m-00000
```

```
kafka      14
tensflow   98
hadoop     34
hbase      68
flume      57
kafka      99
kafka      28
flume      24
tensflow   35
flume      44
```
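The same data can also be checked without running a job by reading the file directly with the orc-core `Reader` API (the `orc-core` dependency listed above). Below is a minimal sketch, assuming a file named `000000_0` under the Hive table directory; the class name `OrcLocalDump` and that file name are illustrative, not part of the original post, and null handling is ignored:

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcLocalDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Example path: one ORC file written by Hive for the test_orc table (name is an assumption)
        Path path = new Path("hdfs://localhost:9000/user/work/warehouse/test_orc/000000_0");
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        // The schema is stored in the file footer, e.g. struct<name:string,age:int>
        System.out.println("schema: " + reader.getSchema());

        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        while (rows.nextBatch(batch)) {
            BytesColumnVector name = (BytesColumnVector) batch.cols[0];
            LongColumnVector age = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < batch.size; r++) {
                // Ignores nulls for brevity
                System.out.println(name.toString(r) + "\t" + age.vector[r]);
            }
        }
        rows.close();
    }
}
```

Printing `reader.getSchema()` is a quick way to confirm the field order before casting `getFieldValue(0)` and `getFieldValue(1)` in the mapper above.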
Writing ORC Files with MapReduce

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

public class OrcWriterMR {

    public static class OrcWriterMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {

        // Schema of the ORC records to be written
        private TypeDescription schema =
                TypeDescription.fromString("struct<name:string,age:int>");
        private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
        private final NullWritable nada = NullWritable.get();
        private Text name = new Text();
        private IntWritable age = new IntWritable();

        public void map(LongWritable key, Text value, Context output)
                throws IOException, InterruptedException {
            if (!"".equals(value.toString())) {
                // Input lines are tab-separated: name \t age
                String[] arr = value.toString().split("\t");
                name.set(arr[0]);
                age.set(Integer.valueOf(arr[1]));
                pair.setFieldValue(0, name);
                pair.setFieldValue(1, age);
                output.write(nada, pair);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell OrcOutputFormat what schema to write
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<name:string,age:int>");

        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcWriterMR.class);
        job.setJobName("OrcWriterMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test/ddd.txt";
        String out = "hdfs://localhost:9000/test/orc2";

        job.setMapperClass(OrcWriterMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(OrcOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(in));
        OrcOutputFormat.setOutputPath(job, new Path(out));

        job.waitForCompletion(true);
    }
}
```
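For completeness, an ORC file with the same `struct<name:string,age:int>` schema can also be written without MapReduce through the orc-core `Writer` API. This is a minimal sketch; the class name, output path, and sample rows are made up for illustration:

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcLocalWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");
        // Example output path, not from the original post
        Path path = new Path("hdfs://localhost:9000/test/orc3/demo.orc");
        Writer writer = OrcFile.createWriter(path,
                OrcFile.writerOptions(conf).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        BytesColumnVector name = (BytesColumnVector) batch.cols[0];
        LongColumnVector age = (LongColumnVector) batch.cols[1];

        // Sample rows for illustration only
        String[] names = {"kafka", "hadoop", "flume"};
        int[] ages = {14, 34, 57};
        for (int i = 0; i < names.length; i++) {
            int row = batch.size++;
            name.setVal(row, names[i].getBytes("UTF-8"));
            age.vector[row] = ages[i];
            // Flush the batch once it is full
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}
```

A file produced this way can be copied into the Hive table directory in the same way as the `part-m-00000.orc` file below.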
View the generated ORC files:

```bash
hadoop dfs -ls /test/orc2
```

```
-rw-r--r--   3 work supergroup          0 2017-08-16 17:45 /test/orc2/_SUCCESS
-rw-r--r--   3 work supergroup    6314874 2017-08-16 17:45 /test/orc2/part-m-00000.orc
```

Copy the data into the Hive table's directory:
```bash
hadoop fs -cp /test/orc2/part-m-00000.orc /user/work/warehouse/test_orc/
```

View the data in the Hive table:
```
hive> select * from test_orc limit 13;
OK
kafka      14
tensflow   98
hadoop     34
hbase      68
flume      57
kafka      99
kafka      28
flume      24
tensflow   35
flume      44
flume      44
tensflow   35
flume      24
Time taken: 0.045 seconds, Fetched: 13 row(s)
```

Summary
The job using OrcInputFormat reads the ORC files that Hive wrote for test_orc, and the ORC file written through OrcOutputFormat becomes queryable in Hive as soon as it is copied into the table directory: the 13 rows above are the original 10 plus rows from the copied part-m-00000.orc.