jdbc不能识别别名_Spark基础:读写JDBC
Spark SQL支持通過JDBC直接讀取數(shù)據(jù)庫中的數(shù)據(jù),這個(gè)特性是基于JdbcRDD實(shí)現(xiàn)。返回值作為DataFrame返回,這樣可以直接使用Spark SQL并跟其他的數(shù)據(jù)源進(jìn)行join操作。JDBC數(shù)據(jù)源可以很簡(jiǎn)單的通過Java或者Python,而不需要提供ClassTag。注意這與Spark SQL JDBC server不同,后者是基于Spark SQL執(zhí)行查詢。
要保證能使用需要把對(duì)應(yīng)的jdbc驅(qū)動(dòng)放到spark的classpath中。比如,想要連接postgres可以在啟動(dòng)命令中添加jars:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar遠(yuǎn)程數(shù)據(jù)庫的表可以加載成DataFrame或者注冊(cè)成Spark SQL的臨時(shí)表,用戶可以在數(shù)據(jù)源選項(xiàng)中配置JDBC相關(guān)的連接參數(shù)。user和password一般是必須提供的參數(shù),另外一些參數(shù)可以參考下面的列表:
url
JDBC連接url,比如jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable
需要讀取或者寫入的JDBC表,注意里面的內(nèi)容將會(huì)作為from后面的部分,比如 select * from 。注意不能同時(shí)配置dbtable和query。
query
query用于指定from后面的子查詢,拼接成的sql如下:SELECT FROM () spark_gen_alias 。注意dbtable和query不能同時(shí)使用;不允許同時(shí)使用partitionColumn和query。
注意:這里的dbtable和query其實(shí)沒有太大的區(qū)別,只是query會(huì)默認(rèn)套一層別名而已。
driver
jdbc驅(qū)動(dòng)driver
partitionColumn, lowerBound, upperBound
指定時(shí)這三項(xiàng)需要同時(shí)存在,描述了worker如何并行讀取數(shù)據(jù)庫。其中partitionColumn必須是數(shù)字、date、timestamp,lowerBound和upperBound只是決定了分區(qū)的步長(zhǎng),而不會(huì)過濾數(shù)據(jù),因此表中所有的數(shù)據(jù)都會(huì)被分區(qū)返回。該參數(shù)僅用于讀。
numPartitions
讀寫時(shí)的最大分區(qū)數(shù)。這也決定了連接JDBC的最大連接數(shù),如果并行度超過該配置,將會(huì)使用coalesce(partition)來降低并行度。
queryTimeout
driver執(zhí)行statement的等待時(shí)間,0意味著沒有限制。寫入的時(shí)候這個(gè)選項(xiàng)依賴于底層是如何實(shí)現(xiàn)setQueryTimeout的,比如h2 driver會(huì)檢查每個(gè)query。默認(rèn)是0
fetchSize
fetch的大小,決定了每一個(gè)fetch,拉取多少數(shù)據(jù)量。這個(gè)參數(shù)幫助針對(duì)默認(rèn)比較小的驅(qū)動(dòng)進(jìn)行調(diào)優(yōu),比如oracle默認(rèn)是10行。僅用于讀操作。
batchSize
batch大小,決定插入時(shí)的并發(fā)大小,默認(rèn)1000。
isolationLvel
事務(wù)隔離的等級(jí),作用于當(dāng)前連接。可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 依賴于底層jdbc提供的事務(wù)隔離,默認(rèn)是READ_UNCOMMITTED。這個(gè)選項(xiàng)僅用于寫操作。
sessionInitStatment
每個(gè)數(shù)據(jù)庫session創(chuàng)建前執(zhí)行的操作,用于初始化。如定義一些觸發(fā)器操作。如 BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;
truncate
寫操作選項(xiàng),當(dāng)使用SaveMode.Overwrite時(shí),該選項(xiàng)用于是否直接刪除并重建表。當(dāng)表結(jié)構(gòu)發(fā)現(xiàn)變化的時(shí)候會(huì)失效。默認(rèn)是false。
cascadeTruncate
寫操作選項(xiàng),是否開啟級(jí)聯(lián)刪除。
createTableOptions
寫操作選項(xiàng),一般用于配置特殊的分區(qū)或者數(shù)據(jù)庫配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB
createTableColumnTypes
配置數(shù)據(jù)庫字段的類型,比如 name CHAR(64), comments VARCHAR(1024),僅支持spark sql中支持的數(shù)據(jù)類型。
customSchema
自定義讀取的schema信息,比如 id DECIMAL(38, 0), name STRING 。可以配置部分字段,其他的使用默認(rèn)的類型映射,比如 id DECIMAL(38, 0)。僅用于讀操作。
pushDownPredicate
該選項(xiàng)用于開啟或禁用jdbc數(shù)據(jù)源的謂詞下推。默認(rèn)是true。如果配置為false,那么所有的filter操作都會(huì)由spark來完成。當(dāng)過濾操作用spark更快時(shí),一般才會(huì)關(guān)閉下推功能。
// 加載jdbc val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").load()// 使用propeties val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password")val jdbcDF2 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 指定自定義的schema信息 connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 保存jdbc jdbcDF.write.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 指定自定義schema映射 jdbcDF.write.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)總結(jié)
以上是生活随笔為你收集整理的jdbc不能识别别名_Spark基础:读写JDBC的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab 数据降维和重构_核主成分分
- 下一篇: verilog for循环_HDLBit