Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)
在大數據平臺中,經常需要做數據的ETL,從傳統關系型數據庫RDBMS中抽取數據到HDFS中。之前開發數據湖新版本時使用Spark SQL來完成ETL的工作,但是遇到了 Spark SQL?不支持某些數據類型(比如ORACLE中的Timestamp with local Timezone)的問題。
一、系統環境
-
Spark 版本:2.1.0.cloudera1
-
JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131
-
ORACLE JDBC driver 版本:ojdbc7.jar
-
Scala 版本:2.11.8
?
二、Spark SQL讀數據庫表遇到的不支持某些數據類型
Spark SQL 讀取傳統的關系型數據庫同樣需要用到?JDBC,畢竟這是提供的訪問數據庫官方 API。Spark要讀取數據庫需要解決兩個問題:
-
分布式讀取;
-
原始表數據到DataFrame的映射。
2.1 業務代碼
public class Config {// spark-jdbc parameter namespublic static String JDBC_PARA_URL = "url";public static String JDBC_PARA_USER = "user";public static String JDBC_PARA_PASSWORD = "password";public static String JDBC_PARA_DRIVER = "driver";public static String JDBC_PARA_TABLE = "dbtable";public static String JDBC_PARA_FETCH_SIZE = "fetchsize"; } import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._// 主類 object Main {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate()val sqlContext = sparkSession.sqlContextval sc = sparkSession.sparkContextval partitionNum = 16val fetchSize = 1000val jdbcUrl = "..."val userName = "..."val schema_table = "..."val password = "..."val jdbcDriver = "oracle.jdbc.driver.OracleDriver"// 注意需要將oracle jdbc driver jar放置在spark lib jars目錄下,或者spark2-submit提交spark application時添加--jars參數val jdbcDF = sqlContext.read.format("jdbc").options(Map(Config.JDBC_PARA_URL -> jdbcUrl,Config.JDBC_PARA_USER -> userName,Config.JDBC_PARA_TABLE -> schema_table,Config.JDBC_PARA_PASSWORD -> password,Config.JDBC_PARA_DRIVER -> jdbcDriver,Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()val rdd = jdbcDF.rddrdd.count()...... }2.2 部分數據類型不支持
比如ORACLE中的Timestamp with local Timezone?和?FLOAT(126)。
三、解決方法:自定義JdbcDialects
3.1 什么是JdbcDialects ?
Spark SQL 中的?org.apache.spark.sql.jdbc package?中有個類?JdbcDialects.scala,該類定義了Spark DataType 和 SQLType 之間的映射關系,分析該類的源碼可知,該類是一個抽象類,包含以下幾個方法:
-
def canHandle(url : String):判斷該JdbcDialect 實例是否能夠處理該jdbc url;
-
getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):輸入數據庫中的SQLType,得到對應的Spark DataType的mapping關系;
-
getJDBCType(dt: DataType):輸入Spark 的DataType,得到對應的數據庫的SQLType;
-
quoteIdentifier(colName: String):引用標識符,用來放置某些字段名用了數據庫的保留字(有些用戶會使用數據庫的保留字作為列名);
-
其他......。
該類還有一個伴生對象,其中包含3個方法:
-
get(url: String):根據database的url獲取JdbcDialect 對象;
-
unregisterDialect(dialect: JdbcDialect):將已注冊的JdbcDialect 注銷;
-
registerDialect(dialect: JdbcDialect):注冊一個JdbcDialect。
3.2 解決步驟
使用get(url: String)方法獲取當前的 JdbcDialect 對象;
將當前的 JdbcDialect 對象?unregistered?掉;
new 一個 JdbcDialect 對象,并重寫方法(主要是getCatalystType()方法,因為其定義了數據庫 SQLType 到 Spark DataType 的映射關系),修改映射關系,將不支持的 SQLType 以其他的支持的數據類型返回比如StringType,這樣就能夠解決問題了;
register新創建的 JdbcDialect 對象。
3.3 解決方案的業務代碼
?
object SaicSparkJdbcDialect {def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])// 將當前的 JdbcDialect 對象unregistered掉val dialect = JdbcDialectsJdbcDialects.unregisterDialect(dialect.get(jdbcUrl))if (dbType.equals("ORACLE")) {val OracleDialect = new JdbcDialect {// 只能處理ORACLE數據庫override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")// 修改數據庫 SQLType 到 Spark DataType 的映射關系(從數據庫讀取到Spark中)override def getCatalystType(sqlType: Int, typeName: String, size: Int,md: MetadataBuilder): Option[DataType] = {if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) {// 將不支持的 Timestamp with local Timezone 以TimestampType形式返回Some(TimestampType)} else if (sqlType == Types.BLOB) {Some(BinaryType)} else {Some(StringType)}}// 該方法定義的是數據庫Spark DataType 到 SQLType 的映射關系,此處不需要做修改override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))case _ => None}override def quoteIdentifier(colName: String): String = {colName}}// register新創建的 JdbcDialect 對象JdbcDialects.registerDialect(OracleDialect)}本文來自:https://www.jianshu.com/p/20b82891aac9?
總結
以上是生活随笔為你收集整理的Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无意看到,当真给力!记住:永远不要在My
- 下一篇: 大剑无锋之new一个对象背后发生了什么?