Spark _24_ Reading JDBC Data to Create a DataFrame/DataSet (MySQL Example) (3)
Two ways to create a DataSet

First, create the two tables in the MySQL database and insert a few rows of sample data; a sketch of the assumed schema follows.
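The original post does not show the DDL, so the following is only a minimal sketch of an assumed schema and sample rows, inferred from the show() output in the code below.

-- Assumed schema and sample data, not shown in the original post
CREATE DATABASE IF NOT EXISTS spark;
USE spark;

CREATE TABLE person (
    id   INT PRIMARY KEY,
    name VARCHAR(50),
    age  INT
);

CREATE TABLE score (
    id    INT PRIMARY KEY,
    name  VARCHAR(50),
    score INT
);

-- Sample rows matching the show() output below
INSERT INTO person VALUES
    (1, 'George', 22), (2, 'kangkang', 20), (3, 'GeorgeDage', 28), (4, 'limumu', 1);
INSERT INTO score VALUES
    (1, 'George', 100), (2, 'kangkang', 100), (3, 'GeorgeDage', 90), (4, 'limumu', 120);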
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

/**
 * @author George
 * @description
 * Reads data over JDBC (MySQL as the example) to create a DataFrame.
 * Demonstrates two ways to create a DataFrame.
 */
public class JDBCDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("jdbc");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        /**
         * First way: read the MySQL table and load it as a DataFrame.
         */
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://localhost:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123456");
        options.put("dbtable", "person");
        // options() adds input options for the underlying data source.
        Dataset<Row> person = sqlContext.read().format("jdbc").options(options).load();
        person.show();
        /**
         * +---+----------+---+
         * | id|      name|age|
         * +---+----------+---+
         * |  1|    George| 22|
         * |  2|  kangkang| 20|
         * |  3|GeorgeDage| 28|
         * |  4|    limumu|  1|
         * +---+----------+---+
         */
        person.registerTempTable("person");

        /**
         * Second way: read the MySQL table and load it as a DataFrame.
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://localhost:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "123456");
        reader.option("dbtable", "score");
        Dataset<Row> load = reader.load();
        load.show();
        /**
         * +---+----------+-----+
         * | id|      name|score|
         * +---+----------+-----+
         * |  1|    George|  100|
         * |  2|  kangkang|  100|
         * |  3|GeorgeDage|   90|
         * |  4|    limumu|  120|
         * +---+----------+-----+
         */
        load.registerTempTable("score");
        sc.stop();
    }
}
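Both examples need Spark SQL and the MySQL JDBC driver on the classpath. A minimal sbt sketch follows; the versions are assumptions, not from the original post (any Connector/J 5.1.x line provides the com.mysql.jdbc.Driver class used above).

// build.sbt -- versions are assumptions, not from the original post
libraryDependencies ++= Seq(
  // Dataset<Row> together with registerTempTable matches Spark 2.x,
  // where the SQLContext-era calls are deprecated but still available
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "mysql" % "mysql-connector-java" % "5.1.47" // provides com.mysql.jdbc.Driver
)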
Scala API (the same two read paths, plus joining the two tables and writing the combined result back into MySQL):
package SparkSql

import java.util.Properties

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object JDBCScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("jdbc").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext = new SQLContext(sc)

    /**
     * First way: read the MySQL table to create a DataFrame.
     */
    val options = new mutable.HashMap[String, String]()
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("user", "root")
    options.put("password", "123456")
    options.put("dbtable", "person")
    val person = sqlContext.read.format("jdbc").options(options).load()
    person.show()
    /**
     * +---+----------+---+
     * | id|      name|age|
     * +---+----------+---+
     * |  1|    George| 22|
     * |  2|  kangkang| 20|
     * |  3|GeorgeDage| 28|
     * |  4|    limumu|  1|
     * +---+----------+---+
     */
    person.registerTempTable("person")

    /**
     * Second way: read the MySQL table to create a DataFrame.
     */
    val reader = sqlContext.read.format("jdbc")
    reader.option("url", "jdbc:mysql://localhost:3306/spark")
    reader.option("driver", "com.mysql.jdbc.Driver")
    reader.option("user", "root")
    reader.option("password", "123456")
    reader.option("dbtable", "score")
    val frame = reader.load()
    frame.show()
    /**
     * +---+----------+-----+
     * | id|      name|score|
     * +---+----------+-----+
     * |  1|    George|  100|
     * |  2|  kangkang|  100|
     * |  3|GeorgeDage|   90|
     * |  4|    limumu|  120|
     * +---+----------+-----+
     */
    frame.registerTempTable("score")

    // Join the two temp tables on id
    val result = sqlContext.sql("select person.id,person.name,person.age," +
      "score.score from person,score where person.id = score.id")
    result.show()
    /**
     * +---+----------+---+-----+
     * | id|      name|age|score|
     * +---+----------+---+-----+
     * |  1|    George| 22|  100|
     * |  3|GeorgeDage| 28|   90|
     * |  4|    limumu|  1|  120|
     * |  2|  kangkang| 20|  100|
     * +---+----------+---+-----+
     */

    /**
     * Write the joined result back into MySQL.
     */
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)

    sc.stop()
  }
}

Result: query the result table in MySQL to confirm the appended rows.
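Note that SQLContext, registerTempTable, and the com.mysql.jdbc.Driver class are all deprecated in newer stacks (Spark 2.0+ and Connector/J 8+, respectively). A minimal sketch of the first read path rewritten against SparkSession; the object name here is hypothetical, not from the original post.

import java.util.Properties

import org.apache.spark.sql.SparkSession

// Hypothetical modernized version of the read path above
object JDBCSessionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc").master("local").getOrCreate()

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")

    // read.jdbc(url, table, props) replaces the format("jdbc") + options-map pattern
    val person = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "person", props)
    person.createOrReplaceTempView("person") // replaces registerTempTable
    spark.sql("select id, name, age from person").show()

    spark.stop()
  }
}

For large tables, the read.jdbc overload that takes partitionColumn, lowerBound, upperBound, and numPartitions reads in parallel instead of through a single connection.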