深度 | 一条查询SQL的前世今生 —— ClickHouse 源码阅读
作者:逍凱,阿里云數(shù)據(jù)庫(kù)實(shí)習(xí)開(kāi)發(fā)工程師
注:以下分析基于開(kāi)源 v19.15.2.2-stable 版本進(jìn)行,社區(qū)最新版本代碼改動(dòng)較大,但是總體思路是不變的。
01?用戶提交一條查詢SQL背后發(fā)生了什么
在傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)中,SQL處理器的組件主要包括以下幾種:
? Query Parsing
負(fù)責(zé)進(jìn)行詞法和語(yǔ)法分析,把程序從人類高可讀的格式(即SQL)轉(zhuǎn)化成機(jī)器高可讀的格式(AST,抽象語(yǔ)法樹(shù))。
詞法分析指的是把SQL中的字符序列分解成一個(gè)個(gè)獨(dú)立的詞法單元——Token(<類型,值>)。
語(yǔ)法分析指的是從詞法分析器輸出的token中識(shí)別各類短語(yǔ),并構(gòu)造出一顆抽象語(yǔ)法樹(shù)。而按照構(gòu)造抽象語(yǔ)法樹(shù)的方向,又可以把語(yǔ)法分析分成自頂向下和自底向上分析兩種。而ClickHouse采用的則是手寫(xiě)一個(gè)遞歸下降的語(yǔ)法分析器。
? Query Rewrite
即通常我們說(shuō)的"Logical Optimizer"或基于規(guī)則的優(yōu)化器(Rule-Based Optimizer,即RBO)。
其負(fù)責(zé)應(yīng)用一些啟發(fā)式規(guī)則,負(fù)責(zé)簡(jiǎn)化和標(biāo)準(zhǔn)化查詢,無(wú)需改變查詢的語(yǔ)義。
常見(jiàn)操作有:謂詞和算子下推,視圖展開(kāi),簡(jiǎn)化常量運(yùn)算表達(dá)式,謂詞邏輯的重寫(xiě),語(yǔ)義的優(yōu)化等。
? Query Optimizer
即通常我們所說(shuō)的"Physical Optimizer",負(fù)責(zé)把內(nèi)部查詢表達(dá)轉(zhuǎn)化成一個(gè)高效的查詢計(jì)劃,指導(dǎo)DBMS如何去取表,如何進(jìn)行排序,如何Join。如下圖所示,一個(gè)查詢計(jì)劃可以被認(rèn)為是一個(gè)數(shù)據(jù)流圖,在這個(gè)數(shù)據(jù)流圖中,表數(shù)據(jù)會(huì)像在管道中傳輸一樣,從一個(gè)查詢操作符(operator)傳遞到另一個(gè)查詢操作符。
一個(gè)查詢計(jì)劃? Query Executor
查詢執(zhí)行器,負(fù)責(zé)執(zhí)行具體的查詢計(jì)劃,從存儲(chǔ)引擎中獲取數(shù)據(jù)并且對(duì)數(shù)據(jù)應(yīng)用查詢計(jì)劃得到結(jié)果。
執(zhí)行引擎也分為很多種,如經(jīng)典的火山模型(Volcano Model),還有ClickHouse采用的向量化執(zhí)行模型(Vectorization Model)。
(圖來(lái)自經(jīng)典論文 Architecture Of Database System)但不管是傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù),還是非關(guān)系型數(shù)據(jù)庫(kù),SQL的解析和生成執(zhí)行計(jì)劃過(guò)程都是大同小異的,而縱覽ClickHouse的源代碼,可以把用戶提交一條查詢SQL背后的過(guò)程總結(jié)如下:
1.服務(wù)端接收客戶端發(fā)來(lái)的SQL請(qǐng)求,具體形式是一個(gè)網(wǎng)絡(luò)包,Server的協(xié)議層需要拆包把SQL解析出來(lái)
2.Server負(fù)責(zé)初始化上下文與Network Handler,然后?Parser?對(duì)Query做詞法和語(yǔ)法分析,解析成AST
3.Interpreter的?SyntaxAnalyzer?會(huì)應(yīng)用一些啟發(fā)式規(guī)則對(duì)AST進(jìn)行優(yōu)化重寫(xiě)
4.Interpreter的?ExpressionAnalyzer?根據(jù)上下文信息以及優(yōu)化重寫(xiě)后的AST生成物理執(zhí)行計(jì)劃
5.物理執(zhí)行計(jì)劃分發(fā)到本地或者分布式的executor,各自從存儲(chǔ)引擎中獲取數(shù)據(jù),應(yīng)用執(zhí)行計(jì)劃
6.Server把執(zhí)行后的結(jié)果以Block流的形式輸出到Socket緩沖區(qū),Client從Socket中讀取即可得到結(jié)果
01?接收客戶端請(qǐng)求
我們要以服務(wù)端的視角來(lái)出發(fā),首先來(lái)看server.cpp大概做什么事情:
下面只挑選重要的邏輯:
? 初始化上下文
? 初始化Zookeeper(ClickHouse的副本復(fù)制機(jī)制需要依賴ZooKeeper)
? 常規(guī)配置初始化
? 綁定服務(wù)端的端口,根據(jù)網(wǎng)絡(luò)協(xié)議初始化Handler,對(duì)客戶端提供服務(wù)
int Server::main() { // 初始化上下文 global_context = std::make_unique<Context>(Context::createGlobal()); global_context->setApplicationType(Context::ApplicationType::SERVER);// zk初始化 zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });//其他config的初始化 //...//綁定端口,對(duì)外提供服務(wù) auto address = make_socket_address(host, port); socket.bind(address, /* reuseAddress = */ true);//根據(jù)網(wǎng)絡(luò)協(xié)議建立不同的server類型 //現(xiàn)在支持的server類型有:HTTP,HTTPS,TCP,Interserver,mysql //以TCP版本為例: create_server("tcp_port", [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socket_bind_listen(socket, listen_host, port); servers.emplace_back(std::make_unique<Poco::Net::TCPServer>( new TCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); });//啟動(dòng)server for (auto & server : servers) server->start();}客戶端發(fā)來(lái)的請(qǐng)求是由各自網(wǎng)絡(luò)協(xié)議所對(duì)應(yīng)的?Handler?來(lái)進(jìn)行的,server在啟動(dòng)的時(shí)候 Handler 會(huì)被初始化并綁定在指定端口中。我們以TCPHandler為例,看看服務(wù)端是如何處理客戶端發(fā)來(lái)的請(qǐng)求的,重點(diǎn)關(guān)注?TCPHandler::runImpl?的函數(shù)實(shí)現(xiàn):
? 初始化輸入和輸出流的緩沖區(qū)
? 接受請(qǐng)求報(bào)文,拆包
? 執(zhí)行Query(包括整個(gè)詞法語(yǔ)法分析,Query重寫(xiě),物理計(jì)劃生成和生成結(jié)果)
? 把Query結(jié)果保存到輸出流,然后發(fā)送到Socket的緩沖區(qū),等待發(fā)送回客戶端
void TCPHandler::runImpl() { //實(shí)例化套接字對(duì)應(yīng)的輸入和輸出流緩沖區(qū) in = std::make_shared<ReadBufferFromPocoSocket>(socket()); out = std::make_shared<WriteBufferFromPocoSocket>(socket());while (1){ // 接收請(qǐng)求報(bào)文 receivePacket();// 執(zhí)行Query state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);//根據(jù)Query種類來(lái)處理不同的Query //處理insert Query processInsertQuery(); //并發(fā)處理普通Query processOrdinaryQueryWithProcessors(); //單線程處理普通Query processOrdinaryQuery(); }}那CK處理客戶端發(fā)送過(guò)來(lái)的Query的具體邏輯是怎樣的呢?
我們可以在
dbms/src/Interpreters/executeQuery.cpp?中一探究竟:
具體邏輯在?executeQueryImpl?函數(shù)中,挑選核心的邏輯進(jìn)行講解:
static std::tuple<ASTPtr, BlockIO> executeQueryImpl() { //構(gòu)造Parser ParserQuery parser(end, settings.enable_debug_queries); ASTPtr ast;//把Query轉(zhuǎn)化為抽象語(yǔ)法樹(shù) ast = parseQuery(parser, begin, end, "", max_query_size);//生成interpreter實(shí)例 auto interpreter = InterpreterFactory::get(ast, context, stage);// interpreter解析AST,結(jié)果是BlockIO res = interpreter->execute();//返回結(jié)果是抽象語(yǔ)法樹(shù)和解析后的結(jié)果組成的二元組 return std::make_tuple(ast, res); }該函數(shù)所做的事情:
? 構(gòu)建Parser,把Query解析成AST(抽象語(yǔ)法樹(shù))
? InterpreterFactory根據(jù)AST生成對(duì)應(yīng)的Interpreter實(shí)例
? AST是由Interpreter來(lái)解析的,執(zhí)行結(jié)果是一個(gè)BlockIO,BlockIO是對(duì)?BlockInputStream?和?BlockOutputStream?的一個(gè)封裝。
總結(jié):
? 服務(wù)端調(diào)用?executeQuery?來(lái)處理client發(fā)送的Query,執(zhí)行后的結(jié)果保存在state這個(gè)結(jié)構(gòu)體的io成員中。
每一條Query都會(huì)對(duì)應(yīng)一個(gè)state結(jié)構(gòu)體,記錄了這條Query的id,處理狀態(tài),壓縮算法,Query的文本和Query所處理數(shù)據(jù)對(duì)應(yīng)的IO流等元信息。
? 然后服務(wù)端調(diào)用?processOrdinaryQuery?等方法把輸出流結(jié)果封裝成異步的IO流,發(fā)送到回client。
02?解析請(qǐng)求(Parser)
CK選擇采用手寫(xiě)一個(gè)遞歸下降的Parser來(lái)對(duì)SQL進(jìn)行解析,生成的結(jié)果是這個(gè)SQL對(duì)應(yīng)的抽象語(yǔ)法樹(shù)(AST),抽象語(yǔ)法樹(shù)由表示各個(gè)操作的節(jié)點(diǎn)(IAST)表示。而本節(jié)主要介紹Parser背后的核心邏輯:
詞法分析和語(yǔ)法分析的核心邏輯可以在parseQuery.cpp的?tryParseQuery?中一覽無(wú)余。
該函數(shù)利用lexer將掃描Query字符流,將其分割為一個(gè)個(gè)的Token,?token_iterator?即一個(gè)Token流迭代器,然后parser再對(duì)Token流進(jìn)行解析生成AST抽象語(yǔ)法樹(shù)。
ASTPtr tryParseQuery() { //Token為lexer詞法分析后的基本單位,詞法分析后生成的是Token流 Tokens tokens(pos, end, max_query_size); IParser::Pos token_iterator(tokens); ASTPtr res; //Token流經(jīng)過(guò)語(yǔ)法分析生成AST抽象語(yǔ)法樹(shù) bool parse_res = parser.parse(token_iterator, res, expected); return res;}我們可以看到,語(yǔ)法分析的核心就在于parser執(zhí)行的parse方法。parse 方法具體的實(shí)現(xiàn)在?ParserQuery.cpp?的?parseImpl?中。
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserQueryWithOutput query_with_output_p(enable_explain); ParserInsertQuery insert_p(end); ParserUseQuery use_p; ParserSetQuery set_p; ParserSystemQuery system_p;bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) || use_p.parse(pos, node, expected) || set_p.parse(pos, node, expected) || system_p.parse(pos, node, expected);return res; }我們可以看到,這個(gè)方法粗略地把Query分為了五種,但是本質(zhì)上可以歸納為兩種(第一種為有結(jié)果輸出,對(duì)應(yīng)show,select,create等語(yǔ)句;第二種為無(wú)結(jié)果輸出,對(duì)應(yīng)insert,use,set和與系統(tǒng)相關(guān)的語(yǔ)句(如exit))
? QueryWithOutput
? InsertQuery
? UseQuery
? SetQuery
? SystemQuery
每一種Query都自定義了其專屬的Parser,所以代碼邏輯是當(dāng)接收到一個(gè)Query輸入的時(shí)候,會(huì)嘗試各種Query的Parser,直到成功為止。
我們可以select語(yǔ)句對(duì)應(yīng)的parser進(jìn)行分析:?
核心邏輯可以總結(jié)為:?
1.先給出select語(yǔ)句中可能出現(xiàn)的關(guān)鍵詞
2.在詞法分析生成的Token流中爬取這些關(guān)鍵詞
3.如果成功爬取,則?setExpression?函數(shù)會(huì)組裝該關(guān)鍵字對(duì)應(yīng)的AST節(jié)點(diǎn)
每一種SQL語(yǔ)句(如select,drop,insert,create)都有對(duì)應(yīng)的AST類,并且分別包含了這些語(yǔ)句中特有的關(guān)鍵字。
bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { //創(chuàng)建AST樹(shù)節(jié)點(diǎn) auto select_query = std::make_shared<ASTSelectQuery>(); node = select_query;//select語(yǔ)句中會(huì)出現(xiàn)的關(guān)鍵詞 ParserKeyword s_select("SELECT"); ParserKeyword s_distinct("DISTINCT"); ParserKeyword s_from("FROM"); ParserKeyword s_prewhere("PREWHERE"); ParserKeyword s_where("WHERE"); ParserKeyword s_group_by("GROUP BY"); ParserKeyword s_with("WITH"); ParserKeyword s_totals("TOTALS"); ParserKeyword s_having("HAVING"); ParserKeyword s_order_by("ORDER BY"); ParserKeyword s_limit("LIMIT"); ParserKeyword s_settings("SETTINGS"); ParserKeyword s_by("BY"); ParserKeyword s_rollup("ROLLUP"); ParserKeyword s_cube("CUBE"); ParserKeyword s_top("TOP"); ParserKeyword s_with_ties("WITH TIES"); ParserKeyword s_offset("OFFSET");//... //依次對(duì)Token流爬取上述關(guān)鍵字 ParserTablesInSelectQuery().parse(pos, tables, expected)//根據(jù)語(yǔ)法分析結(jié)果設(shè)置AST的Expression屬性,可以理解為如果SQL存在該關(guān)鍵字,這個(gè)關(guān)鍵字都會(huì)轉(zhuǎn)化為AST上的一個(gè)節(jié)點(diǎn) select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables)); select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression)); select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression)); select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length)); select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));}整個(gè)Parser的流程圖:
03?執(zhí)行請(qǐng)求(Interpreter)
解釋器(Interpreter)負(fù)責(zé)從抽象語(yǔ)法樹(shù)中創(chuàng)建查詢執(zhí)行的流水線,整條流水線以?BlockInputStream?和?BlockOutputStream?進(jìn)行組織。比方說(shuō)"select"是基于"from"的Block輸出流來(lái)進(jìn)行選擇的,選擇后的結(jié)果也會(huì)以Block輸出流的形式輸出到結(jié)果。首先我們來(lái)看:
dbms/src/Interpreters/InterpreterFactory.cpp
每一種Query都會(huì)有對(duì)應(yīng)的Interpreter,這個(gè)工廠方法就是根據(jù)AST的種類來(lái)實(shí)例化其對(duì)應(yīng)的Interpreter,由其來(lái)具體執(zhí)行對(duì)應(yīng)AST的執(zhí)行計(jì)劃:
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage) { //舉個(gè)例子,如果該AST是由select語(yǔ)句轉(zhuǎn)化過(guò)來(lái), if (query->as<ASTSelectQuery>()) { /// This is internal part of ASTSelectWithUnionQuery. /// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child. return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage)); } }我們就以?InterpreterSelectQuery?為例,了解其實(shí)例化的核心邏輯:
InterpreterSelectQuery::InterpreterSelectQuery() { //獲取AST auto & query = getSelectQuery();//對(duì)AST做進(jìn)一步語(yǔ)法分析,對(duì)語(yǔ)法樹(shù)做優(yōu)化重寫(xiě) syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());//每一種Query都會(huì)對(duì)應(yīng)一個(gè)特有的表達(dá)式分析器,用于爬取AST生成執(zhí)行計(jì)劃(操作鏈) query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>( query_ptr, syntax_analyzer_result, context, NameSet(required_result_column_names.begin(), required_result_column_names.end()), options.subquery_depth, !options.only_analyze); }語(yǔ)法分析直接生成的AST轉(zhuǎn)化成執(zhí)行計(jì)劃可能性能上并不是最優(yōu)的,因此需要SyntaxAnalyzer?對(duì)其進(jìn)行優(yōu)化重寫(xiě),在其源碼中可以看到其涉及到非常多?基規(guī)則優(yōu)化(rule based optimization)?的trick。
SyntaxAnalyzer?會(huì)逐個(gè)針對(duì)這些規(guī)則對(duì)查詢進(jìn)行檢查,確定其是否滿足轉(zhuǎn)換規(guī)則,一旦滿足就會(huì)對(duì)其進(jìn)行轉(zhuǎn)換。
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze() { // 剔除冗余列 removeDuplicateColumns(result.source_columns);// 根據(jù)settings中enable_optimize_predicate_expression配置判斷是否進(jìn)行謂詞下移 replaceJoinedTable(node);// 根據(jù)settings中distributed_product_mode配置重寫(xiě)IN 與 JOIN 表達(dá)式 InJoinSubqueriesPreprocessor(context).visit(query);// 優(yōu)化Query內(nèi)部的布爾表達(dá)式 LogicalExpressionsOptimizer().perform();// 創(chuàng)建一個(gè)從別名到AST節(jié)點(diǎn)的映射字典 QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);// 公共子表達(dá)式的消除 QueryNormalizer(normalizer_data).visit(query);// 消除select從句后的冗余列 removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);// 執(zhí)行標(biāo)量子查詢,并且用常量替代標(biāo)量子查詢結(jié)果 executeScalarSubqueries(query, context, subquery_depth);// 如果是select語(yǔ)句還會(huì)做下列優(yōu)化:// 謂詞下移優(yōu)化 PredicateExpressionsOptimizer(select_query, settings, context).optimize();/// GROUP BY 從句的優(yōu)化 optimizeGroupBy(select_query, source_columns_set, context);/// ORDER BY 從句的冗余項(xiàng)剔除 optimizeOrderBy(select_query);/// LIMIT BY 從句的冗余列剔除 optimizeLimitBy(select_query);/// USING語(yǔ)句的冗余列剔除 optimizeUsing(select_query);}這里挑選幾個(gè)簡(jiǎn)單介紹一下:
? 公共子表達(dá)式消除(Common Subexpression Elimination)
如果表達(dá)式?x op y?先前被計(jì)算過(guò),并且從先前的計(jì)算到現(xiàn)在其計(jì)算表達(dá)式對(duì)應(yīng)的值沒(méi)有改變,那么 x op y 就稱為公共子表達(dá)式。公共子表達(dá)式消除會(huì)搜索所有相同計(jì)算表達(dá)式的實(shí)例,并分析是否值得用保存計(jì)算值的單個(gè)變量來(lái)替換它們,以減少計(jì)算的開(kāi)銷。
? 標(biāo)量子查詢(Scala Subquery)的常量替換
標(biāo)量子查詢就是返回單一值的子查詢,和公共子表達(dá)式消除相似,可以用常量來(lái)替換SQL中所有的標(biāo)量子查詢結(jié)果以減少計(jì)算開(kāi)銷。
? 謂詞下移(Predicate Pushdown)
把外層查詢塊中的WHERE子句的謂詞下移到較低層查詢塊如視圖,以盡可能把過(guò)濾數(shù)據(jù)的操作移動(dòng)到靠近數(shù)據(jù)源的位置。提前進(jìn)行數(shù)據(jù)過(guò)濾能夠大幅減少網(wǎng)絡(luò)傳輸或者內(nèi)存讀取訪問(wèn)的數(shù)據(jù)量,以提高查詢效率。
而?query_analyzer?的作用可以理解為解析優(yōu)化重寫(xiě)后的AST,然后對(duì)所要進(jìn)行的操作組成一條操作鏈,即物理執(zhí)行計(jì)劃,如:
ExpressionActionsChain chain; analyzer.appendWhere(chain); chain.addStep(); analyzer.appendSelect(chain); analyzer.appendOrderBy(chain); chain.finalize();上述代碼把where,select,orderby操作都加入到操作鏈中,接下來(lái)就可以從Storage層讀取Block,對(duì)Block數(shù)據(jù)應(yīng)用上述操作鏈的操作。而執(zhí)行的核心邏輯,就在對(duì)應(yīng)Interpreter的?executeImpl?方法實(shí)現(xiàn)中,這里以select語(yǔ)句的Interpreter來(lái)了解下讀取Block數(shù)據(jù)并且對(duì)block數(shù)據(jù)進(jìn)行相應(yīng)操作的流程。
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input) { // 對(duì)應(yīng)Query的AST auto & query = getSelectQuery();AnalysisResult expressions; // 物理計(jì)劃,判斷表達(dá)式是否有where,aggregate,having,order_by,litmit_by等字段 expressions = analyzeExpressions( getSelectQuery(), *query_analyzer, QueryProcessingStage::FetchColumns, options.to_stage, context, storage, true, filter_info);// 從Storage讀取數(shù)據(jù) executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);// eg:根據(jù)SQL的關(guān)鍵字在BlockStream流水線中執(zhí)行相應(yīng)的操作, 如where,aggregate,distinct都分別由一個(gè)函數(shù)負(fù)責(zé)執(zhí)行 executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);executeDistinct(pipeline, true, expressions.selected_columns);}既然我們知道了執(zhí)行計(jì)劃AnalysisResult(即物理執(zhí)行計(jì)劃),接下來(lái)就需要從storage層中讀取數(shù)據(jù)來(lái)執(zhí)行對(duì)應(yīng)的操作,核心邏輯在?executeFetchColumns?中: 核心操作就是從storage層讀取所要處理列的Block,并組織成BlockStream。
void InterpreterSelectQuery::executeFetchColumns( QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere) { // 實(shí)例化Block Stream auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams) // 讀取列對(duì)應(yīng)的Block,并且組織成Block Stream streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))}; streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions); }讀取完Block Stream之后就是對(duì)其執(zhí)行各種execute操作如?executeAggregation?,?executeWhere?操作,詳見(jiàn)?InterpreterSelectQuery::executeImpl?的代碼。
因此Interpreter的處理過(guò)程可以總結(jié)為:
? 對(duì)AST進(jìn)行優(yōu)化重寫(xiě)
? 解析重寫(xiě)后的AST并生成操作鏈(執(zhí)行計(jì)劃)
? 從存儲(chǔ)引擎中讀取要處理的Block數(shù)據(jù)
? 對(duì)讀取的Block數(shù)據(jù)應(yīng)用操作鏈上的操作
那我們讀取Block Stream并進(jìn)行處理后,生成的結(jié)果如何寫(xiě)回到storage層呢? 我們這里以insert語(yǔ)句的Interpreter來(lái)了解下:
BlockIO InterpreterInsertQuery::execute() { // table為存儲(chǔ)引擎接口 StoragePtr table = getTable(query); BlockOutputStreamPtr out;// 從存儲(chǔ)引擎讀取Block Stream auto query_sample_block = getSampleBlock(query, table); out = std::make_shared<AddingDefaultBlockOutputStream>( out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);//執(zhí)行結(jié)果封裝成BlockIO BlockIO res; res.out = std::move(out); }上面代碼中的StoragePtr實(shí)際上就是IStorage這個(gè)存儲(chǔ)引擎的接口
???????using StoragePtr = std::shared_ptr<IStorage>;無(wú)論是寫(xiě)入還是讀取操作都是依靠底層存儲(chǔ)引擎(如MergeTree)的write和read接口來(lái)實(shí)現(xiàn)的,關(guān)于存儲(chǔ)引擎的細(xì)節(jié)實(shí)現(xiàn)這里暫時(shí)不贅述,這里我們只需要知道我們從存儲(chǔ)引擎接口中以流方式讀取Block數(shù)據(jù),而結(jié)果組織成BlockIO流輸出。Interpreter的流程總結(jié)如下:
04?返回請(qǐng)求結(jié)果
?
TCPHandler::runImpl?中,執(zhí)行完?executeQuery?之后需要調(diào)用各種processQuery的方法來(lái)給client返回執(zhí)行SQL后的結(jié)果。
我們以?TCPHandler::processOrdinaryQuery?為例做簡(jiǎn)單分析:
Server負(fù)責(zé)在?sendData?函數(shù)中把輸出結(jié)果寫(xiě)入到套接字輸出緩沖區(qū)中,client只要從這個(gè)輸出緩沖區(qū)讀取就能夠得到結(jié)果。
void TCPHandler::sendData(const Block & block) { //初始化OutputStream的參數(shù) initBlockOutput(block);// 調(diào)用BlockOutputStream的write函數(shù),把Block寫(xiě)到輸出流 state.block_out->write(block); state.maybe_compressed_out->next(); out->next(); }?
02?結(jié)語(yǔ)
了解ClickHouse背后SQL的查詢整個(gè)流程,不僅能讓數(shù)據(jù)庫(kù)使用者更清晰地認(rèn)識(shí)到如何編寫(xiě)最優(yōu)化的SQL,也能夠讓數(shù)據(jù)庫(kù)內(nèi)核開(kāi)發(fā)者加深對(duì)數(shù)據(jù)庫(kù)體系結(jié)構(gòu)的理解,提高開(kāi)發(fā)效率。
本文并沒(méi)有涉及到太深入的技術(shù)細(xì)節(jié),諸如向量化執(zhí)行引擎,SIMD,基于llvm的動(dòng)態(tài)代碼生成,類MergeTree存儲(chǔ)引擎等CK的技術(shù)細(xì)節(jié)也沒(méi)有提及,只是從宏觀角度給讀者介紹了執(zhí)行SQL背后內(nèi)核到底發(fā)生了什么。后續(xù)我們會(huì)推出更多內(nèi)核源碼解讀文章,敬請(qǐng)關(guān)注。
總結(jié)
以上是生活随笔為你收集整理的深度 | 一条查询SQL的前世今生 —— ClickHouse 源码阅读的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 厉害了,如何通过双 key 来解决缓存并
- 下一篇: Java 线程池中的线程复用是如何实现的