ClickHouse源码阅读(0000 0110) —— 使用ReplicatedMergeTree引擎时的副本选择问题
在使用ReplicatedMergeTree引擎和Distributed引擎的時候,對于同一張表,服務(wù)器上存在多個副本,在查詢數(shù)據(jù)的時候,是如何在這些副本之間進(jìn)行選擇的呢?結(jié)合源碼來試著分析一下...
對于一條SELECT SQL,從以下方法開始:
pipeline.streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);如圖:
進(jìn)入到這個方法中,主要步驟包括修改AST(修改表名)、物化header、構(gòu)建select_stream_factory等,最后執(zhí)行ClusterProxy::executeQuery()方法,簡化代碼如下:
BlockInputStreams StorageDistributed::read(const Names & /*column_names*/,const SelectQueryInfo &query_info,const Context &context,QueryProcessingStage::Enum processed_stage,const size_t /*max_block_size*/,const unsigned /*num_streams*/) {auto cluster = getCluster();const Settings &settings = context.getSettingsRef();//修改AST, 修改表名const auto &modified_query_ast = rewriteSelectQuery(......);//header, 應(yīng)該是列名那行, 即Structure of query resultBlock header = materializeBlock(......);ClusterProxy::SelectStreamFactory select_stream_factory = ......;......//重點(diǎn)方法return ClusterProxy::executeQuery(select_stream_factory, cluster, modified_query_ast, context, settings);}進(jìn)入ClusterProxy::executeQuery()方法, 會設(shè)置網(wǎng)絡(luò)帶寬限制,然后遍歷數(shù)據(jù)的所有分片,對每個分片執(zhí)行createForShard()方法,簡化代碼如下:
BlockInputStreams executeQuery(IStreamFactory & stream_factory, const ClusterPtr & cluster,const ASTPtr & query_ast, const Context & context, const Settings & settings) {BlockInputStreams res;const std::string query = queryToString(query_ast);....../// Network bandwidth limit, if needed. 網(wǎng)絡(luò)帶寬限制ThrottlerPtr throttler;if (settings.max_network_bandwidth || settings.max_network_bytes){......}//遍歷數(shù)據(jù)的所有的分片,針對每個分片for (const auto & shard_info : cluster->getShardsInfo())stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);return res; }進(jìn)入到createForShard()方法,代碼邏輯還是比較清晰的,先定義了emplace_local_stream和emplace_remote_stream,然后根據(jù)prefer_localhost_replica和shard_info.isLocal()這兩個條件判斷使用local_stream還是remotr_stream,簡化代碼如下:
//遍歷數(shù)據(jù)的所有的分片,針對每個分片void SelectStreamFactory::createForShard(const Cluster::ShardInfo &shard_info,const String &query, const ASTPtr &query_ast,const Context &context, const ThrottlerPtr &throttler,BlockInputStreams &res) {auto emplace_local_stream = [&]()//將 數(shù)據(jù)流 放在本地{res.emplace_back(createLocalStream(query_ast, context, processed_stage));};auto emplace_remote_stream = [&]()//將 數(shù)據(jù)流 發(fā)送給遠(yuǎn)程服務(wù)器{//構(gòu)建RemoteBlockInputStreamauto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr,throttler, external_tables, processed_stage);stream->setPoolMode(PoolMode::GET_MANY);if (!table_func_ptr)stream->setMainTable(main_table);res.emplace_back(std::move(stream));};const auto &settings = context.getSettingsRef();//prefer_localhost_replica = 1 且 本地服務(wù)器上存在這個分片(shard_info.isLocal() = true), 就使用本地分片數(shù)據(jù).// 如果本地服務(wù)器上沒有這個分片, 則只能連接遠(yuǎn)程獲取該分片的數(shù)據(jù)emplace_remote_stream//prefer_localhost_replica = 0 則 連接遠(yuǎn)程獲取該分片的數(shù)據(jù)emplace_remote_streamif (settings.prefer_localhost_replica && shard_info.isLocal()) {......} elseemplace_remote_stream();}注意:可以具體看下shard_info.isLocal()方法的具體實(shí)現(xiàn)。
?
下面分開分析,先分析滿足settings.prefer_localhost_replica && shard_info.isLocal()條件的情況,主要流程包括:
1-判斷本地服務(wù)器的這個分片上有沒有這個表;
2-判斷這個表是不是用的復(fù)制表引擎;
3-如果本地分片存在這個表,且這個表是復(fù)制表,那么就該考慮副本的延遲問題了。
4-獲取本地副本的延遲和配置的可允許的最大延遲時間,兩者比較。如果本地副本的延遲時間小于max_allowed_delay, 說明本地副本是可以使用的,否則認(rèn)為本地副本已經(jīng)過期了。
5-如果本地副本已經(jīng)過期了,則看fallback_to_stale_replicas_for_distributed_queries這個配置參數(shù),是不是允許使用過期的副本。
6-如果不允許使用過期的副本,即設(shè)置了fallback_to_stale_replicas_for_distributed_queries=0,則看當(dāng)前分片是不是有遠(yuǎn)程副本分片,如果有則使用遠(yuǎn)程分片;如果沒有則拋出異常;
7-如果允許使用過期的副本,即設(shè)置了fallback_to_stale_replicas_for_distributed_queries=1,且當(dāng)前分片沒有遠(yuǎn)程副本分片,則使用本地分片過期的數(shù)據(jù);
8-如果允許使用過期的副本,且當(dāng)前分片也有遠(yuǎn)程副本分片,就先嘗試使用遠(yuǎn)程副本, 但如果它們也過期了, 則退回使用本地副本 。懶洋洋地做這件事以避免在主線程中連接(惰性創(chuàng)建連接)(后面有部分代碼還沒有仔細(xì)看)。
基本判斷邏輯就是這樣了。帶注釋的代碼如下:
//運(yùn)行到這里表示本地服務(wù)器上存在這個分片StoragePtr main_table_storage;//根據(jù)庫名表名在本地服務(wù)器的分片上的找到需要查詢的這個表if (table_func_ptr) {const auto *table_function = table_func_ptr->as<ASTFunction>();main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);} elsemain_table_storage = context.tryGetTable(main_table.database, main_table.table);//本地服務(wù)器的這個分片上沒有這個表if (!main_table_storage) /// Table is absent on a local server.{ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);//shard_info.hasRemoteConnections() 本次查詢是否是遠(yuǎn)程發(fā)過來的(是否有遠(yuǎn)程副本), 如果是則需要emplace_remote_streamif (shard_info.hasRemoteConnections()) {LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"There is no table " << main_table.database << "." << main_table.table<< " on local replica of shard " << shard_info.shard_num<< ", will try remote replicas.");emplace_remote_stream();} elseemplace_local_stream(); /// Let it fail the usual way.return;}//運(yùn)行到這里, 表示這個表在本地服務(wù)器的這個分片上//通過一個動態(tài)轉(zhuǎn)換來判斷這個表有沒有副本const auto *replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());//該分片在遠(yuǎn)程沒有副本, 只能使用本地分片數(shù)據(jù)if (!replicated_storage) {/// Table is not replicated, use local server.emplace_local_stream();return;}// 代碼運(yùn)行到這里, 說明對于當(dāng)前分片, 本地服務(wù)器上有這個表, 且該分片也有遠(yuǎn)程副本.// 那應(yīng)該怎么選呢?// 到了這一步就需要考慮應(yīng)該選擇本地的還是遠(yuǎn)程的了// 如果設(shè)置了max_replica_delay_for_distributed_queries(分布式查詢的最大副本延遲)這個參數(shù), 則復(fù)制表的分布式查詢將選擇復(fù)制延遲時間(秒)小于指定值(不包括該指定值)的服務(wù)器.// 零意味著不考慮延遲。UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;//沒有設(shè)置max_allowed_delay這個參數(shù), 則不考慮延遲, 優(yōu)先使用本地分片數(shù)據(jù)if (!max_allowed_delay) {emplace_local_stream();return;}//設(shè)置了max_allowed_delay這個參數(shù), 就先獲取副本的絕對延遲 (這里獲取的應(yīng)該是本地副本的延遲時間)UInt32 local_delay = replicated_storage->getAbsoluteDelay();//如果本地副本的延遲時間小于max_allowed_delay, 說明本地副本是可以使用的if (local_delay < max_allowed_delay) {emplace_local_stream();return;}/// If we reached this point, local replica is stale.// 如果代碼執(zhí)行到這里, 表示本地副本已經(jīng)過期了(復(fù)制延遲時間 >= 指定值300s)ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay<< "s.)");// 代碼運(yùn)行到這里表示設(shè)置了max_replica_delay_for_distributed_queries(分布式查詢的最大副本延遲)這個參數(shù), 并且本地的副本已過期了.// 如果設(shè)置了fallback_to_stale_replicas_for_distributed_queries=0, 表示不允許使用過期的副本,// 則將查看當(dāng)前分片是不是有遠(yuǎn)程副本, 如果由則使用遠(yuǎn)程副本, 如果沒有則報錯if (!settings.fallback_to_stale_replicas_for_distributed_queries) {if (shard_info.hasRemoteConnections()) {/// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.emplace_remote_stream();return;} elsethrow Exception("Local replica of shard " + toString(shard_info.shard_num)+ " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",ErrorCodes::ALL_REPLICAS_ARE_STALE);}// 如果設(shè)置了fallback_to_stale_replicas_for_distributed_queries=1, 表示允許使用過期的副本,// 再判斷當(dāng)前分片是不是有遠(yuǎn)程副本, 如果沒有遠(yuǎn)程副本. 則只能使用本地過期的副本if (!shard_info.hasRemoteConnections()) {/// There are no remote replicas but we are allowed to fall back to stale local replica.emplace_local_stream();return;}//代碼運(yùn)行到這里表示允許使用過期的副本, 且遠(yuǎn)程也有副本/// Try our luck with remote replicas, but if they are stale too, then fallback to local replica./// Do it lazily to avoid connecting in the main thread.//于是就先嘗試使用遠(yuǎn)程副本, 但如果它們也過期了, 則退回使用本地副本//懶洋洋地做這件事以避免在主線程中連接(惰性創(chuàng)建連接)//惰性創(chuàng)建數(shù)據(jù)流(類比spark中, 一次行動操作觸發(fā)一次計算), 這里也是, 先捋清底層數(shù)據(jù)都有哪些, 再創(chuàng)建streamauto lazily_create_stream = [pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,local_delay]()-> BlockInputStreamPtr {std::vector<ConnectionPoolWithFailover::TryResult> try_results;try {if (table_func_ptr)try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);elsetry_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY,main_table);}catch (const Exception &ex) {if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Connections to remote replicas of local shard " << shard_num<< " failed, will use stale local replica");elsethrow;}double max_remote_delay = 0.0;for (const auto &try_result : try_results) {if (!try_result.is_up_to_date)max_remote_delay = std::max(try_result.staleness, max_remote_delay);}if (try_results.empty() || local_delay < max_remote_delay)return createLocalStream(query_ast, context, stage);else {std::vector<IConnectionPool::Entry> connections;connections.reserve(try_results.size());for (auto &try_result : try_results)connections.emplace_back(std::move(try_result.entry));return std::make_shared<RemoteBlockInputStream>(std::move(connections), query, header, context, nullptr, throttler, external_tables,stage);}};res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header,lazily_create_stream));對于使用遠(yuǎn)程副本的情況,先看下定義的emplace_remote_stream,代碼如下:
auto emplace_remote_stream = [&]()//將 數(shù)據(jù)流 發(fā)送給遠(yuǎn)程服務(wù)器{//構(gòu)建RemoteBlockInputStreamauto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr,throttler, external_tables, processed_stage);stream->setPoolMode(PoolMode::GET_MANY);if (!table_func_ptr)stream->setMainTable(main_table);res.emplace_back(std::move(stream));};關(guān)鍵在于構(gòu)建RemoteBlockInputStream。注意shard_info.pool的類型是ConnectionPoolWithFailoverPtr(具有容錯功能的連接池)。進(jìn)一步找到構(gòu)建RemoteBlockInputStream的方法:
RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr &pool,const String &query_, const Block &header_, const Context &context_, const Settings *settings,const ThrottlerPtr &throttler, const Tables &external_tables_, QueryProcessingStage::Enum stage_): header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_) {if (settings)context.setSettings(*settings);//創(chuàng)建多路連接create_multiplexed_connections = [this, pool, throttler]() {const Settings ¤t_settings = context.getSettingsRef();std::vector<IConnectionPool::Entry> connections;if (main_table) {//如果限定了表名(沒有使用remote表函數(shù)的情況)auto try_results = pool->getManyChecked(¤t_settings, pool_mode, *main_table);connections.reserve(try_results.size());for (auto &try_result : try_results)connections.emplace_back(std::move(try_result.entry));} else//對于使用了remote表函數(shù)的情況connections = pool->getMany(¤t_settings, pool_mode);return std::make_unique<MultiplexedConnections>(std::move(connections), current_settings, throttler);};}其中,對于沒有使用表函數(shù)的情況,pool->getManyChecked()這個方法是重點(diǎn)。
?
好了,這篇文章已經(jīng)很長了,就先到這兒,剩余的內(nèi)容到下篇文章中吧。
總結(jié)
以上是生活随笔為你收集整理的ClickHouse源码阅读(0000 0110) —— 使用ReplicatedMergeTree引擎时的副本选择问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 绝地求生刺激战场亚服务器要维护多久,绝地
- 下一篇: js的基本语法