sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL
Spark SQL 是 Spark 處理結構化數據的一個模塊, 與基礎的 Spark RDD API 不同, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式可以跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 無論使用哪種 API / 語言都可以快速的計算。
SQL
Spark SQL 的功能之一是執行 SQL 查詢,Spark SQL 也能夠被用于從已存在的 Hive 環境中讀取數據。當以另外的編程語言運行SQL 時, 查詢結果將以 Dataset/DataFrame的形式返回,也可以使用 命令行或者通過 JDBC/ODBC與 SQL 接口交互.
DataFrames
從RDD里可以生成類似大家在pandas中的DataFrame,同時可以方便地在上面完成各種操作。
1.構建SparkSession
Spark SQL中所有功能的入口點是 SparkSession 類. 要創建一個 SparkSession, 僅使用 SparkSession.builder()就可以了:
from2.創建 DataFrames
在一個 SparkSession中, 應用程序可以從一個 已經存在的 RDD 或者 hive表, 或者從Spark數據源中創建一個DataFrames.
舉個例子, 下面就是基于一個JSON文件創建一個DataFrame:
df = spark.read.json("data/people.json") df.show()#必須使用show()不然不會打印3.DataFrame 操作
DataFrames 提供了一個特定的語法用在 Scala, Java, Python and R中機構化數據的操作。
在Python中,可以通過(df.age) 或者(df['age'])來獲取DataFrame的列. 雖然前者便于交互式操作, 但是還是建議用戶使用后者, 這樣不會破壞列名,也能引用DataFrame的類。
通過以下操作進行select
#查看字段屬性 df.printSchema()root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
以下操作的filter做條件過濾
df.filter(df['age'] > 21).show()df.groupBy("age").count().show()還可以創建視圖,然后使用SQL語句進行處理。得到的也是dataframe。
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show()spark DataFrame與RDD交互
Spark SQL 支持兩種不同的方法用于轉換已存在的 RDD 成為 Dataset
第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基于方法的反射可以讓你的代碼更簡潔.
第二種用于創建 Dataset 的方法是通過一個允許你構造一個 Schema 然后把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它允許你去構造 Dataset.
當數據不規整,無法像csv或者excel等文件一樣直接讀取時,可以通過如下形式自定義dataframe樣式。
from pyspark.sql import Rowsc = spark.sparkContext lines = sc.textFile("data/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))# Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") type(teenagers)pyspark.sql.dataframe.DataFrame
type(teenagers.rdd)pyspark.rdd.RDD
teenagers.rdd.first()Row(name='Justin')
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames:print(name)Name: Justi
以編程的方式指定Schema
也可以通過以下的方式去初始化一個 DataFrame。
- RDD從原始的RDD創建一個RDD的toples或者一個列表;
- Step 1 被創建后, 創建 Schema 表示一個 StructType 匹配 RDD 中的結構.
- 通過 SparkSession 提供的 createDataFrame 方法應用 Schema 到 RDD .
總結
以上是生活随笔為你收集整理的sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: three.js 几何体-组合网格_3d
- 下一篇: asp从后台调出的公式怎么参与运算_吴望