MaxCompute Mars 完全指南
Mars 簡(jiǎn)介
Mars 能利用并行和分布式技術(shù),加速 Python 數(shù)據(jù)科學(xué)棧,包括?numpy、pandas?和?scikit-learn。同時(shí),也能輕松與 TensorFlow、PyTorch 和 XGBoost 集成。
Mars tensor?的接口和 numpy 保持一致,但支持大規(guī)模高維數(shù)組。樣例代碼如下。
import mars.tensor as mta = mt.random.rand(10000, 50) b = mt.random.rand(50, 5000) a.dot(b).execute()Mars DataFrame?接口和 pandas 保持一致,但可以支撐大規(guī)模數(shù)據(jù)處理和分析。樣例代碼如下。
import mars.dataframe as md ratings = md.read_csv('Downloads/ml-20m/ratings.csv') movies = md.read_csv('Downloads/ml-20m/movies.csv') movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') result.sort_values(by='rating', ascending=False).execute()Mars learn?保持和 scikit-learn 接口一致。樣例代碼如下。
import mars.dataframe as md from mars.learn.neighbors import NearestNeighborsdf = md.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df).fetch()Mars learn 可以很方便地與?TensorFlow、PyTorch 和?XGBoost?集成,點(diǎn)擊鏈接查看文檔。
在 MaxCompute 上使用 Mars,我們提供了簡(jiǎn)單易用的接口來(lái)拉起 Mars 集群,用戶不需要關(guān)心安裝和維護(hù)集群。同時(shí),通過(guò) MaxCompute 拉起的 Mars,也支持直接讀寫 MaxCompute 表。
申請(qǐng)?jiān)囉?/h2>
目前我們采用申請(qǐng)開通的方式,公共云用戶請(qǐng)?zhí)峁紊暾?qǐng)。
環(huán)境準(zhǔn)備
要在 MaxCompute 上運(yùn)行 Mars,需要有相應(yīng)的運(yùn)行環(huán)境。這可以分為如下幾種情況。
我們分別展開。
開箱即用的環(huán)境
開箱即用的環(huán)境,如 dataworks 的?pyodps3 節(jié)點(diǎn),已經(jīng)包含了 PyODPS 和 Mars。
在新建的 pyodps3 節(jié)點(diǎn)里運(yùn)行如下命令檢查版本,確保滿足要求。
from odps import __version__ as odps_version from mars import __version__ as mars_versionprint(odps_version) print(mars_version)輸出的第一個(gè)為 PyODPS 版本,第二個(gè)為 Mars 版本。要求?PyODPS 至少是 0.9.0?。
其他環(huán)境
這個(gè)環(huán)境就要求通過(guò) pip 安裝 PyODPS 和 Mars。Python 版本推薦使用 3.7 版本,至少需要是 3.5 版本。
通過(guò)如下命令安裝:
pip install -U pip # 可選,確保 pip 版本足夠新 pip install pyarrow==0.12.1 # 目前 pyarrow 版本固定到 0.12.1 pip install pyodps>0.9.0 # pyodps 需要至少 0.9.0 pip install pymars>=0.4.0rc1 # mars 版本需要至少是 0.4.0rc1準(zhǔn)備 ODPS 入口
ODPS 入口是 MaxCompute 所有操作的基礎(chǔ):
- 對(duì)于開箱即用的環(huán)境,如 dataworks,我們會(huì)自動(dòng)創(chuàng)建?o?即 ODPS 入口實(shí)例,因此可以不需要?jiǎng)?chuàng)建。
- 對(duì)于其他環(huán)境,需要通過(guò)?access_id、access_key?等參數(shù)創(chuàng)建,詳細(xì)參考?文檔。
基本概念
- MaxCompute 任務(wù)實(shí)例:MaxCompute 上任務(wù)以 instance 概念存在。Mars 集群也是通過(guò)一個(gè) MaxCompute Instance 拉起。
- Logview 地址:每個(gè) MaxCompute instance 包含一個(gè) logview 地址來(lái)查看任務(wù)狀態(tài)。拉起 Mars 集群的 instance 也不例外。
- Mars UI: Mars 集群拉起后,會(huì)包含一個(gè) Web UI,通過(guò)這個(gè) Web UI,可以查看 Mars 集群、任務(wù)狀態(tài),可以提交任務(wù)。當(dāng)集群拉起后,一般來(lái)說(shuō)就不需要和 MaxCompute 任務(wù)實(shí)例交互了。
- Mars session:Mars session 和具體的執(zhí)行有關(guān),一般情況下用戶不需要關(guān)心 session,因?yàn)闀?huì)包含默認(rèn)的 session。通過(guò)?o.create_mars_cluster?創(chuàng)建了 Mars 集群后,會(huì)創(chuàng)建默認(rèn)連接到 Mars 集群的 session。
- Jupyter Notebook:Jupyter Notebook 是一個(gè)基于網(wǎng)頁(yè)的用于交互式計(jì)算的應(yīng)用程序,可以用來(lái)開發(fā)、文檔編寫、運(yùn)行代碼和展示結(jié)果。
基礎(chǔ)用法
創(chuàng)建 Mars 集群
準(zhǔn)備好環(huán)境后,接著我們就可以拉起 Mars 集群了。
有了?o?這個(gè)對(duì)象后,拉起 Mars 集群非常簡(jiǎn)單,只需要運(yùn)行如下代碼。
from odps import options options.verbose = True # 在 dataworks pyodps3 里已經(jīng)設(shè)置,所以不需要前兩行代碼 client = o.create_mars_cluster(5, 4, 16, min_worker_num=3)這個(gè)例子里指定了 worker 數(shù)量為 5 的集群,每個(gè) worker 是4核、16G 內(nèi)存的配置,min_worker_num?指當(dāng) worker 已經(jīng)起了3個(gè)后,就可以返回?client?對(duì)象了,而不用等全部 5 個(gè) worker 都啟動(dòng)再返回。Mars 集群的創(chuàng)建過(guò)程可能比較慢,需要耐心等待。
注意:申請(qǐng)的單個(gè) worker 內(nèi)存需大于 1G,CPU 核數(shù)和內(nèi)存的最佳比例為 1:4,例如單 worker 4核、16G。同時(shí),新建的 worker 個(gè)數(shù)也不要超過(guò) 30 個(gè),否則會(huì)對(duì)鏡像服務(wù)器造成壓力,如果需要使用超過(guò) 30 個(gè) worker,請(qǐng)工單申請(qǐng)。
這個(gè)過(guò)程中會(huì)打印 MaxCompute instance 的 logview、 Mars UI 以及 Notebook 地址。Mars UI 可以用來(lái)連接 Mars 集群,亦可以用來(lái)查看集群、任務(wù)狀態(tài)。
Mars 集群的創(chuàng)建就是一個(gè) MaxCompute 任務(wù),因此也有 instance id、logview 等 MaxCompute 通用的概念。
提交作業(yè)
Mars 集群創(chuàng)建的時(shí)候會(huì)設(shè)置默認(rèn) session,通過(guò)?.execute()?執(zhí)行時(shí)任務(wù)會(huì)被自動(dòng)提交到集群。
import mars.dataframe as md import mars.tensor as mtmd.DataFrame(mt.random.rand(10, 3)).execute() # execute 自動(dòng)提交任務(wù)到創(chuàng)建的集群停止并釋放集群
目前一個(gè) Mars 集群超過(guò)3天就會(huì)被自動(dòng)釋放。當(dāng) Mars 集群不再需要使用時(shí),也可以通過(guò)調(diào)用?client.stop_server()?手動(dòng)釋放:
client.stop_server()MaxCompute 表讀寫支持
Mars 可以直讀和直寫 MaxCompute 表。
讀表
通過(guò)?o.to_mars_dataframe?來(lái)讀取 MaxCompute 表,并返回?Mars DataFrame。
In [1]: df = o.to_mars_dataframe('test_mars') In [2]: df.head(6).execute() Out[2]: col1 col2 0 0 0 1 0 1 2 0 2 3 1 0 4 1 1 5 1 2寫表
通過(guò)?o.persist_mars_dataframe(df, 'table_name')?將 Mars DataFrame 保存成 MaxCompute 表。
In [3]: df = o.to_mars_dataframe('test_mars') In [4]: df2 = df + 1 In [5]: o.persist_mars_dataframe(df2, 'test_mars_persist') # 保存 Mars DataFrame In [6]: o.get_table('test_mars_persist').to_df().head(6) # 通過(guò) PyODPS DataFrame 查看數(shù)據(jù)col1 col2 0 1 1 1 1 2 2 1 3 3 2 1 4 2 2 5 2 3使用 Mars 集群自帶的 Jupyter Notebook
創(chuàng)建 Mars 集群會(huì)自動(dòng)創(chuàng)建一個(gè) Jupyter Notebook 以編寫代碼。
新建一個(gè) Notebook 會(huì)自動(dòng)設(shè)置 session,提交任務(wù)到集群。因此在這個(gè) notebook 內(nèi)也不需要顯示創(chuàng)建?session。
import mars.dataframe as mdmd.DataFrame(mt.random.rand(10, 3)).sum().execute() # 在 notebook 里運(yùn)行,execute 自動(dòng)提交到當(dāng)前集群有一點(diǎn)要注意:這個(gè) notebook 不會(huì)保存你的 notebook 文件,所以要記得自行保存。
用戶也可以使用自己的 notebook 連接到集群,此時(shí)參考 使用已經(jīng)創(chuàng)建的 Mars 集群 章節(jié)。
其他用法
使用已經(jīng)創(chuàng)建的 Mars 集群
首先,我們可以通過(guò) instance id 重建 Mars 集群的 client。
client = o.create_mars_cluster(instance_id=**instance-id**)如果只是想使用 Mars,可以使用 Mars session 來(lái)連接。給定 Mars UI 的地址。則:
from mars.session import new_session new_session('**Mars UI address**').as_default() # 設(shè)置為默認(rèn) session獲取 Mars UI 地址
Mars 集群創(chuàng)建的時(shí)候指定了?options.verbose=True?會(huì)打印 Mars UI 地址。
也可以通過(guò)?client.endpoint?來(lái)獲取 Mars UI。
print(client.endpoint)獲取 Logview 地址
創(chuàng)建集群的時(shí)候指定了?options.verbose=True?會(huì)自動(dòng)打印 logview。
也可以通過(guò)?client.get_logview_address()?獲取 logview 地址。
print(client.get_logview_address())獲取 Jupyter Notebook 地址
Mars 集群創(chuàng)建的時(shí)候指定了?options.verbose=True?會(huì)打印 Jupyter Notebook 地址。
也可以通過(guò)?client.get_notebook_endpoint()?獲取 Jupyter Notebook 地址。
print(client.get_notebook_endpoint())Mars 和 PyODPS DataFrame 對(duì)比
有同學(xué)會(huì)問(wèn),Mars 和 PyODPS DataFrame 有什么區(qū)別呢?
API
Mars DataFrame 的接口完全兼容 pandas。除了 DataFrame,Mars tensor 兼容 numpy,Mars learn 兼容 scikit-learn。
而 PyODPS 只有 DataFrame 接口,和 pandas 的接口存在著很多不同。
索引
Mars DataFrame 有 pandas 索引的概念。
In [1]: import mars.dataframe as mdIn [5]: import mars.tensor as mtIn [7]: df = md.DataFrame(mt.random.rand(10, 3), index=md.date_range('2020-5-1', periods=10)) In [9]: df.loc['2020-5'].execute() Out[9]: 0 1 2 2020-05-01 0.061912 0.507101 0.372242 2020-05-02 0.833663 0.818519 0.943887 2020-05-03 0.579214 0.573056 0.319786 2020-05-04 0.476143 0.245831 0.434038 2020-05-05 0.444866 0.465851 0.445263 2020-05-06 0.654311 0.972639 0.443985 2020-05-07 0.276574 0.096421 0.264799 2020-05-08 0.106188 0.921479 0.202131 2020-05-09 0.281736 0.465473 0.003585 2020-05-10 0.400000 0.451150 0.956905PyODPS 里沒有索引的概念,因此跟索引有關(guān)的操作全部都不支持。
數(shù)據(jù)順序
Mars DataFrame 一旦創(chuàng)建,保證順序,因此一些時(shí)序操作比如?shift,以及向前向后填空值如ffill、bfill?,只有 Mars DataFrame 支持。
In [3]: df = md.DataFrame([[1, None], [None, 1]]) In [4]: df.execute() Out[4]: 0 1 0 1.0 NaN 1 NaN 1.0In [5]: df.ffill().execute() # 空值用上一行的值 Out[5]: 0 1 0 1.0 NaN 1 1.0 1.0PyODPS 由于背后使用 MaxCompute 計(jì)算和存儲(chǔ)數(shù)據(jù),而 MaxCompute 并不保證數(shù)據(jù)順序,所以這些操作在 MaxCompute 上都無(wú)法支持。
執(zhí)行層
PyODPS 本身只是個(gè)客戶端,不包含任何服務(wù)端部分。PyODPS DataFrame 在真正執(zhí)行時(shí),會(huì)將計(jì)算編譯到 MaxCompute SQL 執(zhí)行。因此,PyODPS DataFrame 支持的操作,取決于 MaxCompute SQL 本身。此外,每一次調(diào)用?execute?方法時(shí),會(huì)提交一次 MaxCompute 作業(yè),需要在集群內(nèi)調(diào)度。
Mars 本身包含客戶端和分布式執(zhí)行層。通過(guò)調(diào)用?o.create_mars_cluster?,會(huì)在 MaxCompute 內(nèi)部拉起 Mars 集群,一旦 Mars 集群拉起,后續(xù)的交互就直接和 Mars 集群進(jìn)行。計(jì)算會(huì)直接提交到這個(gè)集群,調(diào)度開銷極小。在數(shù)據(jù)規(guī)模不是特別大的時(shí)候,Mars 應(yīng)更有優(yōu)勢(shì)。
使用場(chǎng)景指引
有同學(xué)會(huì)關(guān)心,何時(shí)使用 Mars,何時(shí)使用 PyODPS DataFrame?我們分別闡述。
適合 Mars 的使用場(chǎng)景。
- 如果你經(jīng)常使用 PyODPS DataFrame 的?to_pandas()?方法,將 PyODPS DataFrame 轉(zhuǎn)成 pandas DataFrame,推薦使用 Mars DataFrame。
- Mars DataFrame 目標(biāo)是完全兼容 pandas 的接口以及行為,如果你熟悉 pandas 的接口,而不愿意學(xué)習(xí) PyODPS DataFrame 的接口,那么使用 Mars。
-
Mars DataFrame 因?yàn)榧嫒?pandas 的行為,因此如下的特性如果你需要用到,那么使用 Mars。
- Mars DataFrame 包含行和列索引,如果需要使用索引,使用 Mars。
- Mars DataFrame 創(chuàng)建后會(huì)保證順序,通過(guò) iloc 等接口可以獲取某個(gè)偏移的數(shù)據(jù)。如?df.iloc[10]?可以獲取第10行數(shù)據(jù)。此外,如?df.shift()?、df.ffill()?等需要有保證順序特性的接口也在 Mars DataFrame 里得到了實(shí)現(xiàn),有這方面的需求可以使用 Mars。
- Mars 還包含?Mars tensor?來(lái)并行和分布式化 Numpy,以及?Mars learn?來(lái)并行和分布式化 scikit-learn、以及支持在 Mars 集群里分布式運(yùn)行 TensorFlow、PyTorch 和 XGBoost。有這方面的需求使用 Mars。
- Mars 集群一旦創(chuàng)建,后續(xù)不再需要通過(guò) MaxCompute 調(diào)度,任務(wù)可以直接提交到 Mars 集群執(zhí)行;此外,Mars 對(duì)于中小型任務(wù)(數(shù)據(jù)量 T 級(jí)別以下),會(huì)有較好的性能。這些情況可以使用 Mars。
適合 PyODPS DataFrame 的使用場(chǎng)景
- PyODPS DataFrame 會(huì)把 DataFrame 任務(wù)編譯成 MaxCompute SQL 執(zhí)行,如果希望依托 MaxCompute 調(diào)度任務(wù),使用 PyODPS DataFrame。
- PyODPS DataFrame 會(huì)編譯任務(wù)到 MaxCompute 執(zhí)行,由于 MaxCompute 相當(dāng)穩(wěn)定,而 Mars 相對(duì)比較新,如果對(duì)穩(wěn)定性有很高要求,那么使用 PyODPS DataFrame。
- 數(shù)據(jù)量特別大(T 級(jí)別以上),使用 PyODPS DataFrame。
Mars 參考文檔
- Mars 開源地址:https://github.com/mars-project/mars
- Mars 文檔:https://docs.pymars.org/zh_CN/latest/
- Mars 團(tuán)隊(duì)專欄:https://zhuanlan.zhihu.com/mars-project
技術(shù)支持
技術(shù)支持請(qǐng)加 PyODPS 釘釘群:11701793
FAQ
Q:一個(gè)用戶創(chuàng)建的 Mars 集群,別人能不能用。
A:可以,參考 使用已經(jīng)創(chuàng)建的 Mars 集群 章節(jié)。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的MaxCompute Mars 完全指南的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 性能为MySQL 10倍!阿里云推出云原
- 下一篇: 使用Blink CEP实现差值聚合计算