Integrating Spark with MySQL (JdbcRDD)
Spark ships with the org.apache.spark.rdd.JdbcRDD class for reading data from a relational database over JDBC. Its constructor looks like this:

```scala
JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
```
The class takes quite a few parameters. Their meaning is easiest to understand from the original English Scaladoc, quoted below:
@param getConnection a function that returns an open Connection.
 The RDD takes care of closing the connection.
 @param sql the text of the query.
 The query must contain two ? placeholders for parameters used to partition the results.
 E.g. "select title, author from books where ? <= id and id <= ?"
 @param lowerBound the minimum value of the first placeholder
 @param upperBound the maximum value of the second placeholder
 The lower and upper bounds are inclusive.
 @param numPartitions the number of partitions.
 Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 the query would be executed twice, once with (1, 10) and once with (11, 20)
 @param mapRow a function from a ResultSet to a single row of the desired result type(s).
 This should only call getInt, getString, etc; the RDD takes care of calling next.
 The default maps a ResultSet to an array of Object.
1. getConnection returns an open JDBC connection; JdbcRDD takes care of closing it for you.
2. sql is the query text. It must contain two ? placeholders that are used to partition the result set, e.g. "select title, author from books where ? <= id and id <= ?".
3. lowerBound, upperBound and numPartitions are the value bound to the first placeholder, the value bound to the second placeholder, and the number of partitions. For example, with a lowerBound of 1, an upperBound of 20 and numPartitions of 2, the query is run twice, once with (1, 10) and once with (11, 20); see the sketch right after this list.
4. mapRow is the conversion function that turns a ResultSet row into a single element of the RDD; you can map to an Array, another type, or your own case class. The default converts each row into an array of Object.
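The way these bounds become per-partition ranges can be seen from a small helper. The function below is only a paraphrase of the splitting logic in JdbcRDD.getPartitions (the exact code may differ slightly between Spark versions), and the helper name partitionBounds is ours, not Spark's; both bounds are inclusive:

```scala
// Paraphrase of how JdbcRDD splits the inclusive range [lowerBound, upperBound]
// into numPartitions inclusive sub-ranges.
def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val length = BigInt(1) + upperBound - lowerBound          // size of the inclusive range
  (0 until numPartitions).map { i =>
    val start = BigInt(lowerBound) + (length * i) / numPartitions
    val end   = BigInt(lowerBound) + (length * (i + 1)) / numPartitions - 1
    (start.toLong, end.toLong)
  }
}

// partitionBounds(1, 20, 2)  gives (1,10), (11,20)   -- the example from the Scaladoc
// partitionBounds(1, 100, 3) gives (1,33), (34,66), (67,100)
```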
Now let's look at how to use the class.
```scala
package scala

import java.sql.DriverManager

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "mysql")
    val rdd = new JdbcRDD(
      sc,
      // connection factory; JdbcRDD closes the connection itself
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
      },
      // the two ? placeholders are bound to lowerBound and upperBound
      "SELECT content FROM mysqltest WHERE ID >= ? AND ID <= ?",
      1, 100, 3,
      // mapRow: read the first column of each row as a String
      r => r.getString(1)).cache()

    print(rdd.filter(_.contains("success")).count())
    sc.stop()
  }
}
```
The code is fairly short: it reads the mysqltest table and counts the rows matching ID >= 1 && ID <= 100 && content.contains("success"). As the code shows, the sql parameter of JdbcRDD must contain two ? placeholders, which are bound to lowerBound and upperBound to define the lower and upper edges of the WHERE clause. From the constructor signature we also know that lowerBound and upperBound can only be Long values; no other comparison types are supported, which limits the scenarios where JdbcRDD is useful. Moreover, the sql parameter must contain a WHERE clause of the form ID >= ? AND ID <= ?. If you write it like this instead:
```scala
val rdd = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  },
  "SELECT content FROM mysqltest",
  1, 100, 3,
  r => r.getString(1)).cache()
```
then, unfortunately, you will get the following error at runtime:
```
2014-09-10 15:47:45,621 (Executor task launch worker-0) [ERROR - org.apache.spark.Logging$class.logError(Logging.scala:95)] Exception in task ID 1
java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1074)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:988)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:974)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
  at com.mysql.jdbc.PreparedStatement.checkBounds(PreparedStatement.java:3813)
  at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3795)
  at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3840)
  at com.mysql.jdbc.PreparedStatement.setLong(PreparedStatement.java:3857)
  at org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:84)
  at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
  at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:619)
```
A look at the implementation of JdbcRDD's compute method shows why:
```scala
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  context.addOnCompleteCallback{ () => closeIfNeeded() }
  val part = thePart.asInstanceOf[JdbcPartition]
  val conn = getConnection()
  val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                                   ResultSet.CONCUR_READ_ONLY)

  if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
    stmt.setFetchSize(Integer.MIN_VALUE)
    logInfo("statement fetch size set to: " + stmt.getFetchSize +
            " to force mySQL streaming ")
  }

  stmt.setLong(1, part.lower)
  stmt.setLong(2, part.upper)

  ...........................
```
The good news is that we can write our own variant of JdbcRDD and change the compute logic shown above, so that we end up with an RDD that meets our own requirements.
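As one possible direction, here is a minimal sketch of such a custom RDD against the Spark 1.x RDD API. It is an illustration only, not code from this article or from Spark: the names PredicateJdbcRDD, PredicateJdbcPartition, baseSql and predicates are made up, and error handling and resource cleanup are deliberately simplified. The idea is to let the caller supply one WHERE predicate per partition, so the two Long placeholders are no longer needed:

```scala
import java.sql.{Connection, ResultSet}

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: each partition carries its own WHERE predicate
// instead of a pair of Long bounds.
class PredicateJdbcPartition(idx: Int, val predicate: String) extends Partition {
  override def index: Int = idx
}

// Sketch of a JDBC RDD that runs baseSql plus one caller-supplied predicate
// per partition, e.g. split by date or by a string key.
class PredicateJdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    baseSql: String,            // e.g. "SELECT content FROM mysqltest"
    predicates: Seq[String],    // e.g. Seq("ID < 50", "ID >= 50")
    mapRow: ResultSet => T)
  extends RDD[T](sc, Nil) {

  override def getPartitions: Array[Partition] =
    predicates.zipWithIndex.map { case (p, i) =>
      new PredicateJdbcPartition(i, p): Partition
    }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val part = split.asInstanceOf[PredicateJdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(s"$baseSql WHERE ${part.predicate}",
      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    val rs = stmt.executeQuery()

    // Simplified iterator: closes the connection only once the result set is
    // fully consumed (JdbcRDD itself uses NextIterator plus a task-completion
    // callback for more robust cleanup).
    new Iterator[T] {
      private var hasMore = rs.next()
      override def hasNext: Boolean = hasMore
      override def next(): T = {
        val row = mapRow(rs)      // mapRow should only call getInt, getString, etc.
        hasMore = rs.next()
        if (!hasMore) { rs.close(); stmt.close(); conn.close() }
        row
      }
    }
  }
}
```

With something like this you would construct the RDD as new PredicateJdbcRDD(sc, connectionFactory, "SELECT content FROM mysqltest", Seq("ID < 50", "ID >= 50"), r => r.getString(1)), splitting the data on whatever column or expression fits your schema.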
PS: while writing this article I wanted to include a Java example as well, but the last parameter of JdbcRDD is awkward to pass from Java. One comment I found on the web put it this way:
 I don't think there is a completely Java-friendly version of this class. However you should be able to get away with passing something generic like "ClassTag.MODULE.<k>apply(Object.class)"? There's probably something even simpler.
Below is the reply I received after emailing the Spark developers to ask how to use JdbcRDD from Java:
The two parameters which might cause you some difficulty in Java are
getConnection, which is a Function0[Connection], i.e. a 0 argument function that returns a jdbc connection
 and
 mapRow, which is a Function1[ResultSet, T], i.e. a 1 argument function that takes a result set and returns whatever type you want to convert it to.
You could try and include the scala library as a compile-time dependency in your project, and make your own instances of classes that implement the Function0 and Function1 interfaces. I've never tried that, so your mileage may vary. The mapRow parameter might be skippable - it has as a default a function that just returns an array of object, which you could then cast.
It would probably be easy to make the JdbcRDD interface more usable from Java8, but I don't know how much demand there is for that.