SparkSql读取外部数据源
1、產生背景
? ? 用戶需求:方便快速從不同的數據源(json、parquet、rdbms),經過混合處理(json join parquet),再將處理結果以特定的格式(son、Parquet)寫回指定的系統(HDFS、S3)上去
Spark SQL 1.2 ==> 外部數據源API
? ? Loading and saving Data is not easy
? ? Parse raw data:text/json/parquet
? ? Convert data format transformation
? ? Datasets stores in various Formats/Systems
2、目標
? ? ?對于開發人員:是否需要吧代碼合并到spark中 ??不需要 —jars ?
? ? ?用戶:
?? ?? ? ? ? 讀:spark.read.format(format)
?? ??? ??? ?? ??? ? format
? ??? ??? ??? ??? ?? ? ?? ?build-in:json parquet jdbc ?cvs(2+)
?? ??? ??? ??? ??? ?? ??? ?package: 外部的 并不是spark內置 https://spark-packages.org/
?? ??? ?? ? 寫:
?? ??? ??? ?? ? people.write.format(“parquet”).save(“path")
3、操作Parquet文件數據
?? ?? ? 加載數據:spark.read.format(“parquet”).load(path)
?? ?? ? 保存數據:df.write.format(“parquet”).save(path)
? ? ? ? spark.read.load(“file:…..json’) ?會報錯,因為sparksql默認處理的format就是parquet
4、操作Hive表數據
?? ?? ? spark.table(tableName)
?? ?? ? df.write.saveAsTable(tablename)
?? ?? ? spark.sql(“select deptno,count(1) as mount from?amp where group by deptno”).filter(“deptno is not null”).write.saveAsTable(“hive_table_1”)
?? ?? ? spark.sql(“show tables”).show
?? ?? ? spark.table(“hive_table_1”).show
?? ?? ? spark.sqlContext.setConf(“spark.sql.shuffle.partitions”,”10")
?? ?? ? 在生產環境中一定要注意設置spark.sql.shuffle.aprtitions,默認是200
5、操作mysql關系型數據庫
? ? 操作mysql的數據?
?//第一種方法
?val jdbcDF =? ? ? ? ?spark.read.format(“jdbc”).option(“url”,”jdbc:mysql://localhost:3306/hive”).option(“dbtable”,”hive.TBLS”).option(“user”,”root”).option(“password”,”root’)..option(“driver”,”com.mysql.jdbc.Driver”).load()
jdbcDF.printSchema
jdbcDF.show
jdbc.select(“TBL_ID”,”TBL_NAME”).show
//第二種方法
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put(“user”,”root”)
connectionProperties.put(“password”,”root”)
connectionProperties.put(“driver”,”com.mysql.jdbc.Driver”)
val jdbcDF2 = spark.read.jdbc(“jdbc:mysql://localhost:3306”,”hive.TBLS”,connectionProperties)
6、綜合使用
外部數據源綜合案例
create database spark;
use spark;
CREATE TABLE DEPT(
DEPTNO?? ?int(2) ??PRIMARY KEY,? ?
DNAME VARCHAR(14),
LOC VARCHAR(13)
);
INSERT INTO DEPT VALUES(10,’ACCOUNTING’,’NEW YORK’);
INSERT INTO DEPT VALUES(20,’RESEARCH’,’DALLAS’);
INSERT INTO DEPT VALUES(30,’SALES’,’CHICAGO')
INSERT INTO DEPT VALUES(40,’OPERATIONS’,’BOSTON’)
總結
以上是生活随笔為你收集整理的SparkSql读取外部数据源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark DataFrameDataS
- 下一篇: SparkSql常用命令操作