Sharding Strategies in Citus: Append Distribution
Translated from the official Citus documentation.
Append Distribution
Append distribution is a specialized technique which requires care to use efficiently. Hash distribution is a better choice for most situations.
While Citus' most common use cases involve hash data distribution, it can also distribute time-series data across a variable number of shards by their order in time. This section provides a short reference to loading, deleting, and manipulating time-series data.
As the name suggests, append based distribution is more suited to append-only use cases. This typically includes event based data which arrives in a time-ordered series. You can then distribute your largest tables by time, and batch load your events into Citus in intervals of N minutes. This data model can be generalized to a number of time series use cases; for example, each line in a website's log file, machine activity logs, or aggregated website events. Append based distribution supports more efficient range queries: given a range query on the distribution key, the Citus query planner can easily determine which shards overlap that range and send the query only to the relevant shards.
Hash based distribution is more suited to cases where you want to do real-time inserts along with analytics on your data, or want to distribute by a non-ordered column (e.g. user id). This data model is relevant for real-time analytics use cases; for example, actions in a mobile application, user website events, or social media analytics. In this case, Citus will maintain minimum and maximum hash ranges for all the created shards. Whenever a row is inserted, updated or deleted, Citus will redirect the query to the correct shard and issue it locally. This data model is more suited for doing co-located joins and for queries involving equality based filters on the distribution column.
Citus uses slightly different syntaxes for creation and manipulation of append and hash distributed tables. Also, the operations supported on the tables differ based on the distribution method chosen. In the sections that follow, we describe the syntax for creating append distributed tables, and also describe the operations which can be done on them.
Creating and Distributing Tables
The instructions below assume that the PostgreSQL installation is in your path. If not, you will need to add it to your PATH environment variable. For example:
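A typical adjustment, assuming a stock PostgreSQL package layout (the version and install path here are illustrative):

export PATH=/usr/lib/postgresql/9.6/bin:$PATH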
We use the github events dataset to illustrate the commands below. You can download that dataset by running:
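The download commands look roughly like the following; the URL pattern is taken from the upstream documentation and may have changed since:

wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz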
To create an append distributed table, you need to first define the table schema. To do so, you can define a table using the CREATE TABLE statement in the same way as you would do with a regular PostgreSQL table.
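For the github_events example, the schema follows the upstream documentation:

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
);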
Next, you can use the create_distributed_table() function to mark the table as an append distributed table and specify its distribution column.
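The call names the table, the distribution column, and the append method:

SELECT create_distributed_table('github_events', 'created_at', 'append');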
This function informs Citus that the github_events table should be distributed by append on the created_at column. Note that this method doesn't enforce a particular distribution; it merely tells the database to keep minimum and maximum values for the created_at column in each shard, which are later used by the database to optimize queries.
Expiring Data
In append distribution, users typically want to track data only for the last few months or years. In such cases, the shards that are no longer needed still occupy disk space. To address this, Citus provides a user defined function master_apply_delete_command() to delete old shards. The function takes a DELETE command as input and deletes all the shards that match the delete criteria, along with their metadata.
The function uses shard metadata to decide whether or not a shard needs to be deleted, so it requires the WHERE clause in the DELETE statement to be on the distribution column. If no condition is specified, then all shards are selected for deletion. The UDF then connects to the worker nodes and issues DROP commands for all the shards which need to be deleted. If a drop query for a particular shard replica fails, then that replica is marked as TO DELETE. The shard replicas which are marked as TO DELETE are not considered for future queries and can be cleaned up later.
The example below deletes those shards from the github_events table in which all rows have created_at >= '2015-01-01 00:00:00'. Note that the table is distributed on the created_at column.
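Note the doubled single quotes inside the DELETE string:

SELECT * FROM master_apply_delete_command('DELETE FROM github_events WHERE created_at >= ''2015-01-01 00:00:00''');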
To learn more about the function, its arguments and its usage, please visit the Citus Utility Function Reference section of our documentation. Please note that this function only deletes complete shards and not individual rows from shards. If your use case requires deletion of individual rows in real-time, see the section below about deleting data.
Deleting Data
The most flexible way to modify or delete rows throughout a Citus cluster is with regular SQL statements:
DELETE FROM github_events WHERE created_at >= '2015-01-01 00:03:00';
Unlike master_apply_delete_command, standard SQL works at the row- rather than shard-level to modify or delete all rows that match the condition in the where clause. It deletes rows regardless of whether they comprise an entire shard.
Dropping Tables
You can use the standard PostgreSQL DROP TABLE command to remove your append distributed tables. As with regular tables, DROP TABLE removes any indexes, rules, triggers, and constraints that exist for the target table. In addition, it also drops the shards on the worker nodes and cleans up their metadata.
DROP TABLE github_events;
Data Loading
Citus supports two methods to load data into your append distributed tables. The first one is suitable for bulk loads from files and involves using the \copy command. For use cases requiring smaller, incremental data loads, Citus provides two user defined functions. We describe each of the methods and their usage below.
Bulk load using \copy
The \copy command is used to copy data from a file to a distributed table while handling replication and failures automatically. You can also use the server side COPY command. In the examples, we use the \copy command from psql, which sends a COPY .. FROM STDIN to the server and reads files on the client side, whereas COPY from a file would read the file on the server.
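For example, to load one of the CSV files from the download step above:

\copy github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)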
You can use \copy both on the coordinator and from any of the workers. When using it from the worker, you need to add the master_host option. Behind the scenes, \copy first opens a connection to the coordinator using the provided master_host option and uses master_create_empty_shard to create a new shard. Then, the command connects to the workers and copies data into the replicas until the size reaches shard_max_size, at which point another new shard is created. Finally, the command fetches statistics for the shards and updates the metadata.
Citus assigns a unique shard id to each new shard and all its replicas have the same shard id. Each shard is represented on the worker node as a regular PostgreSQL table with name 'tablename_shardid' where tablename is the name of the distributed table and shardid is the unique id assigned to that shard. One can connect to the worker postgres instances to view or run commands on individual shards.
By default, the \copy command depends on two configuration parameters for its behavior. These are called citus.shard_max_size and citus.shard_replication_factor.
The configuration setting citus.shard_replication_factor can only be set on the coordinator node.
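For example, to create smaller shards with a single replica before a bulk load (citus.shard_max_size caps the size at which \copy rolls over to a new shard; the values here are illustrative, not recommendations):

SET citus.shard_max_size TO '256MB';
SET citus.shard_replication_factor TO 1;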
Please note that you can load several files in parallel through separate database connections or from different nodes. It is also worth noting that \copy always creates at least one shard and does not append to existing shards. You can use the method described below to append to previously created shards.
There is no notion of snapshot isolation across shards, which means that a multi-shard SELECT that runs concurrently with a COPY might see it committed on some shards, but not on others. If you are storing events data, you may occasionally observe small gaps in recent data. It is up to applications to deal with this if it is a problem (e.g. exclude the most recent data from queries, or use some lock).
If COPY fails to open a connection for a shard placement then it behaves in the same way as INSERT, namely to mark the placement(s) as inactive unless there are no more active placements. If any other failure occurs after connecting, the transaction is rolled back and thus no metadata changes are made.
Incremental loads by appending to existing shards
The \copy command always creates a new shard when it is used and is best suited for bulk loading of data. Using \copy to load smaller data increments will result in many small shards, which might not be ideal. In order to allow smaller, incremental loads into append distributed tables, Citus provides two user defined functions: master_create_empty_shard() and master_append_table_to_shard().
master_create_empty_shard() can be used to create new empty shards for a table. Like the \copy command, this function replicates the empty shard across citus.shard_replication_factor nodes.
master_append_table_to_shard() can be used to append the contents of a PostgreSQL table to an existing shard. This allows the user to control the shard to which the rows will be appended. It also returns the shard fill ratio which helps to make a decision on whether more data should be appended to this shard or if a new shard should be created.
To use the above functionality, you can first insert incoming data into a regular PostgreSQL table. You can then create an empty shard using master_create_empty_shard(). Then, using master_append_table_to_shard(), you can append the contents of the staging table to the specified shard, and then subsequently delete the data from the staging table. Once the shard fill ratio returned by the append function becomes close to 1, you can create a new shard and start appending to the new one.
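A minimal sketch of that flow; the staging table name is illustrative, and the shard id passed to master_append_table_to_shard would be the one returned by master_create_empty_shard:

-- stage incoming rows in a regular PostgreSQL table
CREATE TABLE github_events_staging (LIKE github_events);

-- create a new, empty shard and note the returned shard id (e.g. 102089)
SELECT master_create_empty_shard('github_events');

-- append the staging table (hosted on localhost:5432) to that shard;
-- the return value is the shard fill ratio
SELECT master_append_table_to_shard(102089, 'github_events_staging', 'localhost', 5432);

-- clear the staging table for the next batch
TRUNCATE github_events_staging;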
To learn more about the two UDFs, their arguments and usage, please visit the Citus Utility Function Reference section of the documentation.
Increasing data loading performance
The methods described above enable you to achieve high bulk load rates which are sufficient for most use cases. If you require even higher data load rates, you can use the functions described above in several ways and write scripts to better control sharding and data loading. The next section explains how to go even faster.
Scaling Data Ingestion
If your use case does not require real-time ingestion, then using append distributed tables will give you the highest ingest rates. This approach is more suitable for use cases with time-series data, where the database can lag a few minutes or more behind.
Coordinator Node Bulk Ingestion (100k/s-200k/s)
To ingest data into an append distributed table, you can use the COPY command, which will create a new shard out of the data you ingest. COPY can break up files larger than the configured citus.shard_max_size into multiple shards. COPY for append distributed tables only opens connections for the new shards, which means it behaves a bit differently than COPY for hash distributed tables, which may open connections for all shards. A COPY command for an append distributed table does not ingest rows in parallel over many connections, but it is safe to run many such commands in parallel.
COPY creates new shards every time it is used, which allows many files to be ingested simultaneously, but may cause issues if queries end up involving thousands of shards. An alternative way to ingest data is to append it to existing shards using the master_append_table_to_shard function. To use master_append_table_to_shard, the data needs to be loaded into a staging table and some custom logic to select an appropriate shard is required.
An example of a shard selection function is given below. It appends to a shard until its size is greater than 1GB and then creates a new one, which has the drawback of only allowing one append at a time, but the advantage of bounding shard sizes.
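A sketch of such a function, reading shard sizes from the pg_dist_shard and pg_dist_shard_placement metadata tables; the function name and the assumption that the staging table lives on the coordinator (localhost:5432) are illustrative:

CREATE OR REPLACE FUNCTION append_to_github_events(source_table text)
RETURNS real AS $$
DECLARE
    target_shard bigint;
BEGIN
    -- find an existing github_events shard that is still under 1GB
    SELECT shardid INTO target_shard
    FROM pg_dist_shard
    JOIN pg_dist_shard_placement USING (shardid)
    WHERE logicalrelid = 'github_events'::regclass
      AND shardlength < 1024 * 1024 * 1024
    ORDER BY shardid DESC
    LIMIT 1;

    -- every shard is full (or none exists yet): create a new empty one
    IF target_shard IS NULL THEN
        SELECT master_create_empty_shard('github_events') INTO target_shard;
    END IF;

    -- append the staging table and return the shard fill ratio
    RETURN master_append_table_to_shard(target_shard, source_table,
                                        'localhost', 5432);
END;
$$ LANGUAGE plpgsql;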
It may also be useful to create a sequence to generate a unique name for the staging table. This way each ingestion can be handled independently.
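For example (names are illustrative):

CREATE SEQUENCE staging_table_id;

-- generate a fresh staging table name for each batch, e.g. staging_42
SELECT 'staging_' || nextval('staging_table_id');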
To learn more about the master_append_table_to_shard and master_create_empty_shard UDFs, please visit the Citus Utility Function Reference section of the documentation.
Worker Node Bulk Ingestion (100k/s-1M/s)
For very high data ingestion rates, data can be staged via the workers. This method scales out horizontally and provides the highest ingestion rates, but can be more complex to use. Hence, we recommend trying this method only if your data ingestion rates cannot be addressed by the previously described methods.
Append distributed tables support COPY via the worker, by specifying the address of the coordinator in a master_host option, and optionally a master_port option (defaults to 5432). COPY via the workers has the same general properties as COPY via the coordinator, except the initial parsing is not bottlenecked on the coordinator.
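Run from a worker, such a command could look like this sketch; the option name follows the description above and the host name is illustrative:

\copy github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV, master_host 'coordinator-host')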
An alternative to using COPY is to create a staging table and use standard SQL clients to append it to the distributed table, which is similar to staging data via the coordinator. An example of staging a file via a worker using psql is as follows:
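A sketch of that flow, staging on one worker and appending from the coordinator; host names and the staging table are illustrative, and choose_underutilized_shard is the helper function defined below:

psql -h worker-node-1 -c "CREATE TABLE github_events_staging (LIKE github_events);"
psql -h worker-node-1 -c "\copy github_events_staging FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)"
psql -h coordinator-host -c "SELECT master_append_table_to_shard(choose_underutilized_shard(), 'github_events_staging', 'worker-node-1', 5432);"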
The example above uses a choose_underutilized_shard function to select the shard to which to append. To ensure parallel data ingestion, this function should balance across many different shards.
The example choose_underutilized_shard function below randomly picks one of the 20 smallest shards, or creates a new one if there are fewer than 20 shards under 1GB. This allows 20 concurrent appends, which allows data ingestion of up to 1 million rows/s (depending on indexes, size, capacity).
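A sketch of that function, again driven by the pg_dist_shard and pg_dist_shard_placement metadata; the 20-shard and 1GB thresholds match the description above:

CREATE OR REPLACE FUNCTION choose_underutilized_shard()
RETURNS bigint AS $$
DECLARE
    shard_id bigint;
    num_small_shards int;
BEGIN
    -- pick one github_events shard under 1GB at random,
    -- and count how many such shards there are
    SELECT shardid, count(*) OVER () INTO shard_id, num_small_shards
    FROM pg_dist_shard
    JOIN pg_dist_shard_placement USING (shardid)
    WHERE logicalrelid = 'github_events'::regclass
      AND shardlength < 1024 * 1024 * 1024
    GROUP BY shardid
    ORDER BY random()
    LIMIT 1;

    -- fewer than 20 small shards: create a new one instead
    IF num_small_shards IS NULL OR num_small_shards < 20 THEN
        shard_id := master_create_empty_shard('github_events');
    END IF;

    RETURN shard_id;
END;
$$ LANGUAGE plpgsql;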
A drawback of ingesting into many shards concurrently is that shards may span longer time ranges, which means that queries for a specific time period may involve shards that contain a lot of data outside of that period.
In addition to copying into temporary staging tables, it is also possible to set up tables on the workers which can continuously take INSERTs. In that case, the data has to be periodically moved into a staging table and then appended, but this requires more advanced scripting.
Pre-processing Data in Citus
The format in which raw data is delivered often differs from the schema used in the database. For example, the raw data may be in the form of log files in which every line is a JSON object, while in the database table it is more efficient to store common values in separate columns. Moreover, a distributed table should always have a distribution column. Fortunately, PostgreSQL is a very powerful data processing tool. You can apply arbitrary pre-processing using SQL before putting the results into a staging table.
For example, assume we have the github_events table schema defined earlier and want to load the compressed JSON logs from githubarchive.org.
To load the data, we can download it, decompress it, filter out unsupported rows, and extract the fields in which we are interested into a staging table using three commands:
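A sketch of those three steps; the archive URL, the \u0000 filter, and the jsonb field extraction are assumptions, not the exact upstream commands:

# 1. download one hour of the archive
wget http://data.githubarchive.org/2015-01-01-15.json.gz

# 2. decompress and drop rows jsonb cannot store (e.g. \u0000 escapes)
gzip -d -c 2015-01-01-15.json.gz | grep -v '\\u0000' > github_events.json

# 3. load the raw JSON and extract the interesting fields into a staging table
psql <<'SQL'
CREATE TEMP TABLE github_events_raw (data jsonb);
\copy github_events_raw FROM 'github_events.json' WITH (format csv, quote e'\x01', delimiter e'\x02')
CREATE TABLE github_events_staging AS
SELECT (data->>'id')::bigint            AS event_id,
       data->>'type'                    AS event_type,
       (data->>'public')::boolean       AS event_public,
       (data->'repo'->>'id')::bigint    AS repo_id,
       data->'payload'                  AS payload,
       data->'repo'                     AS repo,
       data->'actor'                    AS actor,
       data->'org'                      AS org,
       (data->>'created_at')::timestamp AS created_at
FROM github_events_raw;
SQL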
You can then use the master_append_table_to_shard function to append this staging table to the distributed table.
This approach works especially well when staging data via the workers, since the pre-processing itself can be scaled out by running it on many workers in parallel for different chunks of input data.
For a more complete example, see Interactive Analytics on GitHub Data using PostgreSQL with Citus.