How Apache Spark Makes Your Slow MySQL Queries 10x Faster (or More)
Link: http://coyee.com/article/11012-how-apache-spark-makes-your-slow-mysql-queries-10x-faster-or-more
Introduction
In my previous post, Apache Spark with MySQL, I described how to use Apache Spark for data analysis and how to transform and analyze large amounts of data stored in text files. Vadim also ran a benchmark comparing the performance of MySQL against Spark with the Parquet columnar format (using the air traffic performance data). That test was great, but what if we don't want to move the data from MySQL into another storage system, and instead keep running queries on the existing MySQL server? Apache Spark can help here too!
Using Apache Spark on top of an existing MySQL server (without exporting the data to Spark or a Hadoop platform) can boost query performance at least 10x. Using multiple MySQL servers (replication or Percona XtraDB Cluster) gives an additional speedup for some queries. You can also use Spark's caching feature to cache the entire result of a MySQL query.
The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect directly to MySQL and run the queries there. Why is this faster? For long-running queries (such as reports or BI), it is fast because Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all the cores on all the cluster nodes. In the examples below, the MySQL queries executed in Spark run 5 to 10 times faster than the same queries run directly against MySQL.
In addition, Spark adds "cluster"-level parallelism: with MySQL replication or Percona XtraDB Cluster, Spark can split a query into a set of smaller queries (similar to running one query per partition on a partitioned table) and execute those smaller queries in parallel across multiple replication slaves or multiple Percona XtraDB Cluster nodes. Finally, it aggregates the results from each node, map/reduce style, to form the complete result set.
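To make the splitting idea concrete, here is a minimal spark-shell sketch that spells out by hand what Spark's JDBC partitioning (shown later in this post) does automatically. The connection URL and table name follow the examples used throughout this post; this is an illustration of the concept, not how you would normally write it:

```scala
// Hand-rolled version of "one smaller query per year": each year becomes its
// own MySQL query, and Spark runs them in parallel.
val years = 1988 to 2015
val perYear = years.map { y =>
  spark.read.format("jdbc").options(Map(
    "url"     -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
    "dbtable" -> s"(select Carrier from ontime.ontime_part where yearD = $y) t$y"
  )).load()
}
// The "reduce" step: union the per-year results and aggregate them in Spark.
perYear.reduce(_ union _).groupBy("Carrier").count().show()
```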
This post uses the same database as my previous "Airlines On-Time Performance" articles. Vadim created scripts that make it easy to download the data and load it into MySQL; the scripts are available here. We are using Apache Spark 2.0, released on July 26, 2016.
Installing Apache Spark
Starting Apache Spark in standalone mode only takes a few steps:
Example:
```
root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077
```

To connect to Spark we can use spark-shell (Scala), pyspark (Python), or spark-sql. Since spark-sql works much like the MySQL command line (you can even run show tables), it is the easiest option. I also wanted to work with Scala interactively, so I chose spark-shell. In all the examples below I use the same SQL query in MySQL and in Spark, so it makes little difference which client you pick.
To let Spark talk to the MySQL server, we need the JDBC driver, Connector/J for MySQL. Download the archive, extract it, copy mysql-connector-java-5.1.39-bin.jar into the Spark directory, and then add the class path in conf/spark-defaults.conf:
```
spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
```

Running MySQL Queries via Apache Spark
For this test I used a physical server with 12 cores (an older Intel(R) Xeon(R) CPU L5639 @ 2.13GHz), 48GB of RAM, and SSD disks. On this one machine I installed MySQL and started the Spark master and slave.
Now we can run MySQL queries in Spark. First, start the shell from the Spark directory (/usr/local/spark in my case):
```
$ ./bin/spark-shell --driver-memory 4G --master spark://server1:7077
```

Then we connect to the MySQL server and register a temporary view:
```scala
val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
      "dbtable" -> "ontime.ontime_part",
      "fetchSize" -> "10000",
      "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
  )).load()
jdbcDF.createOrReplaceTempView("ontime")
```

This creates a "data source" for Spark (in other words, Spark now has a connection to MySQL). The Spark table "ontime" is mapped to the MySQL table ontime.ontime_part; we can now run SQL queries in Spark, and they are parsed and translated into MySQL queries.
"partitionColumn" is crucial here: it tells Spark to run multiple queries in parallel, one query per partition.
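For illustration, the sketch below approximates how Spark 2.0 derives one WHERE predicate per partition from partitionColumn, lowerBound, upperBound, and numPartitions (Spark does this internally — you never write this yourself — and this is a simplification of its actual logic, but it matches the queries we will see in the MySQL processlist later):

```scala
// Approximate Spark's splitting of [lowerBound, upperBound) into numPartitions
// ranges. The first and last partitions are left open-ended so that no rows
// outside the bounds are lost.
def partitionPredicates(col: String, lower: Long, upper: Long, n: Int): Seq[String] = {
  val stride = (upper - lower) / n
  (0 until n).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0)          s"$col < $hi or $col is null"
    else if (i == n - 1) s"$col >= $lo"
    else                 s"$col >= $lo AND $col < $hi"
  }
}
partitionPredicates("yeard", 1988, 2016, 28).foreach(println)
// yeard < 1989 or yeard is null
// yeard >= 1989 AND yeard < 1990
// ...
// yeard >= 2015
```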
Now we can run the query:
val sqlDF = sql("select min(year), max(year) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin = 'RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 100000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10") sqlDF.show()MySQL 查詢示例
Let's go back to MySQL for a moment and look at the sample query. I picked the following query (from my older post):
```sql
select min(year), max(year) as max_year, Carrier, count(*) as cnt,
       sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
       round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime
WHERE DayOfWeek not in (6,7)
  and OriginState not in ('AK', 'HI', 'PR', 'VI')
  and DestState not in ('AK', 'HI', 'PR', 'VI')
GROUP by carrier
HAVING cnt > 100000 and max_year > '1990'
ORDER by rate DESC, cnt desc
LIMIT 10
```

This query finds the number of delayed flights for each airline. It also calculates the on-time rate in a fair way, taking the number of flights into account (we don't want to compare small airlines with the large ones, and old airlines that have since shut down are excluded).
The main reason I picked this query is that it is hard to optimize further in MySQL: all of those WHERE conditions only filter out about 30% of the rows — roughly 71% of the table still matches. I did a quick calculation:
```
mysql> select count(*) FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI');
+-----------+
| count(*)  |
+-----------+
| 108776741 |
+-----------+
mysql> select count(*) FROM ontime;
+-----------+
| count(*)  |
+-----------+
| 152657276 |
+-----------+
mysql> select round((108776741/152657276)*100, 2);
+-------------------------------------+
| round((108776741/152657276)*100, 2) |
+-------------------------------------+
|                               71.26 |
+-------------------------------------+
```

The table structure is as follows:
```sql
CREATE TABLE `ontime_part` (
  `YearD` int(11) NOT NULL,
  `Quarter` tinyint(4) DEFAULT NULL,
  `MonthD` tinyint(4) DEFAULT NULL,
  `DayofMonth` tinyint(4) DEFAULT NULL,
  `DayOfWeek` tinyint(4) DEFAULT NULL,
  `FlightDate` date DEFAULT NULL,
  `UniqueCarrier` char(7) DEFAULT NULL,
  `AirlineID` int(11) DEFAULT NULL,
  `Carrier` char(2) DEFAULT NULL,
  `TailNum` varchar(50) DEFAULT NULL,
  ...
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`,`YearD`),
  KEY `covered` (`DayOfWeek`,`OriginState`,`DestState`,`Carrier`,`YearD`,`ArrDelayMinutes`)
) ENGINE=InnoDB AUTO_INCREMENT=162668935 DEFAULT CHARSET=latin1
/*!50100 PARTITION BY RANGE (YearD)
(PARTITION p1987 VALUES LESS THAN (1988) ENGINE = InnoDB,
 PARTITION p1988 VALUES LESS THAN (1989) ENGINE = InnoDB,
 PARTITION p1989 VALUES LESS THAN (1990) ENGINE = InnoDB,
 PARTITION p1990 VALUES LESS THAN (1991) ENGINE = InnoDB,
 PARTITION p1991 VALUES LESS THAN (1992) ENGINE = InnoDB,
 PARTITION p1992 VALUES LESS THAN (1993) ENGINE = InnoDB,
 PARTITION p1993 VALUES LESS THAN (1994) ENGINE = InnoDB,
 PARTITION p1994 VALUES LESS THAN (1995) ENGINE = InnoDB,
 PARTITION p1995 VALUES LESS THAN (1996) ENGINE = InnoDB,
 PARTITION p1996 VALUES LESS THAN (1997) ENGINE = InnoDB,
 PARTITION p1997 VALUES LESS THAN (1998) ENGINE = InnoDB,
 PARTITION p1998 VALUES LESS THAN (1999) ENGINE = InnoDB,
 PARTITION p1999 VALUES LESS THAN (2000) ENGINE = InnoDB,
 PARTITION p2000 VALUES LESS THAN (2001) ENGINE = InnoDB,
 PARTITION p2001 VALUES LESS THAN (2002) ENGINE = InnoDB,
 PARTITION p2002 VALUES LESS THAN (2003) ENGINE = InnoDB,
 PARTITION p2003 VALUES LESS THAN (2004) ENGINE = InnoDB,
 PARTITION p2004 VALUES LESS THAN (2005) ENGINE = InnoDB,
 PARTITION p2005 VALUES LESS THAN (2006) ENGINE = InnoDB,
 PARTITION p2006 VALUES LESS THAN (2007) ENGINE = InnoDB,
 PARTITION p2007 VALUES LESS THAN (2008) ENGINE = InnoDB,
 PARTITION p2008 VALUES LESS THAN (2009) ENGINE = InnoDB,
 PARTITION p2009 VALUES LESS THAN (2010) ENGINE = InnoDB,
 PARTITION p2010 VALUES LESS THAN (2011) ENGINE = InnoDB,
 PARTITION p2011 VALUES LESS THAN (2012) ENGINE = InnoDB,
 PARTITION p2012 VALUES LESS THAN (2013) ENGINE = InnoDB,
 PARTITION p2013 VALUES LESS THAN (2014) ENGINE = InnoDB,
 PARTITION p2014 VALUES LESS THAN (2015) ENGINE = InnoDB,
 PARTITION p2015 VALUES LESS THAN (2016) ENGINE = InnoDB,
 PARTITION p_new VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */
```

Even with a "covering" index, MySQL ends up scanning roughly 70M–100M rows and creating a temporary table:
```
mysql> explain select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
         type: range
possible_keys: covered
          key: covered
      key_len: 2
          ref: NULL
         rows: 70483364
        Extra: Using where; Using index; Using temporary; Using filesort
1 row in set (0.00 sec)
```

Here is the response time of the MySQL query:
```
mysql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
+------------+----------+---------+----------+-----------------+------+
| min(yearD) | max_year | Carrier | cnt      | flights_delayed | rate |
+------------+----------+---------+----------+-----------------+------+
|       2003 |     2013 | EV      |  2962008 |          464264 | 0.16 |
|       2003 |     2013 | B6      |  1237400 |          187863 | 0.15 |
|       2006 |     2011 | XE      |  1615266 |          230977 | 0.14 |
|       2003 |     2005 | DH      |   501056 |           69833 | 0.14 |
|       2001 |     2013 | MQ      |  4518106 |          605698 | 0.13 |
|       2003 |     2013 | FL      |  1692887 |          212069 | 0.13 |
|       2004 |     2010 | OH      |  1307404 |          175258 | 0.13 |
|       2006 |     2013 | YV      |  1121025 |          143597 | 0.13 |
|       2003 |     2006 | RU      |  1007248 |          126733 | 0.13 |
|       1988 |     2013 | UA      | 10717383 |         1327196 | 0.12 |
+------------+----------+---------+----------+-----------------+------+
10 rows in set (19 min 16.58 sec)
```

A full 19 minutes — far from a pleasant result.
SQL in Spark
Now we want to run the same query in Spark and let Spark read the data from MySQL. We create a "data source" and run the following:
```
scala> val jdbcDF = spark.read.format("jdbc").options(
     |   Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
     |   "dbtable" -> "ontime.ontime_sm",
     |   "fetchSize" -> "10000",
     |   "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48"
     |   )).load()
16/08/02 23:24:12 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 27; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2015.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, YearD: date ... 19 more fields]

scala> jdbcDF.createOrReplaceTempView("ontime")

scala> val sqlDF = sql("select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10")
sqlDF: org.apache.spark.sql.DataFrame = [min(yearD): date, max_year: date ... 4 more fields]

scala> sqlDF.show()
+----------+--------+-------+--------+---------------+----+
|min(yearD)|max_year|Carrier|     cnt|flights_delayed|rate|
+----------+--------+-------+--------+---------------+----+
|      2003|    2013|     EV| 2962008|         464264|0.16|
|      2003|    2013|     B6| 1237400|         187863|0.15|
|      2006|    2011|     XE| 1615266|         230977|0.14|
|      2003|    2005|     DH|  501056|          69833|0.14|
|      2001|    2013|     MQ| 4518106|         605698|0.13|
|      2003|    2013|     FL| 1692887|         212069|0.13|
|      2004|    2010|     OH| 1307404|         175258|0.13|
|      2006|    2013|     YV| 1121025|         143597|0.13|
|      2003|    2006|     RU| 1007248|         126733|0.13|
|      1988|    2013|     UA|10717383|        1327196|0.12|
+----------+--------+-------+--------+---------------+----+
```

spark-shell does not show the query execution time, but it can be found in the Web UI that spark-sql provides. I re-ran the same query in spark-sql:
```
./bin/spark-sql --driver-memory 4G --master spark://thor:7077
spark-sql> CREATE TEMPORARY VIEW ontime
         > USING org.apache.spark.sql.jdbc
         > OPTIONS (
         >   url "jdbc:mysql://localhost:3306/ontime?user=root&password=",
         >   dbtable "ontime.ontime_part",
         >   fetchSize "1000",
         >   partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "48"
         > );
16/08/04 01:44:27 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 26; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2014.
Time taken: 3.864 seconds
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
16/08/04 01:45:13 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
2003    2013    EV    2962008     464264    0.16
2003    2013    B6    1237400     187863    0.15
2006    2011    XE    1615266     230977    0.14
2003    2005    DH    501056      69833     0.14
2001    2013    MQ    4518106     605698    0.13
2003    2013    FL    1692887     212069    0.13
2004    2010    OH    1307404     175258    0.13
2006    2013    YV    1121025     143597    0.13
2003    2006    RU    1007248     126733    0.13
1988    2013    UA    10717383    1327196   0.12
Time taken: 139.628 seconds, Fetched 10 row(s)
```

The query is about 10x faster (on the same single machine). But how do these queries get translated into MySQL queries, and why are they so much faster? Let's look inside MySQL to find out:
Inside MySQL
Spark:
```
scala> sqlDF.show()
[Stage 4:> (0 + 26) / 26]
```
MySQL:
```
mysql> select id, info from information_schema.processlist where info is not NULL and info not like '%information_schema%';
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id    | info                                                                                                                                                                                                                                            |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 10948 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002) |
| 10965 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2007 AND yearD < 2008) |
| 10966 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1991 AND yearD < 1992) |
| 10967 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1994 AND yearD < 1995) |
| 10968 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1998 AND yearD < 1999) |
| 10969 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2010 AND yearD < 2011) |
| 10970 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2002 AND yearD < 2003) |
| 10971 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2006 AND yearD < 2007) |
| 10972 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1990 AND yearD < 1991) |
| 10953 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2009 AND yearD < 2010) |
| 10947 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1993 AND yearD < 1994) |
| 10956 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD < 1989 or yearD is null)  |
| 10951 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2005 AND yearD < 2006) |
| 10954 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1996 AND yearD < 1997) |
| 10955 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2008 AND yearD < 2009) |
| 10961 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1999 AND yearD < 2000) |
| 10962 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2011 AND yearD < 2012) |
| 10963 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2003 AND yearD < 2004) |
| 10964 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1995 AND yearD < 1996) |
| 10957 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2004 AND yearD < 2005) |
| 10949 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1989 AND yearD < 1990) |
| 10950 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1997 AND yearD < 1998) |
| 10952 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2013)                  |
| 10958 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1992 AND yearD < 1993) |
| 10960 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2000 AND yearD < 2001) |
| 10959 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2012 AND yearD < 2013) |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
26 rows in set (0.00 sec)
```

Spark executed 26 queries in parallel — excellent! Because the table itself is partitioned, each query corresponds to one partition; however, each query still scans its entire partition:
```
mysql> explain partitions SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002)\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
   partitions: p2001
         type: ALL
possible_keys: NULL
          key: NULL
      key_len: NULL
          ref: NULL
         rows: 5814106
        Extra: Using where
1 row in set (0.00 sec)
```

In this case the server has 12 cores (24 hyper-threads), so it can execute the 26 queries in parallel very efficiently, and the partitioned table helps avoid resource contention (I wish MySQL could scan partitions in parallel, but at the time of writing it cannot).
Another interesting detail: Spark can "push down" some conditions to MySQL, but only the ones in the WHERE clause. All of the GROUP BY/ORDER BY aggregation runs inside Spark itself; Spark has to retrieve the rows matching the WHERE conditions from MySQL, and the GROUP BY/ORDER BY is never pushed down to MySQL.
This also means that a query without a WHERE clause (for example, "select count(*) as cnt, carrier from ontime group by carrier order by cnt desc limit 10") has to retrieve all the data from MySQL and load it into Spark (rather than letting MySQL do the GROUP BY itself). Running such a query in Spark may be slower or faster than in MySQL — it depends on the data volume and how well MySQL can use its indexes — but it requires more resources and potentially more memory. The query above is translated into 26 queries, each of the form "select carrier from ontime_part where (yearD >= N AND yearD < N)".
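If you want to check what Spark actually pushes down, DataFrame.explain() is a handy tool: WHERE conditions appear as pushed filters on the JDBC scan, while the GROUP BY shows up as Spark's own aggregate operator. The plan text below is only indicative — the exact formatting varies by Spark version:

```scala
// Inspect the physical plan: the filter is pushed to MySQL, the aggregation
// stays in Spark.
val df = sql("select Carrier, count(*) as cnt from ontime where DayOfWeek not in (6,7) group by Carrier")
df.explain()
// == Physical Plan ==
// *HashAggregate(keys=[Carrier#9], functions=[count(1)])        <- runs in Spark
// +- ... Scan JDBCRelation(ontime.ontime_part) ...
//        PushedFilters: [Not(In(DayOfWeek, [6,7]))]             <- runs in MySQL
```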
Pushing the Entire Query Down to MySQL
If we want to avoid sending all the data from MySQL to Spark, we can create a temporary table on top of a query (similar to MySQL's CREATE TEMPORARY TABLE ... AS SELECT). In Scala:
```scala
val tableQuery = "(select yeard, count(*) from ontime group by yeard) tmp"
val jdbcDFtmp = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
      "dbtable" -> tableQuery,
      "fetchSize" -> "10000")).load()
jdbcDFtmp.createOrReplaceTempView("ontime_tmp")
```

In Spark SQL:
```sql
CREATE TEMPORARY VIEW ontime_tmp
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
  dbtable "(select yeard, count(*) from ontime_part group by yeard) tmp",
  fetchSize "1000"
);
select * from ontime_tmp;
```

Please note that we do not use "partitionColumn" here: with a subquery used as the table, Spark would otherwise run the subquery once per partition against MySQL, which would defeat the purpose.
Query Caching in Spark
Another option is to cache the query result (or even the whole table) and then use .filter in Scala for faster processing. This approach requires giving Spark enough memory; the good news is that we can add extra nodes to the Spark cluster to get more of it.
Spark SQL example:
```
CREATE TEMPORARY VIEW ontime_latest
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  dbtable "ontime.ontime_part partition (p2013, p2014)",
  fetchSize "1000",
  partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "26"
);
cache table ontime_latest;

spark-sql> cache table ontime_latest;
Time taken: 465.076 seconds
spark-sql> select count(*) from ontime_latest;
5349447
Time taken: 0.526 seconds, Fetched 1 row(s)
spark-sql> select count(*), dayofweek from ontime_latest group by dayofweek;
790896    1
634664    6
795540    3
794667    5
808243    4
743282    7
782155    2
Time taken: 0.541 seconds, Fetched 7 row(s)
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_latest WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin='RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
2013    2013    MQ    9339     1734    0.19
2013    2013    B6    3302     516     0.16
2013    2013    EV    9225     1331    0.14
2013    2013    UA    1317     177     0.13
2013    2013    AA    5354     620     0.12
2013    2013    9E    5520     593     0.11
2013    2013    WN    10968    1130    0.1
2013    2013    US    5722     549     0.1
2013    2013    DL    6313     478     0.08
2013    2013    FL    2433     205     0.08
Time taken: 2.036 seconds, Fetched 10 row(s)
```

Here we cache partitions p2013 and p2014 in Spark. This retrieves the data from MySQL and loads it into Spark; once we run queries against the cached data, they are, as you would expect, much faster.
With Scala we can cache the result of a query and then use filters to extract the information we need:
val sqlDF = sql("SELECT flightdate, origin, dest, depdelayminutes, arrdelayminutes, carrier, TailNum, Cancelled, Diverted, Distance from ontime") sqlDF.cache().show() scala> sqlDF.filter("flightdate='1988-01-01'").count() res5: Long = 862使用 Spark 和 Percona XtraDB Cluster
Spark can also run in cluster mode with many nodes, since reading from a single MySQL server is a significant bottleneck. We can use MySQL replication slaves or Percona XtraDB Cluster (PXC) nodes as Spark data sources. To test this, I provisioned a three-node Percona XtraDB Cluster on AWS (m4.2xlarge Ubuntu instances) and started Apache Spark on each node.
All Spark worker nodes use the same memory configuration:
```
cat conf/spark-env.sh
export SPARK_WORKER_MEMORY=24g
```

Then I can start spark-sql (the Connector/J JAR also needs to be copied to all nodes):
```
$ ./bin/spark-sql --driver-memory 4G --master spark://pxc1:7077
```

When creating the table, I still connect to MySQL via localhost (url "jdbc:mysql://localhost:3306/ontime?user=root&password=xxx"). Because the Spark workers run on the same hosts as the Percona Cluster nodes, they use local connections. Running a Spark SQL query then spreads all 26 queries evenly across the three MySQL nodes.
Alternatively, we can run the Spark cluster on separate hosts and connect through HAProxy, which load-balances the queries across multiple Percona XtraDB Cluster nodes.
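From the Spark side that only changes the JDBC URL. A sketch, assuming a hypothetical HAProxy frontend at haproxy1:3306 in front of the PXC nodes (host and port are placeholders for your own load-balancer listener):

```scala
// Each of Spark's parallel partition queries opens its own connection, so
// HAProxy can spread them across the Percona XtraDB Cluster nodes behind it.
val jdbcDF = spark.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://haproxy1:3306/ontime?user=root&password=",  // hypothetical LB endpoint
  "dbtable" -> "ontime.ontime_part",
  "fetchSize" -> "10000",
  "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
)).load()
jdbcDF.createOrReplaceTempView("ontime")
```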
Query Performance Benchmark
Finally, here are the query response times measured on the three AWS Percona XtraDB Cluster nodes:
Query 1:
```sql
select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt,
       sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
       round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_part
WHERE DayOfWeek not in (6,7)
  and OriginState not in ('AK', 'HI', 'PR', 'VI')
  and DestState not in ('AK', 'HI', 'PR', 'VI')
GROUP by carrier
HAVING cnt > 1000 and max_year > '1990'
ORDER by rate DESC, cnt desc
LIMIT 10;
```
| Query / index type | MySQL time | Spark time (3 nodes) | Speedup |
|---|---|---|---|
| No covered index (partitioned) | 19 min 16.58 sec | 192.17 sec | 6.02 |
| Covered index (partitioned) | 2 min 10.81 sec | 48.38 sec | 2.7 |
Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;
| Query / index type | MySQL time | Spark time (3 nodes) | Speedup |
|---|---|---|---|
| No covered index (partitioned) | 19 min 15.21 sec | 195.058 sec | 5.92 |
| Covered index (partitioned) | 1 min 10.38 sec | 27.323 sec | 2.58 |
These results look good, but they can be even better. With three m4.2xlarge nodes we have 8 * 3 = 24 cores (although they are shared between Spark and MySQL). We expected up to a 10x speedup, especially for the case without a covered index.
However, the amount of RAM on m4.2xlarge instances is not enough to run MySQL fully in memory, so all reads went through EBS IOPS, which gave us only ~120MB/sec of read throughput. I repeated the tests on three physical servers with the following specs:
- 28 cores E5-2683 v3 @ 2.00GHz
- 240GB of RAM
- Samsung 850 PRO
This time the test ran entirely in memory:
Query 1 (see above):
| Query / index type | MySQL time | Spark time (3 nodes) | Speedup |
|---|---|---|---|
| No covered index (partitioned) | 3 min 13.94 sec | 14.255 sec | 13.61 |
| Covered index (partitioned) | 2 min 2.11 sec | 9.035 sec | 13.52 |
Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;
| Query / index type | MySQL time | Spark time (3 nodes) | Speedup |
|---|---|---|---|
| No covered index (partitioned) | 2 min 0.36 sec | 7.055 sec | 17.06 |
| Covered index (partitioned) | 1 min 6.85 sec | 4.514 sec | 14.81 |
Since the table only has 26 partitions, we did not really have enough concurrency to saturate all the cores and memory. I also tested a non-partitioned table with an ID primary key, using 128 Spark partitions.
A Note on Partitioning
In these tests we used a partitioned table (partitioned by year) to reduce resource contention in MySQL. Note, however, that Spark's "partitionColumn" option does not require the MySQL table to be partitioned. For example, if the table has a primary key, we can use this CREATE VIEW in Spark:
```sql
CREATE OR REPLACE TEMPORARY VIEW ontime
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
  dbtable "ontime.ontime",
  fetchSize "1000",
  partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
);
```

Assuming we have enough MySQL servers (i.e., lots of nodes or slaves), we can increase the number of partitions to get more concurrency (rather than the 26 partitions we get when partitioning by year). In fact, this test produced an even faster response time: query 1 took only 6.44 seconds.
Where Spark Does Not Perform Well
For queries that are already fast in MySQL (queries fully covered by an index, or that can use an index efficiently), it makes no sense to use Spark: the overhead of pulling the data out of MySQL and loading it into Spark outweighs any gain and dominates the query time. For example, a query like select count(*) from ontime_part where YearD = 2013 and DayOfWeek = 7 and OriginState = 'NC' and DestState = 'NC'; scans only 1300 rows and returns immediately (a near-zero-time query in MySQL).
An even better example is select max(id) from ontime_part. In MySQL this query uses the index, and all the computation happens inside MySQL. Spark, by contrast, has to fetch all the IDs from MySQL (select id from ontime_part) and then compute the maximum — that query took 24.267 seconds in Spark!
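If such a query has to go through Spark anyway, the query-as-table trick from the pushdown section above lets MySQL do the work. A minimal sketch, assuming the same local connection as before:

```scala
// Wrap the aggregate in a derived table: MySQL computes max(id) via the index,
// and Spark only fetches the single result row.
val maxIdDF = spark.read.format("jdbc").options(Map(
  "url"     -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> "(select max(id) as max_id from ontime_part) tmp"
)).load()
maxIdDF.show()
```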
Conclusion
Using Apache Spark as an additional engine layer on top of MySQL can help speed up slow reporting queries and brings much-needed scalability for long-running queries. In addition, Spark can help cache the results of frequently used queries.
The Spark Web GUI provides many ways to monitor Spark jobs: for example, it shows job progress,
as well as a visual breakdown of the SQL query details.