avro使用详解
一、avro的介紹
 1、概括
 Avro 是一個數據序列化系統,它提供:
豐富的數據結構
 快速可壓縮的二進制數據形式
 存儲持久數據的文件容器
 遠程過程調用RPC
 簡單的動態語言結合功能
 2、類型
 基本類型:null、boolean、int、long、float、double、bytes、string;複雜類型:record、enum、array、map、union、fixed。
 二、avro在hadoop的使用
 1、模式確定
 例如:
 {"namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number", "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
  ]
 }
其中 namespace 是生成 Java 類的包名,name 是類名,fields 定義各個字段;類型寫成 ["int", "null"] 這樣的聯合(union)表示該字段可以為 null。
2、text數據作為輸入
 2.1 無插件的序列化
 // Create the data records from the parsed schema (no generated classes needed)
 Schema schema = new Schema.Parser().parse(new File("user.avsc"));
 GenericRecord user1 = new GenericData.Record(schema);
 user1.put("name", "Alyssa");
 user1.put("favorite_number", 256);
 // Leave favorite color null

 GenericRecord user2 = new GenericData.Record(schema);
 user2.put("name", "Ben");
 user2.put("favorite_number", 7);
 user2.put("favorite_color", "red");

 // Serialize user1 and user2 to disk.
 // Without the code-generation plugin there is no User class, so the
 // schema-driven Generic writer/reader must be used instead of
 // SpecificDatumWriter<User>/SpecificDatumReader<User>.
 File file = new File("users.avro");
 DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
 // try-with-resources closes (and flushes) the file even if append() throws
 try (DataFileWriter<GenericRecord> dataFileWriter =
     new DataFileWriter<GenericRecord>(datumWriter)) {
   dataFileWriter.create(schema, file);
   dataFileWriter.append(user1);
   dataFileWriter.append(user2);
 }

 // Deserialize users from disk
 DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
 try (DataFileReader<GenericRecord> dataFileReader =
     new DataFileReader<GenericRecord>(file, datumReader)) {
   GenericRecord user = null;
   while (dataFileReader.hasNext()) {
     // Reuse the record object by passing it to next(). This saves us from
     // allocating and garbage collecting many objects for files with
     // many items.
     user = dataFileReader.next(user);
     System.out.println(user);
   }
 }
 2.2有插件的序列化
 2.2.1 插件導入
 <plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.8.2</version>
   <executions>
     <execution>
       <phase>generate-sources</phase>
       <goals>
         <goal>schema</goal>
       </goals>
       <configuration>
         <sourceDirectory>${project.basedir}/../</sourceDirectory>
         <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
       </configuration>
     </execution>
   </executions>
 </plugin>
 2.2.2 編譯schema文件
 注意:schema 文件(.avsc)需要放在插件配置中 sourceDirectory 所指定的目錄下。
 在 IDEA 中執行 Maven 的 generate-sources 階段(或直接編譯項目),插件會根據 schema 在 outputDirectory 指定的目錄中生成對應的 Java 類(如 example.avro.User)。
2.2.3 常規使用
 DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
 DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
 dataFileWriter.create(user1.getSchema(), new File("users.avro"));
 dataFileWriter.append(user1);
 dataFileWriter.append(user2);
 dataFileWriter.append(user3);
 dataFileWriter.close();
//序列化
 // Deserialize Users from disk
 DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
 DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
 User user = null;
 while (dataFileReader.hasNext()) {
 ? ? // Reuse user object by passing it to next(). This saves us from
 ? ? // allocating and garbage collecting many objects for files with
 ? ? // many items.
 ? ? user = dataFileReader.next(user);
 ? ? System.out.println(user);
 }
 3、例子(使用的是有插件的方式)
 MapReduceColorCount:
package example;
import java.io.IOException;
import org.apache.avro.Schema;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
import example.avro.User;
public class MapReduceColorCount extends Configured implements Tool {
? public static class ColorCountMapper extends
 ? Mapper<AvroKey<User>, NullWritable, Text, IntWritable> {
? ? @Override
 ? ? public void map(AvroKey<User> key, NullWritable value, Context context)
 ? ? ? ? throws IOException, InterruptedException {
? ? ? CharSequence color = key.datum().getFavoriteColor();
 ? ? ? if (color == null) {
 ? ? ? ? color = "none";
 ? ? ? }
 ? ? ? context.write(new Text(color.toString()), new IntWritable(1));
 ? ? ? }
 ? ? }
? ? public static class ColorCountReducer extends
 ? Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {
? ? @Override
 ? ? public void reduce(Text key, Iterable<IntWritable> values,
 ? ? ? ? Context context) throws IOException, InterruptedException {
? ? ? int sum = 0;
 ? ? ? for (IntWritable value : values) {
 ? ? ? ? sum += value.get();
 ? ? ? }
 ? ? ? context.write(new AvroKey<CharSequence>(key.toString()), new AvroValue<Integer>(sum));
 ? ? }
 ? ? }
? public int run(String[] args) throws Exception {
 ? ? if (args.length != 2) {
 ? ? ? System.err.println("Usage: MapReduceColorCount <input path> <output path>");
 ? ? ? return -1;
 ? ? }
? ? Job job = new Job(getConf());
 ? ? job.setJarByClass(MapReduceColorCount.class);
 ? ? job.setJobName("Color Count");
? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
 ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? job.setInputFormatClass(AvroKeyInputFormat.class);
 ? ? job.setMapperClass(ColorCountMapper.class);
 ? ? AvroJob.setInputKeySchema(job, User.getClassSchema());
 ? ? job.setMapOutputKeyClass(Text.class);
 ? ? job.setMapOutputValueClass(IntWritable.class);
? ? job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
 ? ? job.setReducerClass(ColorCountReducer.class);
 ? ? AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
 ? ? AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));
? ? return (job.waitForCompletion(true) ? 0 : 1);
 ? }
? public static void main(String[] args) throws Exception {
 ? ? int res = ToolRunner.run(new MapReduceColorCount(), args);
 ? ? System.exit(res);
 ? }
 }
 注意:當采用不用插件的方式時(即沒有生成的 User 類),map 的代碼如下:
 @Override?
 public void map(AvroKey key, NullWritable value, Context context)throws IOException,InterruptedException {}?
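 由於此時沒有生成的 User 類,代碼無法通過具體類型得知 AvroKey 中數據的結構,只能把數據當作 GenericRecord 處理,因此要在 run() 中調用 AvroJob.setDataModelClass(job, GenericData.class) 指定使用 Generic 數據模型;輸入數據的 schema 仍需通過 AvroJob.setInputKeySchema(job, schema) 指定(schema 可用 new Schema.Parser().parse(...) 從 .avsc 文件解析得到)。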
總結
 
                            
                        - 上一篇: ubuntu 16.04 安装 pyth
- 下一篇: nvcc找不到的问题(Ubuntu16.
