BigBrother的大数据之旅Day 13 hbase(2)
HBase(2)
詳述人員角色表的設計思路以及實現
思路:兩個部分的信息分別保存到兩張表中,因為hbase是列存儲的表,一般存儲非關系數據,就像記筆記一樣,把關鍵點寫上.
第一張表: 個人信息表
rowkey為編號, 列族1為個人信息(性別,名字等),列族2為其擁有的角色(包含優先級)
第二張表: 角色信息表
rowkey: 角色id,列族1 角色信息(主要是名稱),列族2擁有的人員
詳述電話案例分析的設計思路以及實現
背景: 10個用戶,每個用戶每年產生1000條通話記錄
2.1 電話表
rowkey為 手機號+"-"+(Long.MAX_VALUE - sdf.parse(date).getTime()),sdf為simpledateformat
cf族中,
dnum為對方手機號
type為類型 ,0主叫,1被叫
length:長度
date:時間
2.2insert數據
public void insert() throws Exception {List<Put> puts = new ArrayList<Put>();for (int i = 0; i < 10; i++) {String phoneNumber = getPhone("158");for (int j = 0; j < 1000; j++) {// 屬性String dnum = getPhone("177");String length = String.valueOf(r.nextInt(100));String type = String.valueOf(r.nextInt(2));// yyyyMMddHHmmssString date = getDate("2018");// rowkey設計String rowkey = phoneNumber + "_" + (Long.MAX_VALUE - sdf.parse(date).getTime());Put put = new Put(rowkey.getBytes());put.add("cf".getBytes(), "dnum".getBytes(), dnum.getBytes());put.add("cf".getBytes(), "length".getBytes(), length.getBytes());put.add("cf".getBytes(), "type".getBytes(), type.getBytes());put.add("cf".getBytes(), "date".getBytes(), date.getBytes());puts.add(put);}table.put(puts);}}2.3 查看數據
public void scan() throws Exception {String phoneNumber = "15822158090";// 小于等于這個時間的都包含String startRow = phoneNumber + "_" + (Long.MAX_VALUE - sdf.parse("20180331000000").getTime());// 大于這個時間的都包含String stopRow = phoneNumber + "_" + (Long.MAX_VALUE - sdf.parse("20180231000000").getTime());Scan scan = new Scan();scan.setStartRow(startRow.getBytes());scan.setStopRow(stopRow.getBytes());ResultScanner scanner = table.getScanner(scan);for (Result result : scanner) {System.out.print(Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "dnum".getBytes()))));System.out.print("--" + Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "type".getBytes()))));System.out.print("--" + Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "date".getBytes()))));System.out.println("--" + Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "length".getBytes()))));}}protobuf做什么用的?詳述如何使用?
Protocol Buffers 是一種輕便高效的結構化數據存儲格式,可以用于結構化數據串行化,或者說序列化。它很適合做數據存儲或 RPC 數據交換格式。可用于通訊協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。目前提供了 C++、Java、Python 三種語言的 API。
在本課程中,是把數據當成一個protobuf對象存儲到Hbase中,而不是向以前一樣,把字段直接存儲到Hbase中.目的是為了節省空間.根據課堂上的例子,大概可以壓縮到原來的3分之1左右.
1安裝protobuf-2.5.0
(1)解壓
[root@node1 temp]# tar -zxf protobuf-2.5.0.tar.gz -C /usr/local/protobuf/(2)安裝開發工具
yum -y groupinstall "Development tools"(3) 使用配置文件生成make文件和相關配置
./configure(4) make和install
make && make install2 在程序中使用protobuf
書寫.proto文件便于生成phone的protobuf類
package com.sxt message PhoneRecord{required string dnum=1;required string type=2;required int32 length=3;required string date=4; }詳述hbase的優化思路
(1) 表設計
a:預先創建region分區,當數據寫入HBase中時,會按照region分區情況,負載均衡也就是指定startkey和endkey,在初期提高吞吐量
b:rowkey設計,越小越好,依據實際業務,散列(取反或者哈希)
c:列族最多兩個,否則服務器之間的io太多
d: in memory 通過HColoumDescriptor.setInMemory(true)
e: 最大版本,不易太多
HColumnDescriptor.setMaxVersions(int maxVersions)
f:time to live
HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過期數據將自動被刪除,單位為秒
I: 實際應用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合并形成一個大的StoreFile。同時,可以將StoreFile設置大些,減少split的發生。
hbase.hregion.majoucompaction 默認為24 小時、hbase.hregion.majorcompaction.jetter 默認值為0.2 防止region server 在同一時間進行major compaction)。
hbase.hregion.majorcompaction.jetter參數的作用是:對參數hbase.hregion.majoucompaction 規定的值起到浮動的作用,假如兩個參數都為默認值24和0,2,那么major compact最終使用的數值為:19.2~28.8 這個范圍。
最好手動 合并,編寫 major compaction
(2)寫表
a:關閉自動flush(持久化到硬盤)
b: 通過調用HTable.setWriteBufferSize(writeBufferSize)方法可以設置HTable客戶端的寫buffer大小,如果新設置的buffer小于當前寫buffer中的數據時,buffer將會被flush到服務端。其中,writeBufferSize的單位是byte字節數
c:WAL
不重要到數據,關閉 WAL功能
d: 多用List
e:多線程并發寫 可以使用MR
(3)讀表
a 設置scanner 一次從服務器抓取多條數據
HTable.setScannerCaching(int scannerCaching)
b 指定要查找的列族
get.addcolum(‘cf’.getBytes(),‘name’.getBytes())相當于不是查詢* 而是查詢有條件的數據
c 使用完resultScanner后關閉resultScanner
d 多線程并發讀
e 加入緩存(自己單獨做 redis)
f blockCache
讀流程 memstore 》 blockCache(默認65536字節) 》storefile
為什么用hbase+mapreduce整合?
快
(1)如果使用一個腳本把hbse中的數據,進行處理,效率是及其底下的,hbase中存放的是海量的數據
(2)如果使用多線程,效率是有些提升,但是,線程之間會占用資源,在hbase的海量數據面前,速度還是不夠快
列出從hdfs讀取數據寫到hbase以及hbase讀取數據寫到hdfs的步驟及實現
1 從hdfs讀取數據到hbase
(1) 主類
public class MainClass {private static String targetTable = "table1y";public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "node2,node3,node4");conf.set("mapreduce.framework.name", "local");Job job = Job.getInstance(conf);job.setJobName("hbase to mr to hbase");job.setJarByClass(MainClass.class);FileInputFormat.addInputPath(job, new Path("/mr/fof/input/fof.txt"));job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(Put.class);TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class,job, null, null, null, null, false);// TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);// job.setOutputFormatClass(cls); // job.setInputFormatClass(cls);boolean b = job.waitForCompletion(true);} }(2) mapper類
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String[] words = line.split(" ");for (String word : words) {outKey.set(word);context.write(outKey, outValue);}} }(3) reducer類
public class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {Put put = new Put(key.toString().getBytes());Iterator<IntWritable> itera = values.iterator();int sum = 0;int num = 0;while (itera.hasNext()) {IntWritable val = itera.next();num = val.get();sum += num;}put.add("cf".getBytes(), "age".getBytes(), String.valueOf(sum).getBytes());context.write(null, put);}}2 hbase到hdfs
(1)主類
public class MainClass {private static String sourceTable = "table1";public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create(); // Configuration config = new Configuration(true);conf.set("hbase.zookeeper.quorum", "node2,node3,node4");conf.set("mapreduce.framework.name", "local");Job job = Job.getInstance(conf);job.setJarByClass(MainClass.class); // MR主入口類Scan scan = new Scan();scan.setCaching(500); // 默認值是1,對MR作業太小了,設置為500scan.setCacheBlocks(false); // 服務端緩存沒有意義TableMapReduceUtil.initTableMapperJob(sourceTable, // 源表scan, // Scan對象MyMapper.class, // 繼承自TableMapper的Mapper類Text.class, // mapper輸出key的類型IntWritable.class, // mapper輸出value的類型job); // TableInputFormatjob.setReducerClass(MyReducer.class); // reducer類job.setNumReduceTasks(1); // 至少一個reducer//設置輸出路徑 FileOutputFormat.setOutputPath(job, new Path("/hbase2hdfs/output"));boolean b = job.waitForCompletion(true);if (!b) {throw new IOException("error with job!");}}}(2)mapper類
public class MyMapper extends TableMapper<Text, IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context)throws IOException, InterruptedException {Cell nameCell = value.getColumnLatestCell("cf".getBytes(), "name".getBytes());Cell ageCell = value.getColumnLatestCell("cf".getBytes(), "age".getBytes());Cell sexCell = value.getColumnLatestCell("cf".getBytes(), "sex".getBytes());String name = Bytes.toString(CellUtil.cloneValue(nameCell));String age = Bytes.toString(CellUtil.cloneValue(ageCell));String sex = Bytes.toString(CellUtil.cloneValue(sexCell));//從mapper的key中獲取rowkey的值String rowKey = Bytes.toString(key.get());StringBuffer sb = new StringBuffer();sb.append(rowKey).append(":");sb.append(name).append("-").append(age).append("-").append(sex);outKey.set(sb.toString());context.write(outKey, outValue);} }(3) reducer類
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int i = 0;for (IntWritable val : values) {i += val.get();}context.write(key, new IntWritable(i));}}總結
以上是生活随笔為你收集整理的BigBrother的大数据之旅Day 13 hbase(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: xp系统下硬盘安装linux,在NTFS
- 下一篇: English语法_副词 - ago /