数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构
數據倉庫VS數據庫
數據倉庫的定義:
數據倉庫是將多個數據源的數據經過ETL(Extract(抽取)、Transform(轉換)、Load(加載))理之后,按照一定的主題集成起來提供決策支持和聯機分析應用的結構化數據環境
數據倉庫VS數據庫:
數據庫是面向事務的設計,數據倉庫是面向主題設計的
數據庫一般存儲在線交易數據,數據倉庫存儲的一般是歷史數據
數據庫設計是避免冗余,采用三范式的規則來設計,數據倉庫在設計是有意引入冗余,采用反范式的方式來設計
OLTP VS OLAP:
聯機事務處理OLTP是傳統的關系型數據庫的主要應用,主要是基本的、日常的事務處理,例如銀行交易
聯機分析處理OLAP是數據倉庫系統的主要應用,支持復雜的分析操作,側重決策支持,并且提供直觀易懂的查詢結果
常規的數倉架構:
為什么建設數據倉庫:
各個業務數據存在不一致,數據關系混亂
業務系統一般針對于OLTP,而數據倉庫可以實現OLAP分析
數據倉庫是多源的復雜環境,可以對多個業務的數據進行統一分析
數據倉庫建設目標:
集成多源數據,數據來源和去向可追溯,梳理血緣關系
減少重復開發,保存通用型中間數據,避免重復計算
屏蔽底層業務邏輯,對外提供一致的、 結構清晰的數據
如何實現:
實現通用型數據ETL工具
根據業務建立合理的數據分層模型
數據倉庫分層建設
數倉建設背景:
數據建設剛起步,大部分數據經過粗暴的數據接入后直接對接業務
數據建設發展到一定階段,發現數據的使用雜亂無章,各種業務都是從原始數據直接計算而得。
各種重復計算,嚴重浪費了計算資源,需要優化性能
為什么進行數倉分層:
清晰數據結構:每個數據分層都有對應的作用域
數據血緣追蹤:對各層之間的數據表轉換進行跟蹤,建立血緣關系
減少重復開發:規范數據分層,開發通用的中間層數據
屏蔽原始數據的異常:通過數據分層管控數據質量
屏蔽業務的影響:不必改一次業務就需要重新接入數據
復雜問題簡單化:將復雜的數倉架構分解成多個數據層來完成
常見的分層含義:
STG層
原始數據層:存儲原始數據,數據結構與采集數據一致
存儲周期:保存全部數據
表命名規范:stg_主題_表內容_分表規則
ODS層
數據操作層:對STG層數據進行初步處理,如去除臟數據,去除無用字段.
存儲周期:默認保留近30天數據
表命名規范:ods_主題_表內容_分表規則
DWD層
數據明細層:數據處理后的寬表,目標為滿足80%的業務需求
存儲周期:保留歷史至今所有的數據
表命名規范:dwd_業務描述時間粒度
DWS層
數據匯總層:匯總數據,解決數據匯總計算和數據完整度問題
存儲周期:保留歷史至今所有的數據
表命名規范:dws_業務描述_時間粒度_sum
DIM層
公共維度層:存儲公共的信息數據,用于DWD、DWS的數據關聯
存儲周期:按需存儲,一般保留歷史至今所有的數據
表命名規范:dim_維度描述
DM層
數據集市層:用于BI、多維分析、標簽、數據挖掘等
存儲周期:按需存儲,--般保留歷史至今所有的數據
表命名規范:dm_主題_表內容_分表規則
分層之間的數據流轉:
Hive是什么
Hive簡介:
Hive是基于Hadoop的數據倉庫工具,提供類SQL語法(HiveQL)
默認以MR作為計算引擎(也支持其他計算引擎,例如tez)、HDFS 作為存儲系統,提供超大數據集的計算/擴展能力
Hive是將數據映射成數據庫和一張張的表,庫和表的元數據信息一般存在關系型數據庫
Hive的簡單架構圖:
Hive VS Hadoop:
Hive數據存儲:Hive的數據是存儲在HDFS.上的,Hive的庫和表是對HDFS.上數據的映射
Hive元數據存儲:元數據存儲一般在外部關系庫( Mysql )與Presto Impala等共享
Hive語句的執行過程:將HQL轉換為MapReduce任務運行
Hive與關系數據庫Mysql的區別
產品定位
Hive是數據倉庫,為海量數據的離線分析設計的,不支持OLTP(聯機事務處理所需的關鍵功能ACID,而更接近于OLAP(聯機分析技術)),適給離線處理大數據集。而MySQL是關系型數據庫,是為實時業務設計的。
可擴展性
Hive中的數據存儲在HDFS(Hadoop的分布式文件系統),metastore元數據一 般存儲在獨立的關系型數據庫中,而MySQL則是服務器本地的文件系統。因此Hive具有良好的可擴展性,數據庫由于ACID語義的嚴格限制,擴展性十分有限。
讀寫模式
Hive為讀時模式,數據的驗證則是在查詢時進行的,這有利于大數據集的導入,讀時模式使數據的加載非常迅速,數據的加載僅是文件復制或移動。MySQL為寫時模式,數據在寫入數據庫時對照模式檢查。寫時模式有利于提升查詢性能,因為數據庫可以對列進行索引。
數據更新
Hive是針對數據倉庫應用設計的,而數倉的內容是讀多寫少的,Hive中不支持對數據進行改寫,所有數據都是在加載的時候確定好的。而數據庫中的數據通常是需要經常進行修改的。
索引
Hive支持索引,但是Hive的索引與關系型數據庫中的索引并不相同,比如,Hive不支持主鍵或者外鍵。Hive提供了有限的索引功能,可以為-些字段建立索引,一張表的索引數據存儲在另外一張表中。由于數據的訪問延遲較高,Hive不適合在線數據查詢。數據庫在少星的特定條件的數據訪問中,索引可以提供較低的延遲。
計算模型
Hive默認使用的模型是MapReduce(也可以on spark、on tez),而MySQL使用的是自己設計的Executor計算模型
Hive安裝部署
參考:
Hive基本使用(上)Hive數據類型/分區/基礎語法
Hive數據類型:
基本數據類型:int、 float、 double、 string、 boolean、 bigint等
復雜類型:array、map、 struct
Hive分區:
Hive將海量數據按某幾個字段進行分區,查詢時不必加載全部數據
分區對應到HDFS就是HDFS的目錄.
分區分為靜態分區和動態分區兩種
Hive常用基礎語法:
USE DATABASE_NAME
CREATE DATABASE IF NOT EXISTS DB NAME
DESC DATABASE DB NAME
CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
SELECT * FROM TABLE NAME
ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME
寫個Python腳本生成一些測試數據:
import json
import random
import uuid
name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')
data = []
for item in name:
scores = {key: random.randint(60, 100) for key in subject}
data.append("|".join([uuid.uuid4().hex, item, ','.join(
random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))
with open('test.csv', 'w') as f:
f.write('\n'.join(data))
執行該腳本,生成測試數據文件:
[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root 745 11月 9 11:09 test.csv
[root@hadoop01 ~/py-script]#
我們可以看一下生成的數據:
[root@hadoop01 ~/py-script]# cat test.csv
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77
...
數據以 | 符進行分割,前兩個字段都是string類型,第三個字段是array類型,第四個字段是map類型
創建測試用的數據庫:
0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000>
創建測試表:
CREATE TABLE test(
user_id string,
user_name string,
hobby array,
scores map
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
將本地數據加載到Hive中:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000>
查詢數據:
Hive將HQL轉換為MapReduce的流程
了解了Hive中的SQL基本操作之后,我們來看看Hive是如何將SQL轉換為MapReduce任務的,整個轉換過程分為六個階段:
Antr定義SQL的語法規則,完成SQL詞法,語法解析,將SQL 轉化為抽象語法樹AST Tree
遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock
遍歷QueryBlock,翻譯為執行操作樹OperatorTree
邏輯層優化器進行OperatorTree變換,合并不必要的ReduceSinkOperator,減少shufle數據量
遍歷OperatorTree,翻譯為MapReduce任務
物理層優化器進行MapReduce任務的變換,生成最終的執行計劃
與普通SQL一樣,我們可以通過在HQL前面加上explain關鍵字查看HQL的執行計劃:
explain select * from test where id > 10 limit 1000
Hive會將這條語句解析成一個個的Operator,Operator就是Hive解析之后的最小單元,每個Operator其實都是對應一個MapReduce任務。例如,上面這條語句被Hive解析后,就是由如下Operator組成:
同時,Hive實現了優化器對這些Operator的順序進行優化,幫助我們提升查詢效率。Hive中的優化器主要分為三類:
RBO(Rule-Based Optimizer):基于規則的優化器
CBO(Cost-Based Optimizer):基于代價的優化器,這是默認的優化器
動態CBO:在執行計劃生成的過程中動態優化的方式
Hive基本使用(中)內部表/外部表/分區表/分桶表
內部表:
和傳統數據庫的Table概念類似,對應HDFS上存儲目錄,刪除表時,刪除元數據和表數據。內部表的數據,會存放在HDFS中的特定的位置中,可以通過配置文件指定。當刪除表時,數據文件也會一并刪除。適用于臨時創建的中間表。
外部表:
指向已經存在的HDFS數據,刪除時只刪除元數據信息。適用于想要在Hive之外使用表的數據的情況,當你刪除External Table時,只是刪除了表的元數據,它的數據并沒有被刪除。適用于數據多部門共享。建表時使用create external table。指定external關鍵字即可。
分區表:
Partition對應普通數據庫對Partition列的密集索引,將數據按照Partition列存儲到不同目錄,便于并行分析,減少數據量。分區表創建表的時候需要指定分區字段。
分區字段與普通字段的區別:分區字段會在HDFS表目錄下生成一個分區字段名稱的目錄,而普通字段則不會,查詢的時候可以當成普通字段來使用,一般不直接和業務直接相關。
分桶表:
對數據進行hash,放到不同文件存儲,方便抽樣和join查詢。可以將內部表,外部表和分區表進一步組織成桶表,可以將表的列通過Hash算法進一步分解成不同的文件存儲。
對于內部表和外部表的概念和應用場景我們很容易理解,我們需要重點關注一下分區表和分桶表。 我們為什么要建立分區表和分桶表呢?HQL通過where子句來限制條件提取數據,那么與其遍歷一張大表,不如將這張大表拆分成多個小表,并通過合適的索引來掃描表中的一小部分,分區和分桶都是采用了這種理念。
分區會創建物理目錄,并且可以具有子目錄(通常會按照時間、地區分區),目錄名以 分區名=值 形式命名,例如:create_time=202011。分區名會作為表中的偽列,這樣通過where字句中加入分區的限制可以在僅掃描對應子目錄下的數據。通過 partitioned by (feld1 type, ...) 創建分區列。
分桶可以繼續在分區的基礎上再劃分小表,分桶根據哈希值來確定數據的分布(即MapReducer中的分區),比如分區下的一部分數據可以根據分桶再分為多個桶,這樣在查詢時先計算對應列的哈希值并計算桶號,只需要掃描對應桶中的數據即可。分桶通過clustered by(field) into n buckets創建。
接下來簡單演示下這幾種表的操作,首先將上一小節生成的測試數據文件上傳到hdfs中:
[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]#
內部表
建表SQL:
CREATE TABLE test_table(
user_id string,
user_name string,
hobby array,
scores map
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
將hdfs數據加載到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000>
查看創建的表存儲在hdfs的哪個目錄下:
0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_table`( |
| `user_id` string, |
| `user_name` string, |
| `hobby` array, |
| `scores` map) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' |
| WITH SERDEPROPERTIES ( |
| 'collection.delim'=',', |
| 'field.delim'='|', |
| 'line.delim'='\n', |
| 'mapkey.delim'=':', |
| 'serialization.format'='|') |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.mapred.TextInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION |
| 'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'transient_lastDdlTime'='1604893190') |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000>
在hdfs中可以查看到數據文件:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]#
刪除表:
0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000>
查看hdfs會發現該表所對應的存儲目錄也一并被刪除了:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]#
外部表
建表SQL,與內部表的區別就在于external關鍵字:
CREATE external TABLE external_table(
user_id string,
user_name string,
hobby array,
scores map
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
將數據文件加載到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000>
此時會發現hdfs中的數據文件會被移動到hive的目錄下:
[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#
刪除表:
0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000>
查看hdfs會發現該表所對應的存儲目錄仍然存在:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#
分區表
建表語句:
CREATE TABLE partition_table(
user_id string,
user_name string,
hobby array,
scores map
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
將數據文件加載到Hive中,并指定分區:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000>
執行如下sql,可以從不同的分區統計結果:
0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0 |
+------+
| 16 |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0 |
+------+
| 8 |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0 |
+------+
| 8 |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000>
分區表在hdfs中的存儲結構:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]#
分桶表
建表語句:
CREATE TABLE bucket_table(
user_id string,
user_name string,
hobby array,
scores map
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
將test表中的數據插入到bucket_table中:
0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000>
抽樣查詢:
分桶表在hdfs的存儲目錄如下:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]#
Hive基本使用(下)內置函數/自定義函數/實現UDF
Hive常見內置函數:
字符串類型:concat、substr、 upper、 lower
時間類型:year、month、 day
復雜類型:size、 get_json_object
查詢引擎都自帶了一部分函數來幫助我們解決查詢過程當中一些復雜的數據計算或者數據轉換操作,但是有時候自帶的函數功能不能滿足業務的需要。這時候就需要我們自己開發自定義的函數來輔助完成了,這就是所謂的用戶自定義函數UDF(User-Defined Functions)。Hive支持三類自定義函數:
UDF:普通的用戶自定義函數。用來處理輸入一行,輸出一行的操作,類似Map操作。如轉換字符串大小寫,獲取字符串長度等
UDAF:用戶自定義聚合函數(User-defined aggregate function),用來處理輸入多行,輸出一行的操作,類似Reduce操作。比如MAX、COUNT函數。
UDTF:用戶自定義表產生函數(User defined table-generating function),用來處理輸入一行,輸出多行(即一個表)的操作, 不是特別常用
UDF函數其實就是一段遵循一定接口規范的程序。在執行過程中Hive將SQL轉換為MapReduce程序,在執行過程當中在執行我們的UDF函數。
本小節簡單演示下自定義UDF函數,首先創建一個空的Maven項目,然后添加hive-exec依賴,版本與你安裝的Hive版本需對應上。完整的pom文件內容如下:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
org.example
hive-udf-test
1.0-SNAPSHOT
org.apache.hive
hive-exec
3.1.2
org.apache.maven.plugins
maven-compiler-plugin
8
8
首先創建一個繼承UDF的類,我們實現的這個自定義函數功能就是簡單的獲取字段的長度:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class StrLen extends UDF {
public int evaluate(final Text col) {
return col.getLength();
}
}
以上這種自定義函數只能支持處理普通類型的數據,如果要對復雜類型的數據做處理則需要繼承GenericUDF,并實現其抽象方法。例如,我們實現一個對測試數據中的scores字段求平均值的函數:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.text.DecimalFormat;
public class AvgScore extends GenericUDF {
/**
* 函數的名稱
*/
private static final String FUNC_NAME = "AVG_SCORE";
/**
* 函數所作用的字段類型,這里是map類型
*/
private transient MapObjectInspector mapOi;
/**
* 控制精度只返回兩位小數
*/
DecimalFormat df = new DecimalFormat("#.##");
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
// 在此方法中可以做一些前置的校驗,例如檢測函數參數個數、檢測函數參數類型
mapOi = (MapObjectInspector) objectInspectors[0];
// 指定函數的輸出類型
return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
// 函數的核心邏輯,取出map中的value進行求平均值,并返回一個Double類型的結果值
Object o = deferredObjects[0].get();
double v = mapOi.getMap(o).values().stream()
.mapToDouble(a -> Double.parseDouble(a.toString()))
.average()
.orElse(0.0);
return Double.parseDouble(df.format(v));
}
@Override
public String getDisplayString(String[] strings) {
return "func(map)";
}
}
對項目進行打包,并上傳到服務器中:
[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#
將jar包上傳到hdfs中:
[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#
在Hive中添加該jar包:
0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000>
然后注冊臨時函數,臨時函數只會在當前的session中生效:
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000>
使用自定義函數處理:
0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name | length | avg_score |
+------------+---------+------------+
| Tom | 3 | 80.25 |
| Jerry | 5 | 77.5 |
| Jim | 3 | 83.75 |
| Angela | 6 | 84.5 |
| Ann | 3 | 90.0 |
| Bella | 5 | 69.25 |
| Bonnie | 6 | 76.5 |
| Caroline | 8 | 84.5 |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000>
刪除已注冊的臨時函數:
0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000>
臨時函數只會在當前的session中生效,如果需要注冊成永久函數則只需要把TEMPORARY關鍵字給去掉即可。如下所示:
0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>
刪除永久函數也是把TEMPORARY關鍵字給去掉即可。如下所示:
0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>
Hive存儲結構 - OrcFile
Hive支持的存儲格式:
TextFile是默認的存儲格式,通過簡單的分隔符可以對csv等類型的文件進行解析。但實際應用中通常都是使用OrcFile格式,因為ORCFile是列式存儲格式,更加適合大數據查詢的場景。
我們都知道關系型數據庫基本是使用行式存儲作為存儲格式,而大數據領域更多的是采用列式存儲,因為大數據分析場景中通常需要讀取大量行,但是只需要少數的幾個列。這也是為什么通常使用OrcFile作為Hive的存儲格式的原因。由此可見,大數據的絕大部分應用場景都是OLAP場景。
OLAP場景的特點
讀多于寫
不同于事務處理(OLTP)的場景,比如電商場景中加購物車、下單、支付等需要在原地進行大量insert、update、delete操作,數據分析(OLAP)場景通常是將數據批量導入后,進行任意維度的靈活探索、BI工具洞察、報表制作等。
數據一次性寫入后,分析師需要嘗試從各個角度對數據做挖掘、分析,直到發現其中的商業價值、業務變化趨勢等信息。這是一個需要反復試錯、不斷調整、持續優化的過程,其中數據的讀取次數遠多于寫入次數。這就要求底層數據庫為這個特點做專門設計,而不是盲目采用傳統數據庫的技術架構。
大寬表,讀大量行但是少量列,結果集較小
在OLAP場景中,通常存在一張或是幾張多列的大寬表,列數高達數百甚至數千列。對數據分析處理時,選擇其中的少數幾列作為維度列、其他少數幾列作為指標列,然后對全表或某一個較大范圍內的數據做聚合計算。這個過程會掃描大量的行數據,但是只用到了其中的少數列。而聚合計算的結果集相比于動輒數十億的原始數據,也明顯小得多。
數據批量寫入,且數據不更新或少更新
OLTP類業務對于延時(Latency)要求更高,要避免讓客戶等待造成業務損失;而OLAP類業務,由于數據量非常大,通常更加關注寫入吞吐(Throughput),要求海量數據能夠盡快導入完成。一旦導入完成,歷史數據往往作為存檔,不會再做更新、刪除操作。
無需事務,數據一致性要求低
OLAP類業務對于事務需求較少,通常是導入歷史日志數據,或搭配一款事務型數據庫并實時從事務型數據庫中進行數據同步。多數OLAP系統都支持最終一致性。
靈活多變,不適合預先建模
分析場景下,隨著業務變化要及時調整分析維度、挖掘方法,以盡快發現數據價值、更新業務指標。而數據倉庫中通常存儲著海量的歷史數據,調整代價十分高昂。預先建模技術雖然可以在特定場景中加速計算,但是無法滿足業務靈活多變的發展需求,維護成本過高。
行式存儲和列式存儲
行式存儲和列式存儲的對比圖:
與行式存儲將每一行的數據連續存儲不同,列式存儲將每一列的數據連續存儲。相比于行式存儲,列式存儲在分析場景下有著許多優良的特性:
如前所述,分析場景中往往需要讀大量行但是少數幾個列。在行存模式下,數據按行連續存儲,所有列的數據都存儲在一個block中,不參與計算的列在IO時也要全部讀出,讀取操作被嚴重放大。而列存模式下,只需要讀取參與計算的列即可,極大的減低了IO cost,加速了查詢。
同一列中的數據屬于同一類型,壓縮效果顯著。列存往往有著高達十倍甚至更高的壓縮比,節省了大量的存儲空間,降低了存儲成本。
更高的壓縮比意味著更小的data size,從磁盤中讀取相應數據耗時更短。
自由的壓縮算法選擇。不同列的數據具有不同的數據類型,適用的壓縮算法也就不盡相同??梢葬槍Σ煌蓄愋?#xff0c;選擇最合適的壓縮算法。
高壓縮比,意味著同等大小的內存能夠存放更多數據,系統cache效果更好。
OrcFile
OrcFile存儲格式:
Orc列式存儲優點:
查詢時只需要讀取查詢所涉及的列,降低IO消耗,同時保存每一列統計信息,實現部分謂詞下推
每列數據類型一致,可針對不同的數據類型采用其高效的壓縮算法
列式存儲格式假設數據不會發生改變,支持分片、流式讀取,更好的適應分布式文件存儲的特性
除了Orc外,Parquet也是常用的列式存儲格式。Orc VS Parquet:
OrcFile和Parquet都是Apache的頂級項目
Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新
Parquet的壓縮能力較高,Orc的查詢效率較高
離線數倉VS實時數倉
離線數倉:
離線數據倉庫主要基于Hive等技術來構建T+1的離線數據
通過定時任務每天拉取增量數據導入到Hive表中
創建各個業務相關的主題維度數據,對外提供T+1的數據查詢接口
離線數倉架構:
數據源通過離線的方式導入到離線數倉中
數據分層架構:ODS、DWD、 DM
下游應用根據業務需求選擇直接讀取DM
實時數倉:
實時數倉基于數據采集工具,將原始數據寫入到Kafka等數據通道
數據最終寫入到類似于HBase這樣支持快速讀寫的存儲系統
對外提供分鐘級別、甚至秒級別的查詢方案
實時數倉架構:
業務實時性要求的不斷提高,實時處理從次要部分變成了主要部分
Lambda架構:在離線大數據架構基礎上加了一個加速層,使用流處理技術完成實時性較高的指標計算
Kappa架構:以實時事件處理為核心,統一數據處理
圖解Lambda架構數據流程
Lambda 架構(Lambda Architecture)是由 Twitter 工程師南森·馬茨(Nathan Marz)提出的大數據處理架構。這一架構的提出基于馬茨在 BackType 和 Twitter 上的分布式數據處理系統的經驗。
Lambda 架構使開發人員能夠構建大規模分布式數據處理系統。它具有很好的靈活性和可擴展性,也對硬件故障和人為失誤有很好的容錯性。
Lambda 架構總共由三層系統組成:批處理層(Batch Layer),速度處理層(Speed Layer),以及用于響應查詢的服務層(Serving Layer)。
在 Lambda 架構中,每層都有自己所肩負的任務。批處理層存儲管理主數據集(不可變的數據集)和預先批處理計算好的視圖。批處理層使用可處理大量數據的分布式處理系統預先計算結果。它通過處理所有的已有歷史數據來實現數據的準確性。這意味著它是基于完整的數據集來重新計算的,能夠修復任何錯誤,然后更新現有的數據視圖。輸出通常存儲在只讀數據庫中,更新則完全取代現有的預先計算好的視圖。
速度處理層會實時處理新來的數據。速度層通過提供最新數據的實時視圖來最小化延遲。速度層所生成的數據視圖可能不如批處理層最終生成的視圖那樣準確或完整,但它們幾乎在收到數據后立即可用。而當同樣的數據在批處理層處理完成后,在速度層的數據就可以被替代掉了。
本質上,速度層彌補了批處理層所導致的數據視圖滯后。比如說,批處理層的每個任務都需要 1 個小時才能完成,而在這 1 個小時里,我們是無法獲取批處理層中最新任務給出的數據視圖的。而速度層因為能夠實時處理數據給出結果,就彌補了這 1 個小時的滯后。
所有在批處理層和速度層處理完的結果都輸出存儲在服務層中,服務層通過返回預先計算的數據視圖或從速度層處理構建好數據視圖來響應查詢。
所有的新用戶行為數據都可以同時流入批處理層和速度層。批處理層會永久保存數據并且對數據進行預處理,得到我們想要的用戶行為模型并寫入服務層。而速度層也同時對新用戶行為數據進行處理,得到實時的用戶行為模型。
而當“應該對用戶投放什么樣的廣告”作為一個查詢(Query)來到時,我們從服務層既查詢服務層中保存好的批處理輸出模型,也對速度層中處理的實時行為進行查詢,這樣我們就可以得到一個完整的用戶行為歷史了。
一個查詢就如下圖所示,既通過批處理層兼顧了數據的完整性,也可以通過速度層彌補批處理層的高延時性,讓整個查詢具有實時性。
Kappa 架構 VS Lambda
Lambda 架構的不足
雖然 Lambda 架構使用起來十分靈活,并且可以適用于很多的應用場景,但在實際應用的時候,Lambda 架構也存在著一些不足,主要表現在它的維護很復雜。
使用 Lambda 架構時,架構師需要維護兩個復雜的分布式系統,并且保證他們邏輯上產生相同的結果輸出到服務層中。舉個例子吧,我們在部署 Lambda 架構的時候,可以部署 Apache Hadoop 到批處理層上,同時部署 Apache Flink 到速度層上。
我們都知道,在分布式框架中進行編程其實是十分復雜的,尤其是我們還會針對不同的框架進行專門的優化。所以幾乎每一個架構師都認同,Lambda 架構在實戰中維護起來具有一定的復雜性。
那要怎么解決這個問題呢?我們先來思考一下,造成這個架構維護起來如此復雜的根本原因是什么呢?
維護 Lambda 架構的復雜性在于我們要同時維護兩套系統架構:批處理層和速度層。我們已經說過了,在架構中加入批處理層是因為從批處理層得到的結果具有高準確性,而加入速度層是因為它在處理大規模數據時具有低延時性。
那我們能不能改進其中某一層的架構,讓它具有另外一層架構的特性呢?例如,改進批處理層的系統讓它具有更低的延時性,又或者是改進速度層的系統,讓它產生的數據視圖更具準確性和更加接近歷史數據呢?
另外一種在大規模數據處理中常用的架構——Kappa 架構(Kappa Architecture),便是在這樣的思考下誕生的。
Kappa 架構
Kappa 架構是由 LinkedIn 的前首席工程師杰伊·克雷普斯(Jay Kreps)提出的一種架構思想??死灼账故菐讉€著名開源項目(包括 Apache Kafka 和 Apache Samza 這樣的流處理系統)的作者之一,也是現在 Confluent 大數據公司的 CEO。
克雷普斯提出了一個改進 Lambda 架構的觀點:
我們能不能改進 Lambda 架構中速度層的系統性能,使得它也可以處理好數據的完整性和準確性問題呢?我們能不能改進 Lambda 架構中的速度層,使它既能夠進行實時數據處理,同時也有能力在業務邏輯更新的情況下重新處理以前處理過的歷史數據呢?
他根據自身多年的架構經驗發現,我們是可以做到這樣的改進的。我們知道像 Apache Kafka 這樣的流處理平臺是具有永久保存數據日志的功能的。通過Kafka的這一特性,我們可以重新處理部署于速度層架構中的歷史數據。
下面我就以 Kafka 為例來介紹整個全新架構的過程。
第一步,部署 Kafka,并設置數據日志的保留期(Retention Period)。
這里的保留期指的是你希望能夠重新處理的歷史數據的時間區間。例如,如果你希望重新處理最多一年的歷史數據,那就可以把 Apache Kafka 中的保留期設置為 365 天。如果你希望能夠處理所有的歷史數據,那就可以把 Apache Kafka 中的保留期設置為“永久(Forever)”。
第二步,如果我們需要改進現有的邏輯算法,那就表示我們需要對歷史數據進行重新處理。我們需要做的就是重新啟動一個 Kafka 作業實例(Instance)。這個作業實例將重頭開始,重新計算保留好的歷史數據,并將結果輸出到一個新的數據視圖中。
我們知道 Kafka 的底層是使用 Log Offset 來判斷現在已經處理到哪個數據塊了,所以只需要將 Log Offset 設置為 0,新的作業實例就會重頭開始處理歷史數據。
第三步,當這個新的數據視圖處理過的數據進度趕上了舊的數據視圖時,我們的應用便可以切換到從新的數據視圖中讀取。
第四步,停止舊版本的作業實例,并刪除舊的數據視圖。
這個架構就如同下圖所示。
與 Lambda 架構不同的是,Kappa 架構去掉了批處理層這一體系結構,而只保留了速度層。你只需要在業務邏輯改變又或者是代碼更改的時候進行數據的重新處理。Kappa 架構統一了數據的處理方式,不再維護離線和實時兩套代碼邏輯。
Kappa 架構的不足
Kappa 架構也是有著它自身的不足的。因為 Kappa 架構只保留了速度層而缺少批處理層,在速度層上處理大規模數據可能會有數據更新出錯的情況發生,這就需要我們花費更多的時間在處理這些錯誤異常上面。如果需求發生變化或歷史數據需要重新處理都得通過上游重放來完成。并且重新處理歷史的吞吐能力會低于批處理。
還有一點,Kappa 架構的批處理和流處理都放在了速度層上,這導致了這種架構是使用同一套代碼來處理算法邏輯的。所以 Kappa 架構并不適用于批處理和流處理代碼邏輯不一致的場景。
Lambda VS Kappa
主流大公司的實時數倉架構
阿里菜鳥實時數倉
美團實時數倉
實時數倉建設特征
整體架構設計通過分層設計為OLAP查詢分擔壓力
復雜的計算統一在實時計算層做,避免給OLAP查詢帶來過大的壓力
匯總計算通過OLAP數據查詢引擎進行
整個架構中實時計算一般 是Spark+Flink配合
消息隊列Kafka一家獨大,配合HBase、ES、 Mysq|進行數據落盤
OLAP領域Presto、Druid、 Clickhouse、 Greenplum等等層出不窮
總結
以上是生活随笔為你收集整理的数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 判断参数大于0_格力GMV5多联机调试参
- 下一篇: python怎么连接socket_pyt