spark 持久化 mysql_Spark读取数据库(Mysql)的四种方式讲解
目前
一、不指定查詢條件
這個方式鏈接MySql的函數(shù)原型是: def jdbc(url: String, table: String, properties: Properties): DataFrame
我們只需要提供Driver的url,需要查詢的表名,以及連接表相關屬性properties。下面是具體例子: val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", prop )
println(df.count())
println(df.rdd.partitions.size)
我們運行上面的程序,可以看到df.rdd.partitions.size輸出結果是1,這個結果的含義是iteblog表的所有數(shù)據(jù)都是由RDD的一個分區(qū)處理的,所以說,如果你這個表很大,很可能會出現(xiàn)OOM WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):
java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)
這種方式在數(shù)據(jù)量大的時候不建議使用。
如果想及時了解iteblog_hadoop
二、指定數(shù)據(jù)庫字段的范圍
這種方式就是通過指定數(shù)據(jù)庫中某個字段的范圍,但是遺憾的是,這個字段必須是數(shù)字,來看看這個函數(shù)的函數(shù)原型: def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
前兩個字段的含義和方法一類似。columnName就是需要分區(qū)的字段,這個字段在數(shù)據(jù)庫中的類型必須是數(shù)字;lowerBound就是分區(qū)的下界;upperBound就是分區(qū)的上界;numPartitions是分區(qū)的個數(shù)。同樣,我們也來看看如何使用: val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)
這個方法可以將iteblog表的數(shù)據(jù)分布到RDD的幾個分區(qū)中,分區(qū)的數(shù)量由numPartitions參數(shù)決定,在理想情況下,每個分區(qū)處理相同數(shù)量的數(shù)據(jù),我們在使用的時候不建議將這個值設置的比較大,因為這可能導致數(shù)據(jù)庫掛掉!但是根據(jù)前面介紹,這個函數(shù)的缺點就是只能使用整形數(shù)據(jù)字段作為分區(qū)關鍵字。
這個函數(shù)在極端情況下,也就是設置將numPartitions設置為1,其含義和第一種方式一致。
三、根據(jù)任意字段進行分區(qū)
基于前面兩種方法的限制,Spark還提供了根據(jù)任意字段進行分區(qū)的方法,函數(shù)原型如下: def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
這個函數(shù)相比第一種方式多了predicates參數(shù),我們可以通過這個參數(shù)設置分區(qū)的依據(jù),來看看例子: val predicates = Array[String]("reportDate <= '2014-12-31'",
"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)
最后rdd的分區(qū)數(shù)量就等于predicates.length。
四、通過load獲取
Spark還提供通過load的方式來讀取數(shù)據(jù)。 sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",
"dbtable" -> "iteblog")).load()
options函數(shù)支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions選項,細心的同學肯定發(fā)現(xiàn)這個和方法二的參數(shù)一致。是的,其內(nèi)部實現(xiàn)原理部分和方法二大體一致。同時load方法還支持json、orc等數(shù)據(jù)源的讀取。
總結
以上是生活随笔為你收集整理的spark 持久化 mysql_Spark读取数据库(Mysql)的四种方式讲解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pythondjango项目集成_[Py
- 下一篇: weex 富文本_Weex richte