Spark Core 读写 MySQL
首先還是pom文件:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.45</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
代碼:讀mysql
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Reads rows from a MySQL table into an RDD via Spark's JdbcRDD.
 *
 * JdbcRDD requires the query to contain exactly two '?' placeholders,
 * which Spark binds to the lower/upper bound of each partition's key
 * range. The original query was truncated
 * ("... and realTotalMoney" with no comparison) and would fail to
 * prepare; the upper-bound comparison is restored below.
 */
object MysqlRDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)
    val jdbcrdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      // Connection factory: executed on the executor, once per partition.
      // JdbcRDD closes the connection itself when the partition is done.
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      // Fixed: second placeholder restored so both bounds can be bound.
      "select * from orders where realTotalMoney>? and realTotalMoney<?",
      150, // lowerBound  -> first '?'
      151, // upperBound  -> second '?'
      1,   // numPartitions
      // Row mapper: join the first five columns with commas.
      r => (1 to 5).map(r.getString).mkString(",")
    )
    jdbcrdd.foreach(println)
    print(jdbcrdd.count())
    sparkContext.stop()
  }
}
寫入mysql,這里有效率問題需要注意:
低效版本:
import java.sql.DriverManager
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Writes an RDD of (id, name, age) tuples to MySQL — the deliberately
 * INEFFICIENT version: a driver class load, a new connection and a new
 * PreparedStatement are created for EVERY record.
 *
 * Bug fixed relative to the original: the Connection (and the statement,
 * on an error path) was never closed, leaking one connection per record.
 * try/finally now guarantees cleanup while keeping the per-record
 * connection that this example is meant to demonstrate as the anti-pattern.
 */
object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] = sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)
    rdd.foreach { case (id, name, age) =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        val sql = "insert into student(id,name,age) values(?,?,?)"
        val preparedStatement = connection.prepareStatement(sql)
        try {
          preparedStatement.setInt(1, id)
          preparedStatement.setString(2, name)
          preparedStatement.setInt(3, age)
          preparedStatement.executeUpdate()
        } finally {
          preparedStatement.close()
        }
      } finally {
        connection.close() // original leaked the connection
      }
    }
    sparkContext.stop()
  }
}
效率提升版本:
import java.sql.DriverManager
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Writes an RDD of (id, name, age) tuples to MySQL — the EFFICIENT
 * version: one connection and one PreparedStatement per PARTITION,
 * reused for every record in that partition.
 *
 * Fixes relative to the original:
 *  - the Connection was never closed (leaked one per partition);
 *  - the PreparedStatement was re-created and re-prepared per record,
 *    defeating the stated purpose of foreachPartition — it is now
 *    prepared once and reused;
 *  - `case it: Iterator[(Int, String, Int)]` matched on a type-erased
 *    generic (an unchecked pattern); a plain parameter is used instead.
 */
object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] = sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)
    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        // Prepared once per partition; each row only rebinds parameters.
        val preparedStatement =
          connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
        try {
          it.foreach { case (id, name, age) =>
            preparedStatement.setInt(1, id)
            preparedStatement.setString(2, name)
            preparedStatement.setInt(3, age)
            preparedStatement.executeUpdate()
          }
        } finally {
          preparedStatement.close()
        }
      } finally {
        connection.close() // original leaked the connection
      }
    }
    sparkContext.stop()
  }
}
總結(jié)
以上是为你收集整理的《Spark Core 读写 MySQL》的全部内容，希望文章能够帮你解决所遇到的问题。
- 上一篇: if break语句_8、嵌套if语句、
- 下一篇: docker mysql编辑器_dock