MaxCompute与OSS非结构化数据读写互通(及图像处理实例)
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
摘要:?MaxCompute作為阿里巴巴集團(tuán)內(nèi)部絕大多數(shù)大數(shù)據(jù)處理需求的核心計(jì)算組件,擁有強(qiáng)大的計(jì)算能力,隨著集團(tuán)內(nèi)外大數(shù)據(jù)業(yè)務(wù)的不斷擴(kuò)展,新的數(shù)據(jù)使用場(chǎng)景也在不斷產(chǎn)生。在這樣的背景下,MaxCompute(ODPS)計(jì)算框架持續(xù)演化,而原來(lái)主要面對(duì)內(nèi)部特殊格式數(shù)據(jù)的強(qiáng)大計(jì)算能力,也正在一步步的通過(guò)新增的非結(jié)構(gòu)化數(shù)據(jù)處理框架,開(kāi)放給不同的外部數(shù)據(jù)。
?前言
MaxCompute作為阿里巴巴集團(tuán)內(nèi)部絕大多數(shù)大數(shù)據(jù)處理需求的核心計(jì)算組件,擁有強(qiáng)大的計(jì)算能力,隨著集團(tuán)內(nèi)外大數(shù)據(jù)業(yè)務(wù)的不斷擴(kuò)展,新的數(shù)據(jù)使用場(chǎng)景也在不斷產(chǎn)生。在這樣的背景下,MaxCompute(ODPS)計(jì)算框架持續(xù)演化,而原來(lái)主要面對(duì)內(nèi)部特殊格式數(shù)據(jù)的強(qiáng)大計(jì)算能力,也正在一步步的通過(guò)新增的非結(jié)構(gòu)化數(shù)據(jù)處理框架,開(kāi)放給不同的外部數(shù)據(jù)。 我們相信阿里巴巴集團(tuán)的這種需求,也代表著業(yè)界大數(shù)據(jù)領(lǐng)域的最前沿實(shí)踐和走向,具有相當(dāng)?shù)钠者m性。在之前我們已經(jīng)對(duì)MaxCompute 2.0新增的非結(jié)構(gòu)化框架做過(guò)整體介紹,描述了在MaxCompute上如何處理存儲(chǔ)在OSS上面的非結(jié)構(gòu)化數(shù)據(jù),側(cè)重點(diǎn)在怎樣從OSS讀取各種非結(jié)構(gòu)化數(shù)據(jù)并在MaxCompute上進(jìn)行計(jì)算。 而一個(gè)完整數(shù)據(jù)鏈路,讀取和計(jì)算處理之后,必然也會(huì)涉及到非結(jié)構(gòu)化數(shù)據(jù)的?寫(xiě)出。 在這里我們著重介紹一下從MaxCompute往OSS輸出非結(jié)構(gòu)化數(shù)據(jù),并提供一個(gè)具體的在MaxCompute上進(jìn)行圖像處理的實(shí)例,?來(lái)展示從【OSS->MaxCompute->OSS】的整個(gè)數(shù)據(jù)鏈路閉環(huán)的實(shí)現(xiàn)。 至于對(duì)于KV NoSQL類型數(shù)據(jù)的輸出,在對(duì)TableStore數(shù)據(jù)處理介紹?中已經(jīng)有所介紹,這里就不再重復(fù)。
1. 使用前提和假設(shè)
1.1 MaxCompute 2.0 功能
這里介紹的功能基于MaxCompute新一代的2.0計(jì)算框架,目前2.0計(jì)算框架已經(jīng)全面上線,默認(rèn)就可使用。
另外本文中使用了MaxCompute 2.0新引進(jìn)的一個(gè)BINARY類型,目前在使用BINARY類型時(shí),還需要顯性設(shè)置set odps.sql.type.system.odps2=true。
1.2 網(wǎng)絡(luò)連通性與訪問(wèn)權(quán)限
另外因?yàn)镸axCompute與OSS是兩個(gè)分開(kāi)的云計(jì)算,與云存儲(chǔ)服務(wù),所以在不同的部署集群上的網(wǎng)絡(luò)連通性有可能影響MaxCompute訪問(wèn)OSS的數(shù)據(jù)的可達(dá)性。 關(guān)于OSS的節(jié)點(diǎn),實(shí)例,服務(wù)地址等概念,可以參見(jiàn)OSS相關(guān)介紹。 在MaxCompute公共云服務(wù)訪問(wèn)OSS存儲(chǔ),推薦使用OSS私網(wǎng)地址(即以-internal.aliyuncs.com結(jié)尾的host地址)。
此外需要指出的是,MaxCompute計(jì)算服務(wù)要訪問(wèn)TableStore數(shù)據(jù)需要有一個(gè)安全的授權(quán)通道。 在這個(gè)問(wèn)題上,MaxCompute結(jié)合了阿里云的訪問(wèn)控制服務(wù)(RAM)和令牌服務(wù)(STS)來(lái)實(shí)現(xiàn)對(duì)數(shù)據(jù)的安全反問(wèn):
首先需要在RAM中授權(quán)MaxCompute訪問(wèn)OSS的權(quán)限。登錄RAM控制臺(tái),創(chuàng)建角色AliyunODPSDefaultRole,并將策略內(nèi)容設(shè)置為:
{"Statement": [{"Action": "sts:AssumeRole","Effect": "Allow","Principal": {"Service": ["odps.aliyuncs.com"]}}],"Version": "1" }然后編輯該角色的授權(quán)策略,將權(quán)限AliyunODPSRolePolicy授權(quán)給該角色。
如果覺(jué)得這些步驟太麻煩,還可以登錄阿里云賬號(hào)點(diǎn)擊此處完成一鍵授權(quán)。
2. MaxCompute內(nèi)置的OSS數(shù)據(jù)輸出handler
2.1 創(chuàng)建External Table
MaxCompute非結(jié)構(gòu)化數(shù)據(jù)框架希望從根本上提供MaxCompute與各種數(shù)據(jù)的聯(lián)通,這里的“各種數(shù)據(jù)”是兩個(gè)維度上的:
而數(shù)據(jù)的這兩個(gè)維度的特征,都是通過(guò)EXTERNAL TABLE的概念來(lái)引入MaxCompute的計(jì)算體系的。 與讀取OSS數(shù)據(jù)的使用方法類似,對(duì)OSS數(shù)據(jù)進(jìn)行寫(xiě)操作,在如上打開(kāi)安全授權(quán)通道后,也是先通過(guò)CREATE EXTERNAL TABLE語(yǔ)句創(chuàng)建出一個(gè)外部表,再通過(guò)標(biāo)準(zhǔn)MaxCompute SQL的INSERT INTO/OVERWRITE等語(yǔ)句來(lái)實(shí)現(xiàn)的,這里先用MaxCompute內(nèi)置的TsvStorageHandler為例來(lái)說(shuō)明一下用法:
DROP TABLE IF EXISTS tpch_lineitem_tsv_external;CREATE EXTERNAL TABLE IF NOT EXISTS tpch_lineitem_tsv_external ( orderkey BIGINT, suppkey BIGINT, discount DOUBLE, tax DOUBLE, shipdate STRING, linestatus STRING, shipmode STRING, comment STRING ) STORED BY 'com.aliyun.odps.TsvStorageHandler' ----------------------------------------- (1) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/tsv_output_folder/'; --(2)這個(gè)DDL語(yǔ)句建立了一個(gè)外部表tpch_lineitem_tsv_external,并將前面提到的兩個(gè)維度的外部數(shù)據(jù)信息關(guān)聯(lián)到這個(gè)外部表上。
其中OSS數(shù)據(jù)存儲(chǔ)的具體地址的URI格式為:
LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'最后還要提到的是,在上面的DDL語(yǔ)句中定義了外部表的Schema, 對(duì)于數(shù)據(jù)輸出而言,這表示輸出的數(shù)據(jù)格式將由這個(gè)Schema描述。 就TSV格式而言,這個(gè)schema描述比較直觀容易理解; 而在用戶自定義的輸出數(shù)據(jù)格式上,這個(gè)schema與輸出數(shù)據(jù)的聯(lián)系則更松散一些,有著更大的自由度。 在后面介紹通過(guò)自定義StorageHandler/Outputer的時(shí)候會(huì)詳細(xì)展開(kāi)。
2.2 通過(guò)對(duì)External Table的 INSERT 操作實(shí)現(xiàn)TSV文本文件的寫(xiě)出
在將OSS數(shù)據(jù)通過(guò)External Table關(guān)聯(lián)上后,對(duì)OSS文件的寫(xiě)出可以對(duì)External Table做標(biāo)準(zhǔn)的SQL INSERT OVERWRITE/INSERT INTO來(lái)操作。 具體輸出數(shù)據(jù)的來(lái)源可以有兩種
2.2.1 從MaxCompute內(nèi)部表輸出數(shù)據(jù)到OSS
這里先來(lái)看第一種場(chǎng)景:假設(shè)我們已經(jīng)有一個(gè)名為tpch_lineitem的MaxCompute內(nèi)部表,其schema可以通過(guò)
DESCRIBE tpch_lineitem;得到:
+------------------------------------------------------------------------------------+ | InternalTable: YES | Size: 241483831680 | +------------------------------------------------------------------------------------+ | Native Columns: | +------------------------------------------------------------------------------------+ | Field | Type | Label | Comment | +------------------------------------------------------------------------------------+ | l_orderkey | bigint | | | | l_partkey | bigint | | | | l_suppkey | bigint | | | | l_linenumber | bigint | | | | l_quantity | double | | | | l_extendedprice | double | | | | l_discount | double | | | | l_tax | double | | | | l_returnflag | string | | | | l_linestatus | string | | | | l_shipdate | string | | | | l_commitdate | string | | | | l_receiptdate | string | | | | l_shipinstruct | string | | | | l_shipmode | string | | | | l_comment | string | | | +------------------------------------------------------------------------------------+其中有16個(gè)columns。 現(xiàn)在我們希望將其中的一部分?jǐn)?shù)據(jù)以TSV格式導(dǎo)出到OSS上面。 那么在用上述DDL創(chuàng)建出External Table之后,使用如下INSERT OVERWRITE操作就可以實(shí)現(xiàn):
INSERT OVERWRITE TABLE tpch_lineitem_tsv_external SELECT l_orderkey, l_suppkey, l_discount, l_tax, l_shipdate, l_linestatus, l_shipmode, l_commentFROM tpch_lineitemWHERE l_discount = 0.07 and l_tax = 0.01;這里將從內(nèi)部的tpch_lineitem表中,在符合l_discount = 0.07 并 l_tax = 0.01的行中選出8個(gè)列(對(duì)應(yīng)tpch_lineitem_tsv_external這個(gè)外部表的schema)按照TSV的格式寫(xiě)到OSS上。 在上面這個(gè)INSERT OVERWRITE操作成功完成后,就可以看到OSS上的對(duì)應(yīng)LOCATION產(chǎn)生了一系列文件:
osscmd ls oss://oss-odps-test/tsv_output_folder/2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv 2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv 2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv ...這里可以看到,通過(guò)上面LOCATION指定的oss-odps-test這個(gè)OSS bucket下的tsv_output_folder文件夾下產(chǎn)生了一個(gè).odps文件夾,這其中將有一些.tsv文件,以及一個(gè).meta文件。 這樣子的文件結(jié)構(gòu)是MaxCompute(ODPS)往OSS上輸出所特有的:
這里迅速看一下這些tsv文件的內(nèi)容:
osscmd cat oss://oss-odps-test/tsv_output_folder/.odps/20170113232648738gam6csz7/M1_0_0-0.tsv 4236000067 9992377 0.07 0.01 1992-11-06 F RAIL across the ideas nag 4236000290 3272628 0.07 0.01 1998-04-28 O RAIL uriously. furiously unusual dinos int 4236000386 8081402 0.07 0.01 1994-02-19 F RAIL its. express, iron 4236000710 3879271 0.07 0.01 1995-03-10 F AIR es are carefully fluffily spe ...可以看到確實(shí)在OSS上產(chǎn)生了對(duì)應(yīng)的TSV數(shù)據(jù)文件。
最后,大家可能也注意到了,這個(gè)INSERT OVERWRITE操作產(chǎn)生了多個(gè)TSV文件,對(duì)于MaxCompute內(nèi)置的TSV/CSV處理來(lái)說(shuō),產(chǎn)生的文件數(shù)目與對(duì)應(yīng)SQL stage的并發(fā)度是相同的,在上面這個(gè)例子中,INSER OVERWITE ... SELECT ... FROM ...; 的操作在源數(shù)據(jù)表(tpch_lineitem) 上分配了1000個(gè)mapper,所以最后產(chǎn)生了1000個(gè)TSV文件的。 如果需要控制TSV文件的數(shù)目,可以配合MaxCompute的各種靈活語(yǔ)義和配置來(lái)實(shí)現(xiàn)。 比如如果需要強(qiáng)制產(chǎn)生一個(gè)TSV文件,那在這個(gè)特定例子中,可以在INSER OVERWITE ... SELECT ... FROM ...最后加上一個(gè)DISTRIBUTE BY l_discount, 就可以在最后插入僅有一個(gè)Reducer的Reduce stage, 也就會(huì)只輸出一個(gè)TSV文件了:
osscmd ls oss://oss-odps-test/tsv_output_folder/2017-01-14 08:03:41 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 08:03:35 4.20GB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113234037735gcm6csz7/R2_1_33_0-0.tsv可以看到在增加了DISTRIBUTE BY l_discount后,現(xiàn)在同樣的數(shù)據(jù)只了一個(gè)輸出TSV文件,當(dāng)然這個(gè)文件的size就大多了。 這方面的調(diào)控技巧還有很多,都是可以依賴SQL語(yǔ)言的靈活性,數(shù)據(jù)本身的特性,以及MaxCompute計(jì)算相關(guān)設(shè)置來(lái)實(shí)現(xiàn)的,這里就不深入展開(kāi)了。
2.2.2 以MaxCompute為計(jì)算介質(zhì),實(shí)現(xiàn)不同存儲(chǔ)介質(zhì)之間的數(shù)據(jù)轉(zhuǎn)移
External Table作為一個(gè)MaxCompute與外部存儲(chǔ)介質(zhì)的一個(gè)切入點(diǎn),之前已經(jīng)介紹過(guò)對(duì)OSS數(shù)據(jù)的讀取以及TableStore數(shù)據(jù)的操作,結(jié)合對(duì)外部數(shù)據(jù)讀取和寫(xiě)出的功能,就可以實(shí)現(xiàn)通過(guò)External Table實(shí)現(xiàn)各種各樣的數(shù)據(jù)計(jì)算/存儲(chǔ)鏈路,比如:
而這些操作與上面數(shù)據(jù)源為MaxCompute內(nèi)部表的場(chǎng)景,?唯一的區(qū)別只是SELECT的來(lái)源變成一個(gè)External table,而不是MaxCompute內(nèi)置表。
3. 通過(guò)自定義StorageHandler來(lái)實(shí)現(xiàn)數(shù)據(jù)輸出
除了使用內(nèi)置的StorageHandler來(lái)實(shí)現(xiàn)在OSS上輸出TSV/CSV等常見(jiàn)文本格式,MaxCompute非結(jié)構(gòu)化框架提供了通用的SDK,允許用戶對(duì)外輸出自定義數(shù)據(jù)格式文件,包括圖像,音頻,視頻等等。 這種對(duì)于用戶自定義的完全非結(jié)構(gòu)化數(shù)據(jù)格式支持,也是MaxCompute從結(jié)構(gòu)化/文本類數(shù)據(jù)的一個(gè)向外擴(kuò)展,在這里我們會(huì)以一個(gè)圖像處理的例子,來(lái)走通整個(gè)【OSS->MaxCompute->OSS】數(shù)據(jù)鏈路,尤其著重介紹對(duì)OSS輸出文件的功能。
為了方便大家理解,這里先提供一個(gè)在使用用戶自定義代碼的場(chǎng)景下,數(shù)據(jù)在MaxCompute計(jì)算平臺(tái)上的流程:
從上圖可以看出,從數(shù)據(jù)的流動(dòng)和處理邏輯上理解,用戶可以簡(jiǎn)單地把非結(jié)構(gòu)化處理框架理解成在MaxCompute計(jì)算平臺(tái)兩端有機(jī)耦合的的數(shù)據(jù)導(dǎo)入(Ingres)以及導(dǎo)出(Egress):
值得指出的是,這里面所有的步驟都是可以由用戶根據(jù)需要來(lái)進(jìn)行自由的選擇與拼接的。 比如如果用戶的輸入就是MaxCompute的內(nèi)部表,那步驟1.就沒(méi)有必要了,事實(shí)上在前面的章節(jié)2中的例子,我們就實(shí)現(xiàn)了將內(nèi)部表直接寫(xiě)成OSS上的TSV文件的流程。 同理, 如果用戶沒(méi)有輸出的需求,步驟3. 就沒(méi)有必要,比如我們之前介紹的OSS數(shù)據(jù)的讀取。 最后,步驟2.也是可以省略的,比如如果用戶的所有計(jì)算邏輯都是在自定義的Extract/Output中完成,沒(méi)有進(jìn)行SQL邏輯運(yùn)算的需要,那步驟1.是可以直接連接到步驟3.的。
理解了上面這個(gè)數(shù)據(jù)變換的流程,我們就可以來(lái)通過(guò)一個(gè)圖像處理例子來(lái)看看怎么具體的通過(guò)非結(jié)構(gòu)化框架在MaxCompute SQL上完整的實(shí)現(xiàn)非結(jié)構(gòu)化數(shù)據(jù)的讀取,計(jì)算以及輸出了:
3.1 范例:OSS圖像文件 -> MaxCompute計(jì)算處理 -> OSS圖像輸出
這里我們先提供實(shí)現(xiàn)這整個(gè)【OSS->MaxCompute->OSS】數(shù)據(jù)鏈路需要用到的MaxCompute SQL query,并做簡(jiǎn)單的注解,詳細(xì)的用戶代碼實(shí)現(xiàn)邏輯將在后面的3.2子章節(jié)中介紹SDK接口的時(shí)候做展開(kāi)解釋。
3.1.1 關(guān)聯(lián)OSS上的原始輸入圖像到External Table: images_input_external
DROP TABLE IF EXISTS images_input_external; CREATE EXTERNAL TABLE IF NOT EXISTS images_input_external ( name STRING, width BIGINT, height BIGINT, image BINARY ) STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler' --- (1) WITH SERDEPROPERTIES ('inputImageFormat'='BMP' , 'transformedImageFormat' = 'JPG') --- (2) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SampleData/test_images/mixed_bmp/' --- (3) USING 'odps-udf-example.jar'; --- (4)說(shuō)明:
另外要說(shuō)明的是這里指定的External Table的schema就是用戶在進(jìn)行Extract操作后構(gòu)造的Record格式,具體怎么構(gòu)造這個(gè)Schema用戶可以根據(jù)需要自己根據(jù)能從輸入數(shù)據(jù)中抽取到的信息定義。 在這里我們定義了對(duì)于輸入圖片數(shù)據(jù),會(huì)將圖片名稱,圖片的長(zhǎng)和寬,以及圖片的二進(jìn)制bytes抽取出來(lái)放進(jìn)Record(見(jiàn)后面的Extractor代碼說(shuō)明),所以就有了上面的【STRING,BIGINT,BIGINT,BINARY】的schema。
3.1.2 關(guān)聯(lián)OSS輸出地址到External Table: images_output_external
CREATE EXTERNAL TABLE IF NOT EXISTS images_output_external ( image_name STRING, image_width BIGINT, image_height BIGINT, outimage BINARY ) STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler' LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/output/images_output/' ---(1) USING 'odps-udf-example.jar';說(shuō)明: 可以看到這里創(chuàng)建關(guān)聯(lián)輸出圖像文件的External Table,使用的DDL語(yǔ)句,與前面關(guān)聯(lián)輸入圖像時(shí)使用的DDL語(yǔ)句是非常類似的:只是LOCATION不一樣,表明圖像數(shù)據(jù)處理后將輸出到另外一個(gè)地址。 另外還有一點(diǎn)就是這里我們沒(méi)有使用SERDEPROPERTIES來(lái)進(jìn)行傳參,這個(gè)只是在這個(gè)場(chǎng)景上沒(méi)有需求,在有需求的時(shí)候可以用同樣的方法把參數(shù)傳遞給outputer。 當(dāng)然這里兩個(gè)DDL語(yǔ)句如此相似,有一個(gè)原因是因?yàn)槲覀冞@個(gè)例子中用戶代碼中對(duì)于Extract出的Record以及輸入給Outputer的Record使用了一樣的schema, 同時(shí)這一對(duì)Extractor和Outputer都被封裝在了同一個(gè)ImageStorageHandler里放在同一個(gè)JAR包里。?在實(shí)際應(yīng)用中,這些都是可以根據(jù)實(shí)際需求自己調(diào)整的,由用戶自己選擇組合和打包方式。
3.1.3 從OSS讀取原始圖片數(shù)據(jù)到MaxCompute, 計(jì)算處理,并輸出圖像到OSS
在上面的3.1.1以及3.1.2子章節(jié)中的兩個(gè)DDL語(yǔ)句,分別實(shí)現(xiàn)了把輸入OSS數(shù)據(jù),以及計(jì)劃輸出OSS數(shù)據(jù),分別綁定到兩個(gè)LOCATION以及指定對(duì)應(yīng)的用戶處理代碼,參數(shù)等設(shè)置。 然而這兩個(gè)DDL語(yǔ)句對(duì)系統(tǒng)而言,只是進(jìn)行了一些宏數(shù)據(jù)的記錄操作,并不會(huì)涉及具體的數(shù)據(jù)計(jì)算操作。 在這兩個(gè)DDL語(yǔ)句運(yùn)行成功后,運(yùn)行如下SQL語(yǔ)句才會(huì)引發(fā)真正的運(yùn)算。 換句話說(shuō),在Fig.1中描述的整個(gè)【OSS->MaxCompute->OSS】數(shù)據(jù)讀取/計(jì)算/輸出鏈路,實(shí)際上都是通過(guò)下面一個(gè)簡(jiǎn)單的SQL 語(yǔ)句完成的:
INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width = 1024;這看起來(lái)就是一個(gè)標(biāo)準(zhǔn)的MaxCompute SQL語(yǔ)句,只不過(guò)因?yàn)樯婕傲薸mages_output_external和images_input_external這兩個(gè)外部表,所以真正進(jìn)行的物理操作與傳統(tǒng)的SQL操作會(huì)有一些區(qū)別:在這個(gè)過(guò)程中,涉及了讀寫(xiě)OSS,以及通過(guò)ImageStorageHandler這個(gè)wrapper,調(diào)用自定義的Extractor,Outputer代碼來(lái)對(duì)數(shù)據(jù)進(jìn)行操作。 下面就來(lái)具體看看在這個(gè)例子中的用戶自定義代碼實(shí)現(xiàn)了怎樣的功能,以及具體是如何實(shí)現(xiàn)的。
3.2 ImageStorageHandler實(shí)現(xiàn)
如同之前介紹過(guò)的,MaxCompute非結(jié)構(gòu)化框架通過(guò)StorageHandler這個(gè)接口來(lái)描述對(duì)各種數(shù)據(jù)存儲(chǔ)格式的處理。 具體來(lái)說(shuō),StorageHandler作為一個(gè)wrapper class, 讓用戶指定自己定義的Exatractor(用于數(shù)據(jù)的讀入,解析,處理等) 以及Outputer(用于數(shù)據(jù)的處理和輸出等)。 用戶自定義的StorageHandler 應(yīng)該繼承?OdpsStorageHandler,實(shí)現(xiàn)getExtractorClass以及getOutputerClass?兩個(gè)接口。
通常作為wrapper class, StorageHandler的實(shí)現(xiàn)都很簡(jiǎn)單,比如這里的ImageStorageHandler?就只是通過(guò)這兩個(gè)接口指定了我們將使用ImageExtractor以及ImageOutputer:
package com.aliyun.odps.udf.example.image;public class ImageStorageHandler extends OdpsStorageHandler {@Overridepublic Class<? extends Extractor> getExtractorClass() {return ImageExtractor.class;}@Overridepublic Class<? extends Outputer> getOutputerClass() {return ImageOutputer.class;} }另外要說(shuō)明的是如果確定在使用某個(gè)StorageHandler的時(shí)候,只需要用到Extractor,或者只需要用到Outputer功能,那不需要的接口則不用實(shí)現(xiàn)。 比如如果我們只需要讀取OSS數(shù)據(jù)而不需要做INSERT操作,那getOutputerClass()的實(shí)現(xiàn)只需要扔個(gè)NotImplemented exception就可以了,不會(huì)被調(diào)用到。
3.3 ImageExtractor實(shí)現(xiàn)
因?yàn)閷?duì)于SDK中Extractor接口的介紹以及對(duì)用戶如何寫(xiě)一個(gè)自定義的Extractor,在之前介紹的OSS數(shù)據(jù)的讀取中已經(jīng)有所涉及,所以這里就不再對(duì)這方面做深入的介紹。
Extractor的工作在于讀取輸入數(shù)據(jù)并進(jìn)行用戶自定義處理,那么我們首先來(lái)看看這里由images_input_external這個(gè)外表綁定的OSS輸入LOCATION上存放的具體數(shù)據(jù)內(nèi)容:
osscmd ls oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/ 2017-01-09 14:02:01 1875.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/barbara.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/cameraman.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/fishingboat.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/goldhill.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/house.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/jetplane.bmp 2017-01-09 14:02:01 2.32MB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lake.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lena.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/livingroom.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/pirate.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/walkbridge.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_blonde.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_darkhair.bmp可以看到這個(gè)LOCATION存放了一系列bmp圖像數(shù)據(jù),分辨率從 400 x 400 到 1200 x 1200不等。 具體在這個(gè)例子中用到的ImageExtractor的詳細(xì)代碼在github上可以找到, 這里只做一些簡(jiǎn)單介紹說(shuō)明該Extractor做了些什么工作:
從輸入的OSS地址上使用非結(jié)構(gòu)化框架提供的InputStream接口讀取圖像數(shù)據(jù),并在本地進(jìn)行如下操作
- 對(duì)于圖像寬度小于1024的圖片,統(tǒng)一放大到1024 x 1024; 對(duì)于圖像寬度大于1024的圖片,跳過(guò)不進(jìn)行處理
- 處理過(guò)的圖片,在內(nèi)存中轉(zhuǎn)存成由輸入?yún)?shù)指定的格式(JPG)
另外要說(shuō)明的是,目前Record作為MaxCompute結(jié)構(gòu)化數(shù)據(jù)處理的基本單元,有一些額外的限制,比如BINARY/STRING類型都有8MB大小的限制,但是在大部分場(chǎng)景下這個(gè)大小應(yīng)該是能滿足存儲(chǔ)需求的。
3.4 ImageOutputer的實(shí)現(xiàn)
接下來(lái)我們著重講一下ImageOutputer的實(shí)現(xiàn)。 首先所有的用戶輸出邏輯都必須實(shí)現(xiàn)Outputer接口,具體來(lái)說(shuō)有如下三個(gè):setup, output和close, 這和Extractor的setup, extract和close三個(gè)接口基本上是對(duì)稱的。
// Base outputer class, custom outputer shall extend from this class public abstract class Outputer{public abstract void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes);public abstract void output(Record record) throws IOException;public abstract void close() throws IOException; }這其中setup()和close()在一個(gè)outputer中只會(huì)調(diào)用一次。 用戶可以在setup里面做初始化準(zhǔn)備工作,另外通常需要把setup()傳遞進(jìn)來(lái)的這三個(gè)參數(shù)保存成ouputerd的class variable, 方便之后output()或者close()接口中使用。 而close()這個(gè)接口用于方便用戶代碼的掃尾工作。
通常情況下大部分的數(shù)據(jù)處理發(fā)生在output(Record)這個(gè)接口內(nèi)。 MaxCompute系統(tǒng)會(huì)根據(jù)當(dāng)前outputer分配處理的Record數(shù)目不斷調(diào)用,也就是對(duì)每個(gè)輸入Record系統(tǒng)會(huì)調(diào)用一次?output(Record)。 系統(tǒng)假設(shè)在一個(gè)output(Record) 調(diào)用返回的時(shí)候,用戶代碼已經(jīng)消費(fèi)完這個(gè)Record, 因此在當(dāng)前output(Record)返回后,系統(tǒng)可能將這個(gè)Record所使用的內(nèi)存用作它用: 所以不推薦一個(gè)Record中的信息在跨多個(gè)output()函數(shù)調(diào)用被使用,如果一定有這個(gè)需求的話,用戶必須把相關(guān)信息通過(guò)class variable等方式自行另外保存。
3.4.1 ImageOutputer.setup()
setup用于初始化整個(gè)outputer, 在這個(gè)接口上提供了整個(gè)outputer操作過(guò)程中可能需要的參數(shù):
- ExecutionContext: 用于提供一些系統(tǒng)信息和接口,比如讀取resource等,在ImageOutputer這個(gè)例子中我們沒(méi)有用到這個(gè)參數(shù);
- OutputStreamSet: 用戶可以從這個(gè)類的next()接口獲取對(duì)外輸出所需要的OutputStream,具體用法我們?cè)谙旅嬖敿?xì)介紹;
- DataAttributes: 用戶通過(guò)SERDEPROPERTIES設(shè)置的key-value參數(shù)可以通過(guò)這個(gè)類獲取,參數(shù)獲取這里ImageOutputer例子中沒(méi)有用到,但是Extractor上的setup參數(shù)中也有這個(gè)類,在上面的ImageExtractor用到了改功能,可以參考一下。 同時(shí)這個(gè)類上面還提供了一些helper接口,比如方便用戶驗(yàn)證schema等。
在我們這個(gè)ImageOutputer里,setup()的實(shí)現(xiàn)比較簡(jiǎn)單:
@Overridepublic void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) {this.outputStreamSet = outputStreamSet;this.attributes = attributes;this.attributes.verifySchema(new OdpsType[]{ OdpsType.STRING, OdpsType.BIGINT, OdpsType.BIGINT, OdpsType.BINARY });}只是做了簡(jiǎn)單的初始化以及對(duì)schema的驗(yàn)證。
3.4.2 ImageOutputer.output(Record) 以及 OutputStreamSet的使用
在介紹具體output()接口之前,首先我們要來(lái)看看?OutputStreamSet, 這個(gè)類有兩個(gè)接口:
public interface OutputStreamSet{SinkOutputStream next(); SinkOutputStream next(String fileNamePostfix);}兩個(gè)接口都是用來(lái)獲取一個(gè)新的SinkOutputStream(一個(gè)Java?OutputStream的實(shí)現(xiàn),可以按照OutputStream使用),兩個(gè)接口唯一的區(qū)別是next()獲取的OutputStream寫(xiě)出的文件名完全由MaxCompute系統(tǒng)決定,而next(String fileNamePostfix)則允許用戶提供文件名的postfix。 提供這個(gè)postfix的意義是,在輸出文件具體地址和名字格式總體由MaxCompute系統(tǒng)決定的前提下,用戶依然可以定制一個(gè)方便理解的postfix。 比如使用next("_boat.jpg")?得到的OutputStream可能對(duì)應(yīng)如下一個(gè)輸出文文件:
oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0_boat.jpg這其中尾端的"_boat.jpg"可以幫助用戶理解輸出文件的涵義。 如果這個(gè)?OutputStream是由next()獲得的話,那對(duì)應(yīng)的輸出文件可能就是這樣的:
oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0用戶可能就需要具體讀取這個(gè)文件才能知道這個(gè)文件中具體存放了什么內(nèi)容。
前面提到output(Record)這個(gè)接口會(huì)由系統(tǒng)不斷調(diào)用,但是應(yīng)該強(qiáng)調(diào)的是,并不一定在每一個(gè)Record都需要調(diào)用一次OutputStreamSet.next()接口來(lái)獲得一個(gè)新的OutputStream。?事實(shí)上在大多數(shù)情況下,我們建議在一個(gè)Outputer里面盡可能減少調(diào)用next()的次數(shù)(最好只調(diào)用一次)。 也就是說(shuō)理想情況下,一個(gè)outpuer只應(yīng)該產(chǎn)生一個(gè)輸出文件。 比如處理TSV這種文本格式文件,假設(shè)有5000個(gè)Record對(duì)應(yīng)5000行TSV數(shù)據(jù),那么最理想的情況是應(yīng)該把這5000行數(shù)據(jù)全部寫(xiě)到一個(gè)TSV文件中。 當(dāng)然用戶可能會(huì)有各種各樣不同的切分輸出文件的需求:比如希望每個(gè)文件大小控制在一定范圍,或比如文件的邊界有顯著的意義等等。
具體到當(dāng)前這個(gè)圖像例子,從下面的ImageOutputer代碼實(shí)現(xiàn)中可以看出,這個(gè)例子中確實(shí)是處理每個(gè)Record就調(diào)用一次next()的,因?yàn)樵诋?dāng)前場(chǎng)景中,每一個(gè)輸入的Record都表示一張圖片的信息(binary bytes, 圖像名字,圖像長(zhǎng)寬),所以這里通過(guò)多次調(diào)用next()來(lái)輸出多個(gè)圖片文件。 但是我們還是需要再次強(qiáng)調(diào),調(diào)用next()的次數(shù)過(guò)多可能有一些其他弊端,比如造成碎片化小數(shù)據(jù)在OSS上的存儲(chǔ)等等。 尤其在MaxCompute這種分布式計(jì)算系統(tǒng)上,因?yàn)橄到y(tǒng)本身就會(huì)調(diào)度起多個(gè)outputer進(jìn)行并行計(jì)算處理,如果每個(gè)outpuer都輸出過(guò)多文件的話,最后產(chǎn)生的文件數(shù)目會(huì)有一個(gè)乘性效應(yīng)。 回頭來(lái)看我們這個(gè)例子中,即使在這里,多個(gè)圖像其實(shí)也可以通過(guò)一個(gè)OutputStream,按照tar/tar.gz的方式寫(xiě)到單個(gè)文件中,這些都是在實(shí)現(xiàn)具體系統(tǒng)中用戶需要根據(jù)自己的場(chǎng)景, 以及處理邏輯,輸出數(shù)據(jù)類型等信息來(lái)進(jìn)行優(yōu)化和tradeoff的。
在理解了這些之后,現(xiàn)在來(lái)具體看看ImageOutputer的實(shí)現(xiàn)output接口實(shí)現(xiàn):
@Overridepublic void output(Record record) throws IOException {String name = record.getString(0);Long width = record.getBigint(1);Long height = record.getBigint(2);ByteArrayInputStream input = new ByteArrayInputStream(record.getBytes(3));BufferedImage sobelEdgeImage = getEdgeImage(input);OutputStream outputStream = this.outputStreamSet.next(name + "_" + width + "x" + height + "." + outputFormat);ImageIO.write(sobelEdgeImage, this.outputFormat, outputStream);}可以看到這里主要就做了三件事情:
3.4.3 ImageOutputer.close()
在這個(gè)例子中,outputer.close()接口沒(méi)有包含具體的實(shí)現(xiàn)邏輯,是個(gè)no-op。
至此我們就介紹完了一個(gè)output的實(shí)現(xiàn),現(xiàn)在可以看看在運(yùn)行完這個(gè)SQL query,對(duì)應(yīng)OSS地址的數(shù)據(jù):
osscmd ls oss://oss-odps-test/dev/output/images_output/ 2017-01-15 14:36:50 215.19KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0-barbara_1024x1024.jpg 2017-01-15 14:36:50 108.90KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-1-cameraman_1024x1024.jpg 2017-01-15 14:36:50 169.54KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-2-fishingboat_1024x1024.jpg 2017-01-15 14:36:50 214.94KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-3-goldhill_1024x1024.jpg 2017-01-15 14:36:50 71.00KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-4-house_1024x1024.jpg 2017-01-15 14:36:50 126.50KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-5-jetplane_1024x1024.jpg 2017-01-15 14:36:50 169.63KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-6-lake_1024x1024.jpg 2017-01-15 14:36:50 194.18KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-7-lena_1024x1024.jpg ...可以看到圖像數(shù)據(jù)按照期待格式寫(xiě)到了指定地址,這里我們就選一個(gè)輸入圖像(lena.bmp)以及對(duì)應(yīng)的輸出圖像(M1_0_-1--1-7-lena_1024x1024.jpg)看一下對(duì)比:
這個(gè)例子中整個(gè)圖像處理流程已經(jīng)通過(guò)如上的SQL query完成。 而從上面展示的ImageExtractor以及ImageOutputer?源代碼,我們可以看出整個(gè)過(guò)程中用戶的邏輯基本與寫(xiě)單機(jī)圖像處理程序無(wú)異,用戶的代碼只需要在Extractor上做InputStream到Record的準(zhǔn)換,而在Outputer上做反向的Record到OutputSteam的寫(xiě)出處理,其他核心的處理邏輯實(shí)現(xiàn)基本和單機(jī)算法實(shí)現(xiàn)相同,在用戶的層面,并不用去操心底層分布式系統(tǒng)的細(xì)節(jié)以及MaxCompute和OSS的交互。
3.5 數(shù)據(jù)處理步驟的靈活性
從上面這個(gè)例子中我們也可以看出,在一個(gè)完整的【OSS->MaxCompute->OSS】數(shù)據(jù)流程中,Extractor和Outputer中涉及的具體計(jì)算邏輯其實(shí)也并不一定會(huì)有一個(gè)非常明確的邊界。 Extractor和Outputer只要各自完成所需的轉(zhuǎn)換Record/Stream的轉(zhuǎn)換,具體的額外算法邏輯在兩個(gè)地方都有機(jī)會(huì)完成。 比如上面這個(gè)例子的整個(gè)流程涉及了如下圖像處理相關(guān)的運(yùn)算:
上面的例子實(shí)現(xiàn)中,把1. 和 2. 放在ImageExtractor中完成,而3.則放在ImageOutputer中完成,但并不是唯一的選擇。 我們完全可以把所有3個(gè)步驟都放在ImageExtractor中完成,讓ImageOutputer只做Record到寫(xiě)出最后圖像的操作;也可以在ImageExtractor中只做讀取原始binary到Recrod, 而把所有3個(gè)圖像處理步驟都放在ImageOutputer中進(jìn)行,等等。 具體進(jìn)行怎樣的選擇,用戶可以完全根據(jù)需要自己實(shí)現(xiàn)。
另外一個(gè)系統(tǒng)設(shè)計(jì)的點(diǎn)是如果對(duì)于一個(gè)數(shù)據(jù)需要做重復(fù)的運(yùn)算,那可以考慮將數(shù)據(jù)從OSS中通過(guò)Extractor讀出進(jìn)MaxCompute,然后存儲(chǔ)成MaxCompute的內(nèi)置表格再進(jìn)行(多次)的計(jì)算。 這個(gè)對(duì)于MaxCompute和OSS沒(méi)有進(jìn)行混布,不在一個(gè)物理網(wǎng)絡(luò)上的場(chǎng)景尤其有意義: MaxCompute從內(nèi)置表中讀取數(shù)據(jù)無(wú)疑要比從外部OSS存儲(chǔ)服務(wù)中讀出數(shù)據(jù)要有效得多。 在上面3.1.3子章節(jié)中的圖像處理例子,這個(gè)INSER OVERWITE操作:
INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width = 1024;就可以改寫(xiě)成兩個(gè)分開(kāi)的語(yǔ)句:
INSERT OVERWRITE TABLE images_internal SELECT * FROM images_input_external WHERE width = 1024;INSERT OVERWRITE TABLE images_output_external SELECT * FROM image_internal;通過(guò)把數(shù)據(jù)寫(xiě)到一個(gè)內(nèi)部images_internal表中,后面如果有多次讀取數(shù)據(jù)的需求的話,就可以不再去訪問(wèn)外部OSS了。 這里也可以看到MaxCompute非結(jié)構(gòu)化框架以及SQL語(yǔ)法本身提供了非常高的靈活性和可擴(kuò)展性,用戶可以根據(jù)實(shí)際計(jì)算的不同模式/場(chǎng)景/需求,來(lái)在上面完成各種各樣的數(shù)據(jù)計(jì)算工作流。
5. 結(jié)語(yǔ)
非結(jié)構(gòu)化數(shù)據(jù)處理框架隨著MaxCompute 2.0一起推出,意在豐富MaxCompute平臺(tái)的數(shù)據(jù)處理生態(tài),來(lái)打通阿里云核心計(jì)算平臺(tái)與阿里云各個(gè)重要存儲(chǔ)服務(wù)之間的數(shù)據(jù)鏈路。 在之前介紹過(guò)的讀取OSS以及處理TableStore數(shù)據(jù)的整體方案后,本文側(cè)重介紹數(shù)據(jù)往OSS的輸出方案,并依托一個(gè)圖像處理的處理實(shí)例,展示了【OSS->MaxCompute->OSS】整個(gè)數(shù)據(jù)鏈路的實(shí)現(xiàn)。 在這些新功能的基礎(chǔ)上,我們希望實(shí)現(xiàn)整個(gè)阿里云計(jì)算與數(shù)據(jù)的生態(tài)融合: 在不同的項(xiàng)目上,我們已經(jīng)看到了在MaxCompute上處理OSS上的海量視頻,圖像等非結(jié)構(gòu)化數(shù)據(jù)的巨大潛力。 今后隨著這個(gè)生態(tài)的豐富,我們期望OSS數(shù)據(jù),TableStore數(shù)據(jù)以及MaxCompute內(nèi)部存儲(chǔ)的數(shù)據(jù),都能在MaxCompute的核心計(jì)算引擎上進(jìn)行融合,從而產(chǎn)生更大的價(jià)值。
原文鏈接
轉(zhuǎn)載于:https://my.oschina.net/yunqi/blog/1787410
總結(jié)
以上是生活随笔為你收集整理的MaxCompute与OSS非结构化数据读写互通(及图像处理实例)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 《构建之法》阅读第四章、第十七章收获
- 下一篇: vue中的provide/inject的