3atv精品不卡视频,97人人超碰国产精品最新,中文字幕av一区二区三区人妻少妇,久久久精品波多野结衣,日韩一区二区三区精品

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Greenplum 实时数据仓库实践(5)——实时数据同步

發布時間:2024/8/1 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Greenplum 实时数据仓库实践(5)——实时数据同步 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

5.1 數據抽取方式

5.1.1 基于源數據的CDC

5.1.2 基于觸發器的CDC

5.1.3 基于快照的CDC

5.1.4 基于日志的CDC

5.2 MySQL數據復制

5.2.1 復制的用途

5.2.2 二進制日志

5.2.3 復制步驟

5.3 使用Kafka

5.3.1 Kafka基本概念

1. 消息和批次

2. 主題與分區

3. 生產者和消費者

4. broker和集群

5.3.2 Kafka消費者與分區

5.4 選擇主題分區數?

5.4.1 使用單分區

5.4.2 如何選定分區數量

5.5 maxwell + Kafka + bireme

5.5.1 總體架構

5.5.2 maxwell安裝配置

5.5.3 bireme安裝配置

5.5.4 實時CDC

1. 全量同步

2. 增量同步

5.6 Canal + Kafka + ClientAdapter

5.6.1 總體架構

5.6.2 Canal Server安裝配置

5.6.3 Canal Adapter安裝配置

5.6.4 HA模式配置

1. 配置Canal Server

2. 配置Canal Adapter

3. 自動切換

5.6.5 實時CDC

5.6.6 消費延遲監控

小結


? ? ? ? 構建實時數據倉庫最大的挑戰在于從操作型數據源實時抽取數據,即ETL過程中的Extract部分。我們要以全量加增量的方式,實時捕獲源系統中所需的所有數據及其變化,而這一切都要在不影響對業務數據庫正常操作的前提下進行,目標是要滿足高負載、低延遲,難點正在于此,所以需要完全不同于批處理的技術加以實現。當操作型數據進入數據倉庫過渡區或ODS以后,就可以利用數據倉庫系統軟件提供的功能特性進行后續處理,不論是Greenplum、Hive或是其他軟件,這些處理往往只需要使用其中一種,相對來說簡單一些。

? ? ? ? Greenplum作為數據倉庫的計算引擎,其數據來源多是業務數據,其中以MySQL為主。本篇將介紹兩種主要的從MySQL實時同步數據到Greenplum的解決方案,一是maxwell + Kafka + bireme、二是Canal + Kafka + ClientAdapter,這兩個方案的共同點是都使用開源組件,不需要編寫代碼,只要進行適當配置便可運行。總體來說,兩種方案都是基于MySQL binlog捕獲數據變化,然后將binlog以數據流的形式傳入Kafka消息隊列,再以消費的方式將數據變化應用到Greenplum。但是,兩者在實現上區別很大,尤其是消費端的不同實現方式使數據載入Greenplum的性能差別巨大。由于主要的MySQL變化數據捕獲技術都是基于其復制協議,并以消息系統作為中間組件,所以先會介紹作為基礎的MySQL數據復制和Kafka。

5.1 數據抽取方式

? ? ? ? 抽取數據是ETL處理過程的第一個步驟,也是數據倉庫中最重要和最具有挑戰性的部分,適當的數據抽取是成功建立數據倉庫的關鍵。
? ? ? ? 從源抽取數據導入數據倉庫或過渡區有兩種方式,可以從源把數據抓取出來(拉),也可以請求源把數據發送(推)到數據倉庫。影響選擇數據抽取方式的一個重要因素是操作型系統的可用性和數據量,這是抽取整個數據集還是僅僅抽取自最后一次抽取以來的變化數據的基礎。我們考慮以下兩個問題:

  • 需要抽取哪部分源數據加載到數據倉庫?有兩種可選方式,完全抽取和變化數據捕獲。
  • 數據抽取的方向是什么?有兩種方式,拉模式,即數據倉庫主動去源系統拉取數據;推模式,由源系統將自己的數據推送給數據倉庫。

? ? ? ? 對于第二個問題來說,通常要改變或增加操作型業務系統的功能是非常困難的,這種困難不僅體現在技術上,還有來自于業務系統用戶及其開發者的阻力。理論上講,數據倉庫不應該要求對源系統做任何改造,實際上也很少由源系統推數據給數據倉庫。因此對這個問題的答案比較明確,大都采用拉數據模式。下面我們著重討論第一個問題。

? ? ? ? 如果數據量很小并且易處理,一般來說采取完全源數據抽取,就是將所有的文件記錄或所有的數據庫表數據抽取至數據倉庫。這種方式適合基礎編碼類型的源數據,比如郵政編碼、學歷、民族等。基礎編碼型源數據通常是維度表的數據來源。如果源數據量很大,抽取全部數據是不可行的,那么只能抽取變化的源數據,即最后一次抽取以來發生了變化的數據。這種數據抽取模式稱為變化數據捕獲,簡稱CDC(Change Data Capture),常被用于抽取操作型系統的事務數據,比如銷售訂單、用戶注冊,或各種類型的應用日志記錄等。

? ? ? ? CDC大體可以分為兩種,一種是侵入式的,另一種是非侵入式的。所謂侵入式的是指CDC操作會給源系統帶來性能的影響。只要CDC操作以任何一種方式對源庫執行了SQL語句,就可以認為是侵入式的CDC。常用的四種CDC方法是:基于時間戳的CDC、基于觸發器的CDC、基于快照的CDC、基于日志的CDC,其中前三種是侵入性的。表5-1總結了四種CDC方案的特點。

時間戳

觸發器

快照

日志

能區分插入/更新

周期內,檢測到多次更新

能檢測到刪除

不具有侵入性

支持實時

不依賴數據庫

表5-1 四種CDC方案比較

5.1.1 基于源數據的CDC

? ? ? ? 基于源數據的CDC要求源數據里有相關的屬性列,抽取過程可以利用這些屬性列來判斷哪些數據是增量數據。最常見的屬性列有以下兩種。

  • 時間戳:這種方法至少需要一個更新時間戳,但最好有兩個,一個插入時間戳,表示記錄何時創建,一個更新時間戳,表示記錄最后一次更新的時間。
  • 序列:大多數數據庫系統都提供自增功能。如果數據庫表列被定義成自增的,就可以很容易地根據該列識別出新插入的數據。

? ? ? ? 這種方法的實現較為簡單,假設表t1中有一個時間戳字段last_inserted,t2表中有一個自增序列字段id,則下面SQL語句的查詢結果就是新增的數據,其中{last_load_time}和{last_load_id}分別表示ETL系統中記錄的最后一次數據裝載時間和最大自增序列號。

select * from t1 where last_inserted > {last_load_time}; select * from t2 where id > {last_load_id};

? ? ? ? 通常需要建立一個額外的數據庫表存儲上一次更新時間或上一次抽取的最后一個序列號。在實踐中,一般是在一個獨立的模式下或在數據過渡區里創建這個參數表。基于時間戳和自增序列的方法是CDC最簡單的實現方式,也是最常用的方法,但它的缺點也很明顯:

  • 不能區分插入和更新操作。只有當源系統包含了插入時間戳和更新時間戳兩個字段,才能區別插入和更新,否則不能區分。
  • 不能記錄刪除數據的操作。不能捕獲到刪除操作,除非是邏輯刪除,即記錄沒有被真的刪除,只是做了邏輯上的刪除標志。
  • 無法識別多次更新。如果在一次同步周期內,數據被更新了多次,只能同步最后一次更新操作,中間的更行操作都丟失了。
  • 不具有實時能力。時間戳和基于序列的數據抽取一般適用于批量操作,不適合于實時場景下的數據抽取。

? ? ? ? 這種方法是具有侵入性的,如果操作型系統中沒有時間戳或時間戳信息是不可用的,那么不得不通過修改源系統把時間戳包含進去,首先要求修改操作型系統的表包含一個新的時間戳列,然后建立一個觸發器,在修改一行時更新時間戳列的值。在實施這些操作前必須被源系統的擁有者所接受,并且要仔細評估對源系統產生的影響。

? ? ? ? 有些方案通過高頻率掃描遞增列的方式實現準實時數據抽取。例如Flume的flume-ng-sql-source插件,缺省每5秒查詢一次源表的主鍵以捕獲新增數據,“利用Flume將MySQL表數據準實時抽取到HDFS”展示了一個具體示例。

5.1.2 基于觸發器的CDC

? ? ? ? 當執行INSERT、UPDATE、DELETE這些SQL語句時,可以激活數據庫里的觸發器,并執行一些動作,就是說觸發器可以用來捕獲變更的數據并把數據保存到中間臨時表里。然后這些變更的數據再從臨時表中取出,被抽取到數據倉庫的過渡區里。但在大多數場合下,不允許向操作型數據庫里添加觸發器(業務數據庫的變動通常都異常慎重),而且這種方法會降低系統的性能,所以此方法用的并不是很多。

? ? ? ? 作為直接在源數據庫上建立觸發器的替代方案,可以使用源數據庫的復制功能,把源數據庫上的數據復制到從庫上,在從庫上建立觸發器以提供CDC功能。盡管這種方法看上去過程冗余,且需要額外的存儲空間,但實際上這種方法非常有效,而且沒有侵入性。復制是大部分數據庫系統的標準功能,如MySQL、Oracle和SQL Server等都有各自的數據復制方案。

5.1.3 基于快照的CDC

? ? ? ? 如果沒有時間戳,也不允許使用觸發器,就要使用快照表了。可以通過比較源表和快照表來獲得數據變化。快照就是一次性抽取源系統中的全部數據,把這些數據裝載到數據倉庫的過渡區中。下次需要同步時,再從源系統中抽取全部數據,并把全部數據也放到數據倉庫的過渡區中,作為這個表的第二個版本,然后再比較這兩個版本的數據,從而找到變化。

? ? ? ? 有多個方法可以獲得這兩個版本數據的差異。假設表有兩個列id和name,id是主鍵列。該表的第一、二個版本的快照表名為snapshot_1、snapshot_2。下面的SQL語句在主鍵id列上做全外鏈接,并根據主鍵比較的結果增加一個標志字段,I表示新增,U表示更新,D代表刪除,N代表沒有變化。外層查詢過濾掉沒有變化的記錄。

select * from? (select case when t2.id is null then 'D'when t1.id is null then 'I'when t1.name <> t2.name then 'U'else 'N'end as flag,case when t2.id is null then t1.id else t2.id end as id,t2.namefrom snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) awhere flag <> 'N';

? ? ? ? 當然,這樣的SQL語句需要數據庫支持全外鏈接,對于MySQL這樣不支持全外鏈接的數據庫,可以使用類似下面的SQL語句:

select 'U' as flag, t2.id as id, t2.name as namefrom snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.idwhere t1.name != t2.nameunion all? select 'D' as flag, t1.id as id, t1.name as namefrom snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.idwhere t2.id is nullunion all? select 'I' as flag, t2.id as id, t2.name as namefrom snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.idwhere t1.id is null;

? ? ? ? 基于快照的CDC可以檢測到插入、更新和刪除的數據,這是相對于基于時間戳的CDC方案的優點。它的缺點是需要大量的存儲空間來保存快照。另外,當表很大時,這種查詢會有比較嚴重的性能問題。

5.1.4 基于日志的CDC

? ? ? ? 最復雜的和最沒有侵入性的CDC方法是基于日志的方式。數據庫會把每個插入、更新、刪除操作記錄到日志里。如使用MySQL數據庫,只要在數據庫服務器中啟用二進制日志binlog(設置log_bin服務器系統變量),之后就可以實時從數據庫日志中讀取到所有數據庫寫操作,并使用這些操作來更新數據倉庫中的數據。這種方式需要把二進制日志轉為可以理解的格式,然后再把里面的操作按照順序讀取出來。

? ? ? ? MySQL提供了一個叫做mysqlbinlog的日志讀取工具。這個工具可以把二進制的日志格式轉換為可讀的格式,然后就可以把這種格式的輸出保存到文本文件里,或者直接把這種格式的日志應用到MySQL客戶端用于數據還原操作。mysqlbinlog工具有很多命令行參數,其中最重要的一組參數可以設置開始/截止時間戳,這樣能夠只從日志里截取一段時間的日志。另外,日志里的每個日志項都有一個序列號,也可以用來做偏移操作。MySQL的日志提供了上述兩種方式來防止CDC過程發生重復或丟失數據的情況。下面是使用mysqlbinlog的兩個例子。第一條命令將jbms_binlog.000002文件中從120偏移量以后的操作應用到一個MySQL數據庫中。第二條命令將jbms_binlog.000002文件中一段時間的操作格式化輸出到一個文本文件中。

mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456 mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt

? ? ? ? 使用基于數據庫的日志工具也有缺陷,即只能用來處理一種特定的數據庫,如果要在異構的數據庫環境下使用基于日志的CDC方法,就要使用GoldenGate之類的軟件。本篇介紹的兩種實時數據同步方案都是使用開源組件完成類似功能。

5.2 MySQL數據復制

? ? ? ? Maxwell、Canal都可以實時讀取MySQL二進制日志,本質上都是將自身偽裝成一個從庫,利用MySQL原生的主從復制協議獲取并處理二進制日志。了解MySQL復制的基本原理有助于理解和使用這些組件。

? ? ? ? 簡單說,復制就是將來自一個MySQL數據庫服務器(主庫)的數據復制到一個或多個MySQL數據庫服務器(從庫)。傳統的MySQL復制提供了一種簡單的Primary-Secondary復制方法,默認情況下,復制是單向異步的。MySQL支持兩種復制方式:基于行的復制和基于語句的復制。這兩種方式都是通過在主庫上記錄二進制日志(binlog)、在從庫重放中繼日志(relylog)的方式來實現異步的數據復制。二進制日志或中繼日志中的記錄被稱為事件。所謂異步包含兩層含義,一是主庫的二進制日志寫入與將其發送到從庫是異步進行的,二是從庫獲取與重放日志事件是異步進行的。這意味著,在同一時間點從庫上的數據更新可能落后于主庫,并且無法保證主從之間的延遲間隔。

? ? ? ? 復制給主庫增加的開銷主要體現在啟用二進制日志帶來的I/O,但是增加并不大,MySQL官方文檔中稱開啟二進制日志會產生1%的性能損耗。出于對歷史事務備份以及從介質失敗中恢復的目的,這點開銷是非常必要的。除此之外,每個從庫也會對主庫產生一些負載,例如網絡和I/O。當從庫讀取主庫的二進制日志時,也會造成一定的I/O開銷。如果從一個主庫復制到多個從庫,喚醒多個復制線程發送二進制日志內容的開銷將會累加。但所有這些復制帶來的額外開銷相對于應用對MySQL服務器造成的高負載來說都微不足道。

5.2.1 復制的用途

? ? ? ? 復制的用途主要體現在以下五個方面:

1. 橫向擴展
? ? ? ? 通過復制可以將讀操作指向從庫來獲得更好的讀擴展。所有寫入和更新都在主庫上進行,但讀取可能發生在一個或多個從庫上。在這種讀寫分離模型中,主庫專用于更新,顯然比同時進行讀寫操作會有更好的寫性能。需要注意的是,對于寫操作并不適合通過復制來擴展。在一主多從架構中,寫操作會被執行多次,正如“木桶效應”,這時整個系統的寫性能取決于寫入最慢的那部分。

2. 負載均衡
? ? ? ? 通過MySQL復制可以將讀操作分布到多個服務器上,實現對讀密集型應用的優化。對于小規模的應用,可以簡單地對機器名做硬編碼或者使用DNS輪詢(將一個機器名指向多個IP地址)。當然也可以使用復雜的方法,例如使用LVS網絡負載均衡器等,能夠很好地將負載分配到不同的MySQL服務器上。

3. 提高數據安全性
? ? ? ? 提高數據安全性可以從兩方面來理解。其一,因為數據被復制到從庫,并且從庫可以暫停復制過程,所以可以在從庫上執行備份操作而不會影響對應的主庫。其二,當主庫出現問題時,還有從庫的數據可以被訪問。但是,對備份來說,復制僅是一項有意義的技術補充,它既不是備份也不能夠取代備份。例如,當用戶誤刪除一個表,而且此操作已經在從庫上被復制執行,這種情況下只能用備份來恢復。

4. 提高高可用性
? ? ? ? 復制可以幫助應用程序避免MySQL單點失敗,一個包含復制的設計良好的故障切換系統能夠顯著縮短宕機時間。

5. 滾動升級
? ? ? ? 比較普遍的做法是,使用一個高版本MySQL作為從庫,保證在升級全部實例前,查詢能夠在從庫上按照預期執行。測試沒有問題后,將高版本的MySQL切換為主庫,并將應用連接至該主庫,然后重新搭建高版本的從庫。

? ? ? ? 后面介紹Maxwell和Canal方案時會看到,其架構正是利用了橫向擴展中的級聯主從拓撲結構,以及從庫可以安全暫停復制的特點才得以實現。

5.2.2 二進制日志

? ? ? ? MySQL復制依賴二進制日志(binlog),所以要理解復制如何工作,先要了解MySQL的二進制日志。

? ? ? ? 二進制日志包含描述數據庫更改的事件,如建表操作或對表數據的更改等。開啟二進制日志有兩個重要目的:

  • 用于復制。主庫上的二進制日志提供要發送到從庫的數據更改記錄。主庫將其二進制日志中包含的事件發送到從庫,從庫執行這些事件以對其本地數據進行相同的更改。
  • 用于恢復。當出現介質錯誤,如磁盤故障時,數據恢復操作需要使用二進制日志。還原備份后,重新執行備份后記錄的二進制日志中的事件,最大限度減少數據丟失。

? ? ? ? 不難看出,MySQL二進制日志所起的作用與Oracle的歸檔日志類似。二進制日志只記錄更新數據的事件,不記錄SELECT或SHOW等語句。通過設置log-bin系統變量開啟二進制日志,不同版本MySQL的缺省配置可能不同,如MySQL 5.6的缺省為不開啟,MySQL 8中缺省是開啟的。

? ? ? ? 二進制日志有STATEMENT、ROW、MIXED三種格式,通過binlog-format系統變量設置:

  • STATMENT格式,基于SQL語句的復制(statement-based replication,SBR)。每一條會修改數據的SQL語句會被記錄到binlog中。這種格式的優點是不需要記錄每行的數據變化,這樣二進制日志會比較少,減少磁盤I/O,提高性能。缺點是在某些情況下會導致主庫與從庫中的數據不一致,例如last_insert_id()、now()等非確定性函數,以及用戶自定義函數(user-defined functions,udf)等易出現問題。
  • ROW格式,基于行的復制(row-based replication,RBR)。該格式不記錄SQL語句的上下文信息,僅記錄哪條數據被修改了,修改成了什么樣子,能清楚記錄每一行數據的修改細節。其優點是不會出現某些特定情況下的存儲過程、函數或觸發器的調用和觸發無法被正確復制的問題。缺點是通常會產生大量的日志,尤其像大表上執行alter table操作時會讓日志暴漲。
  • MIXED格式,混合復制(mixed-based replication,MBR)。它是語句和行兩種格式的混合體,默認使用STATEMENT模式保存二進制日志,對于STATEMENT模式無法正確復制的操作,會自動切換到基于行的格式,MySQL會根據執行的SQL語句選擇日志保存方式。

? ? ? ? 不同版本MySQL的binlog-format參數的缺省值可能不同,如MySQL 5.6的缺省值為STATEMENT,MySQL 8缺省使用ROW格式。二進制日志的存放位置最好設置到與MySQL數據目錄不同的磁盤分區,以降低磁盤I/O的競爭,提升性能,并且在數據磁盤故障的時候還可以利用備份和二進制日志恢復數據。

5.2.3 復制步驟

? ? ? ? 總的來說,MySQL復制有五個步驟:

  • 在主庫上把數據更改事件記錄到二進制日志中。
  • 從庫上的I/O線程向主庫詢問二進制日志中的事件。
  • 主庫上的binlog dump線程向I/O線程發送二進制事件。
  • 從庫上的I/O線程將二進制日志事件復制到自己的中繼日志中。
  • 從庫上的SQL線程讀取中繼日志中的事件,并將其重放到從庫上。
  • 圖5-1更詳細地描述了復制的細節。

    圖5-1 復制如何工作

    ? ? ? ? 第一步是在主庫上記錄二進制日志。每次準備提交事務完成數據更新前,主庫將數據更新的事件記錄到二進制日志中。MySQL會按事務提交的順序而非每條語句的執行順序來記錄二進制日志。在記錄二進制日志后,主庫會告訴存儲引擎可以提交事務了。

    ? ? ? ? 下一步,從庫將主庫的二進制日志復制到其本地的中繼日志中。首先,從庫會啟動一個工作線程,稱為I/O線程。I/O線程跟主庫建立一個普通的客戶端連接,然后在主庫上啟動一個特殊的二進制日志轉儲(binlog dump)線程,它會讀取主庫上二進制日志中的事件,但不會對事件進行輪詢。如果該線程追趕上了主庫,它將進入睡眠狀態,直到主庫發送信號通知其有新的事件時才會被喚醒,從庫I/O線程會將接收到的事件記錄到中繼日志中。

    ? ? ? ? 從庫的SQL線程執行最后一步,該線程從中繼日志中讀取事件并在從庫上執行,從而實現從庫數據的更新。當SQL線程追趕I/O線程時,中繼日志通常已經在系統緩存中,所以讀取中繼日志的開銷很低。SQL線程執行的事件也可以通過log_slave_updates系統變量來決定是否寫入其自己的二進制日志中,這可以用于級聯復制的場景。

    ? ? ? ? 這種復制架構實現了獲取事件和重放事件的解耦,允許這兩個過程異步進行。也就是說I/O線程能夠獨立于SQL線程之外工作。但這種架構也限制了復制的過程,其中最重要的一點是在主庫上并發更新的查詢在從庫上通常只能串行化執行,因為缺省只有一個SQL線程來重放中繼日志中的事件。在MySQL 5.6以后已經可以通過配置slave_parallel_workers等系統變量進行并行復制,相關細節參見“組提交與多線程復制”。

    5.3 使用Kafka

    ? ? ? ? 從MySQL復制中從庫的角度看,實際上是實現了一個消息隊列的功能。消息就是二進制日志中的事件,持久化存儲在中繼日志文件里。I/O線程是消息的生產者,向中繼日志寫數據,SQL線程是消息的消費者,從中繼日志讀取數據并在目標庫上重放。隊列是一種先進先出的數據結構,這個簡單定義就決定了隊列中的數據一定是有序的。在數據復制場景中這種有序性極為重要,如果不能保證事件重放與產生同序,主從庫的數據將會不一致,也就失去了數據復制的意義。

    ? ? ? ? 中繼日志、I/O線程、SQL線程是MySQL內部的實現。在本專題討論的異構環境中,源是MySQL,目標是Greenplum。作為一種不嚴格的類比,Maxwell、Canal實現的是類似I/O線程的功能,bireme、Canal的ClientAdapter組件實現的是類似SQL線程的功能。那中繼日志呢,是Kafka登場的時候了,當然Kafka比中繼日志或消息隊列要復雜得多,它是一個完整的消息系統。嚴格來說,在本實時數據同步場景中,Kafka并不是必須的。比如Canal的TCP服務器模式,就是直接將網絡數據包直接發送給消費者進行消費,消息數據只存在于內存中,并不做持久化。這種實現方式用于生產環境很不合適,既有丟失數據的風險,也缺乏必要的管理和監控,引入Kafka正好可以物盡其用。

    5.3.1 Kafka基本概念

    ? ? ? ? Kafka是一款基于發布與訂閱的分布式消息系統,其數據按照一定順序持久化保存,并可以按需讀取。此外,Kafka的數據分布在整個集群中,具備數據故障保護和性能伸縮能力。

    1. 消息和批次

    ? ? ? ? Kafka的數據單元被稱為消息,它可以被看作是數據庫里的一條記錄。消息由字節數組組成,所以對于Kafka來說,消息里的數據沒有特別的格式或含義。消息可以有一個可選的元數據,也就是鍵。與消息一樣,鍵也是一個字節數組,對于Kafka來說也沒有特殊的含義。當消息以一種可控的方式寫入不同分區時會用到鍵。最簡單的例子就是為鍵生成一個一致性哈希值,然后使用哈希值對主題分區進行取模,為消息選取分區。這樣可以保證具有相同鍵的消息總是被寫到相同的分區上。對數據庫來說,通常將表的主鍵作為消息的鍵,這是Kafka保證消費順序的關鍵所在,后面將詳細說明。

    ? ? ? ? 為了提高效率,消息被分批次寫入Kafka。批次就是一組消息,這些消息屬于同一個主題和分區。把消息分批次傳輸可以減少網絡開銷。不過,這要在延遲時間和吞吐量之間做出權衡:批次越大,單位時間處理的消息就越多,單個消息的傳輸時間就越長。批次數據會被壓縮,這樣可以提升數據的傳輸和存儲能力,但要做更多的計算處理。

    2. 主題與分區

    ? ? ? ? Kafka的消息通過主題(topic)進行分類。主題就好比數據庫的表,或者文件系統的目錄。主題可以被分為若干個分區(partition),一個分區就是一個提交日志。消息以追加的方式寫入分區,然后以先進先出的順序讀取。注意,由于一個主題一般包含幾個分區,因此無法在整個主題范圍內保證消息的順序,但可以保證消息在單個分區內的順序。如果需要所有消息都是有序的,那么最好只用一個分區。圖5-2所示的主題有4個分區,消息被追加寫入每個分區的尾部。Kafka通過分區來實現數據冗余和伸縮性。分區可以分布在不同的服務器上,也就是說,一個主題可以橫跨多個服務器,以此來提供更強大的性能。

    ?圖5-2 包含多個分區的主題

    3. 生產者和消費者

    ? ? ? ? Kafka的客戶端就是Kafka系統的用戶,它們被分為兩種基本類型:生產者和消費者。除此之外,還有其他兩個客戶端API——用于數據集成的Kafka Connect API和用于流式處理的Kafka Streams。這些客戶端API使用生產者和消費者作為內部組件,提供了高級的功能。

    ? ? ? ? 生產者(producer)創建消息。一般情況下,一個消息會被發布到一個特定的主題上。生產者在默認情況下把消息均勻分布到主題的所有分區上,而并不關心特定消息會被寫到哪個分區。在某些情況下,生產者會把消息直接寫到指定的分區。這通常是通過消息鍵和分區器來實現的,分區器為鍵生成一個哈希值,并將其映射到指定分區。這樣可以保證包含同一個鍵的消息會被寫到同一個分區上。生產者也可以使用自定義的分區器,根據不同業務規則將消息映射到分區。

    ? ? ? ? 消費者(consumer)讀取數據。消費者訂閱一個或多個主題,并按消息生成的順序讀取它們。消費者通過檢查消息的偏移量來區分已經讀取的消息。偏移量(offset)是另一種元數據,它是一個不斷遞增的整數值,在消息創建時,Kafka會把它添加到消息里。在給定分區里,每個消息的偏移量都是唯一的。消費者把每個分區最后讀取的消息偏移量保存在Zookeeper或Kafka中,如果消費者關閉或重啟,它的讀取狀態不會丟失。

    ? ? ? ? 消費者是消費者組(consumer group)的一部分,也就是說,會有一個或多個消費者共同讀取一個主題。組保證每個分區只能被同組中的一個消費者使用。圖5-3所示的組中,有3個消費者同時讀取一個主題。其中兩個消費者各自讀取一個分區,另一個消費者讀取其他兩個分區。消費者和分區之間的映射通常被稱為消費者對分區的所有權關系(ownership)。

    ? ? ? ? 通過這種方式,消費者可以消費包含大量消息的主題。而且如果一個消費者失效,組里的其他消費者可以接管失效消費者的工作。

    圖5-3 消費者組從主題讀取消息

    4. broker和集群

    ? ? ? ? 一個獨立的Kafka服務器被稱為broker。broker接收來自生產者的消息,為消息設置偏移量,并提交消息到磁盤保存。broker為消費者提供服務,對讀取分區的請求作出響應,返回已經提交到磁盤上的消息。根據特定的硬件機器性能特征,單個broker可以輕松處理數千個分區以及每秒百萬級的消息量。

    ? ? ? ? broker是集群的組成部分,每個集群都有一個broker同時充當了集群控制器(controller)的角色,它被自動從集群的活躍成員中選舉出來,負責將分區分配給broker和監控broker等管理工作。在集群中,一個分區從屬于一個broker,該broker被稱為分區的首領(leader)。一個分區可以分配給多個broker,這個時候發生分區復制,如圖5-4所示。這種復制機制為分區提供了消息冗余,如果有一個broker失效,其他broker可以接管領導權。不過,相關的消費者和生產者都要重新連接到新的首領。

    ?圖5-4 集群里的分區復制

    ? ? ? ? 消息保存期限(retention)是Kafka的一個重要特性。Kafka broker默認的消息保留策略是這樣的:要么保留一段時間(比如7天),要么保留到消息到達一定大小的字節數(比如1GB)。當消息數量達到這些上限時,舊消息就會過期并被刪除,所以在任何時刻,可用消息的總量都不會超過配置參數所指定的大小。主題可以配置自己的保留策略,能將消息保留到不再使用它們為止。例如,用于跟蹤用戶活動的數據可能需要保留幾天,而應用程序的度量指標可能只需要保留幾小時。可以通過配置把主題當做緊湊型日志(log compacted),只有最后一個帶有特定鍵的消息會被保留下來。這種情況對于變更日志類型的數據比較適用,因為人們只關心最后時刻發生的那個變更。

    5.3.2 Kafka消費者與分區

    ? ? ? ? 通常消息的生成速度比消費速度快,顯然此時有必要對消費者進行橫向擴展。就像多個生產者可以向相同的主題寫入消息一樣,我們也可以使用多個消費者從同一主題讀取消息,對消息進行分流。

    ? ? ? ? Kafka消費者從屬于消費者組,一個組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。假設主題T1有4個分區,我們創建了消費者C1,它是組G1里唯一的消費者,我們用它訂閱主題T1。消費者C1將收到主題T1全部4個分區的消息,如圖5-5所示。

    圖5-5 1個消費者接收4個分區的消息

    ? ? ? ? 如果在組G1里新增一個消費者C2,那么每個消費者將分別從兩個分區接收消息。我們假設消費者C1接收分區0和分區2的消息,消費者C2接收分區1和分區3的消息,如圖5-6所示。

    圖5-6 2個消費者接收4個分區的消息

    ? ? ? ? 如果組G1有4個消費者,那么每個消費者可以分配到一個分區,如圖5-7所示。

    圖5-7 4個消費者接收4個分區的消息

    ? ? ? ? 如果我們往組里添加更多的消費者,超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收任何消息,如圖5-8所示。

    圖5-8 5個消費者接收4個分區的消息

    ? ? ? ? 往群組里增加消費者是橫向擴展消費能力的主要方式。Kafka消費者經常會做一些高延遲的操作,比如把數據寫到數據庫或HDFS,或者使用數據進行比較耗時的計算。在這些情況下,單個消費者無法跟上數據生成的速度,所以可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分區的消息,這就是橫向擴展的主要手段。我們有必要為主題創建大量的分區,在負載增長時可以加入更多的消費者。不過要注意,不要讓消費者數量超過主題分區的數量,多余的消費者只會被閑置。

    ? ? ? ? 除了通過增加消費者來橫向擴展單個應用程序外,還經常出現多個應用程序從同一個主題讀取數據的情況。實際上,Kafka設計的主要目標之一,就是要讓主題里的數據能夠滿足企業各種應用場景的需求。在這些場景里,每個應用程序可以獲取到所有的消息,而不只是其中的一部分。只要保證每個應用程序有自己的消費者組,就可以讓它們獲取到主題的所有消息。橫向擴展Kafka消費者或消費者組并不會對性能造成負面影響。

    ? ? ? ? 在上面的例子里,如果新增一個只包含一個消費者的組G2,那么這個消費者將從主題T1上接收所有消息,與組G1之間互不影響。組G2可以增加更多的消費者,每個消費者可以消費若干個分區,就像組G1那樣,如圖5-9所示。總的來說,組G2還是會接收所有消息,不管有沒有其他組存在。

    ? ? ? ? 簡而言之,為每一個需要獲取一個或多個主題全部消息的應用程序創建一個消費者組,然后往組里添加消費者來擴展讀取能力和擴展能力,組里的每個消費者只處理一部分消息。

    圖5-9 兩個消費者組對應一個主題

    5.4 選擇主題分區數?

    5.4.1 使用單分區

    ? ? ? ? 上一節提到,Kafka只能保證單個分區中消息的順序,因此如果要求與數據庫保持強一致性,最好只使用一個分區。那么,單分區的吞吐量能否滿足負載需求呢?下面就在現有環境上做一個測試,以得出有根據的量化的結論。

    1. 測量MySQL binlog日志量

    ? ? ? ? 測試方法為使用tpcc-mysql工具,執行一段時間的壓測,然后查看這段時間產生的binlog文件大小,得出binlog吞吐量。TPC-C是專門針對聯機交易處理系統(OLTP系統)的規范,而tpcc-mysql則是percona公司基于TPC-C衍生出來的產品,專用于MySQL基準測試,下載地址為https://github.com/Percona-Lab/tpcc-mysql。關于tpcc-mysql的安裝和使用,參見“測試規劃”。

    (1)從庫重置binlog

    reset master; show master status;

    ? ? ? ? 初始binlog文件名和偏移量分別是mysql-bin.000001和120。

    (2)主庫執行tpcc測試

    # 10倉庫,32并發線程,預熱10秒,執行300秒 ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 10 -l 300

    ? ? ? ? 得到的每分鐘事務數為:5543.600 TpmC

    (3)壓測執行結束后,在從庫查詢binlog日志量

    show binary logs;

    ? ? ? ? 此時binlog文件名和偏移量分別是mysql-bin.000001和406396209。預熱10秒,執行300秒,binlog產生速度為:(406396209-120)/1024/1024/310 ≈ 1.25MB/S。

    2. 測量kafka單分區生產者吞吐量
    (1)創建topic

    # 創建topic kafka-topics.sh --create --topic test --bootstrap-server 172.16.1.124:9092 --partitions 1 --replication-factor 3 # 查看topic kafka-topics.sh --describe --topic test --bootstrap-server 172.16.1.124:9092

    ? ? ? ? 創建了一個單分區三副本的topic:

    Topic: test?? ?Partition: 0?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340

    (2)執行測試

    kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=172.16.1.124:9092 acks=1

    ? ? ? ? kafka-producer-perf-test.sh是Kafka提供的生產者性能測試命令行工具,這里所使用的選項說明:

    • num-records:指定發送的消息總數。
    • record-size:指定每條消息的字節數,這里假設約為一個binlog event的大小。在MySQL中可用show binlog events命令查看每個event的大小。
    • throughput指定每秒發送的消息數,-1為不限制。
    • acks:指定生產者的應答方式,有效值為0、1、all。0表示生產者在成功寫入消息之前不會等待任何來自服務器的響應,吞吐量最高,但最可能丟失消息。1表示只要首領節點收到消息,生產者就會收到一個來自服務器的成功響應。all表示只有所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應,最安全但延遲最高。

    ? ? ? ? 測試結果為:

    500000 records sent, 10989.010989 records/sec (21.46 MB/sec), 1267.54 ms avg latency, 1714.00 ms max latency, 1388 ms 50th, 1475 ms 95th, 1496 ms 99th, 1693 ms 99.9th.

    ? ? ? ? 可以看到單分區平均吞吐量約21.46 MB/S,平均每秒發送10989條2KB的消息。兩相比較,Kafka單分區生產者的消息吞吐量大約是壓測binlog吞吐量的17倍。實際生產環境的硬件配置會比本實驗環境高得多,單分區吞吐量通常可達100 MB/S。通過這個粗略測試得出的結論是單分區可以承載一般的生產數據庫負載。

    3. 測量kafka單分區消費者吞吐量
    ? ? ? ? 單分區只能有一個消費者(一個消費組中),但可以利用多個線程提高消費性能。

    kafka-consumer-perf-test.sh --broker-list 172.16.1.124:9092 --topic test --messages 500000 --threads 1

    ? ? ? ? --threads指定消費線程數,1、3、6、12時的測試結果如下:

    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec # 1線程 2021-12-09 10:57:19:198, 2021-12-09 10:57:28:921, 976.6543, 100.4478, 500047, 51429.2914, 3034, 6689, 146.0090, 74756.6153 # 3線程 2021-12-09 10:57:52:134, 2021-12-09 10:58:00:280, 976.6543, 119.8937, 500047, 61385.5880, 3039, 5107, 191.2384, 97914.0396 # 6線程 2021-12-09 10:58:58:345, 2021-12-09 10:59:06:495, 976.6543, 119.8349, 500047, 61355.4601, 3031, 5119, 190.7901, 97684.5087 # 12線程 2021-12-09 10:59:16:028, 2021-12-09 10:59:24:093, 976.6543, 121.0979, 500047, 62002.1079, 3031, 5034, 194.0116, 99333.9293

    5.4.2 如何選定分區數量

    ? ? ? ? 嚴格說只要涉及多分區,一定會有消費順序問題。在非強一致性場景中,可以通過選擇表的主鍵作為分區鍵,以適當避免消費亂序帶來的數據一致性問題,同時利用多分區保持Kafka的擴展性。在選擇分區數量時,需要考慮如下幾個因素。

    • 主題需要達到多大的吞吐量?例如是每秒寫入100KB還是1GB?
    • 從單個分區讀取數據的最大吞吐量是多少?每個分區一般都會有一個消費者,如果知道消費者寫入數據庫的速度不會超過每秒50MB,那么從一個分區讀取數據的吞吐量也不需要超過每秒50MB。
    • 可以通過類似的方法估算生產者向單個分區寫入數據的吞吐量,不過生產者的速度一般比消費者快得多,所以最好為生產者多估算一些吞吐量。
    • 每個broker包含的分區個數、可用的磁盤空間和網絡帶寬。
    • 如果消息是按不同鍵寫入分區的,那么為已有主題新增分區會很困難。
    • 單個broker對分區個數是有限制的,因為分區越多,占用內存越多,完成首領選舉需要的時間也越長。

    ? ? ? ? 如果估算出主題的吞吐量和消費者吞吐量,可以用主題吞吐量除以消費者吞吐量算出分區個數。如果不知道這些信息,根據經驗,把分區大小限制在25GB以內可以得到比較理想的效果。

    5.5 maxwell + Kafka + bireme

    ? ? ? ? 本節介紹的方法是采用 maxwell + Kafka + bireme,將MySQL數據實時同步至Greenplum。maxwell實時解析MySQL的binlog,并將輸出的JSON格式數據發送到Kafka,Kafka在此方案中主要用于消息中轉,bireme負責讀取Kafka的消息,并應用于Greenplum數據庫以增量同步數據。方法實施的主要流程為如下三步:

  • 搭建Kafka服務。
  • 搭建maxwell服務,修改配置使其能夠連接MySQL并能向Kafka寫入數據。
  • 搭建bireme服務,修改配置使其能讀取Kafka的數據并能向Greenplum寫入數據。
  • 5.5.1 總體架構

    ? ? ? ? 本方案的總體架構如圖5-10所示。

    圖5-10 maxwell + Kafka + bireme 架構

    ? ? ? ? 圖中的maswell從MySQL復制的從庫中級聯獲取binlog,這樣做的原因將在5.5.4小節“實時CDC”中詳細說明。maxwell是一個能實時讀取MySQL二進制日志binlog,并生成JSON 格式的消息,作為生產者發送給Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其他平臺的應用程序,其中Kafka是maxwell支持最完善的一個消息系統。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。maswell在GitHub上具有較高的活躍度,官網地址為地址為https://github.com/zendesk/maxwell。

    ? ? ? ? maxwell主要提供了下列功能:

    • 支持 SELECT * FROM table 方式進行全量數據初始化。
    • 支持GTID,當MySQL發生failover后,自動恢復binlog位置。
    • 可以對數據進行分區,解決數據傾斜問題,發送到Kafka的數據支持database、table、column等級別的數據分區。
    • 工作方式是偽裝為MySQL Slave,在主庫上創建dump線程連接,接收binlog事件,然后根據schemas信息拼裝成JSON字符串,可以接受ddl、xid、row等各種事件。

    ? ? ? ? bireme是一個Greenplum數據倉庫的增量同步工具,目前支持MySQL、PostgreSQL和MongoDB數據源,maxwell + Kafka 是一種支持的數據源類型。bireme作為Kafka的消費者,采用 DELETE + COPY 的方式,將數據源的修改記錄同步到Greenplum,相較于INSERT、UPDATE、DELETE方式,COPY方式速度更快,性能更優。bireme的主要特性是采用小批量加載方式(默認加載延遲時間為10秒鐘)提升數據同步的性能,但要求所有同步表在源和目標數據庫中都必須有主鍵。bireme官網地址為https://github.com/HashDataInc/bireme/。

    ? ? ? ? Kafka在本架構中作為消息中間件將maxwell和bireme橋接在一起,上下游組件的實現都依賴于它。正如本節開頭所述,搭建Kafka服務是實施本方案的第一步。為了簡便,在此實驗環境中使用CDH中的Kafka服務,基本信息如下:

    • Kafka集群由三臺虛機組成,實例為:172.16.1.124:9092(controller)、172.16.1.125:9092、172.16.1.126:9092。
    • 三臺虛機的基本硬件配置同為:4核CPU、8GB內存、千兆網卡。
    • Kafka版本為kafka_2.11-2.2.1-cdh6.3.1,即編譯Kafka源代碼的Scala編譯器版本號為2.11,Kafka版本號為2.2.1,平臺為CDH 6.3.1。可執行下面的命令查看Kafka版本信息: ps -ef|grep '/libs/kafka.\{2,40\}.jar'

    • 除default.replication.factor默認副本數配置為3,其他Kafka配置參數均采用缺省值。

    ? ? ? ? default.replication.factor參數指定broker級別的復制系數,CDH中的Kafka缺省值為1。這里將該設置改為3,也就是說每個分區總共會被3個不同的broker復制3次。默認情況下,Kafka會確保分區的每個副本被放在不同的broker上。

    ? ? ? ? 如果復制系數為N,那么在N-1個broker失效的情況下,仍然能夠從主題讀取數據或向主題寫入數據。所以,更高的復制系數會帶來更高的可用性。另一方面,復制系數N需要至少N個broker,而且N個數據副本會占用N倍的磁盤空間。通常要在可用性和存儲硬件之間做出權衡。

    ? ? ? ? 如果因broker重啟導致的主題不可用是可接受的(這在集群里屬正常行為),那么把復制系數設為1即可。復制系數為2意味著可以容忍1個broker失效。但是要知道,有時候1個broker發生失效會導致集群不穩定,迫使重啟另一個broker——集群控制器,也就是說,如果將復制系數設置為2,就有可能因為重啟等問題導致集群暫時不可用。基于以上原因,建議在要求高可用的場景里把復制系數設置為3,大多數情況下這已經足夠安全。

    ? ? ? ? 下面在Kafka中創建一個topic,在后面配置maxwell時將使用該topic:

    # 設置Kafka可執行文件路徑 export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/; # 創建一個三分區三副本的topic,主要用于演示數據在分區間的均勻分布 kafka-topics.sh --create --topic mytopic --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3 # 查看topic kafka-topics.sh --list --bootstrap-server 172.16.1.124:9092 # 查看partition kafka-topics.sh --describe --topic mytopic --bootstrap-server 172.16.1.124:9092 # 查看每個分區的大小 kafka-log-dirs.sh --describe --topic-list mytopic --bootstrap-server 172.16.1.124:9092

    ? ? ? ? mytopic的分區如下:

    Topic: mytopic?? ?Partition: 0?? ?Leader: 340?? ?Replicas: 340,339,330?? ?Isr: 340,339,330 Topic: mytopic?? ?Partition: 1?? ?Leader: 330?? ?Replicas: 330,340,339?? ?Isr: 330,340,339 Topic: mytopic?? ?Partition: 2?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340

    ? ? ? ? ISR(In Sync Replica)是Kafka的副本同步機制,leader會維持一個與其保持同步的副本集合,該集合就是ISR,每個分區都有一個ISR,由leader動態維護。我們要保證Kafka不丟消息,就要保證ISR這組集合中至少有一個存活,并且消息成功提交。

    ? ? ? ? 本實驗環境部署的其他角色如下:

    • MySQL主庫:172.16.1.126:3306
    • MySQL從庫:172.16.1.127:3306
    • Greenplum Master:114.112.77.198:5432。

    ? ? ? ? Greenplum集群主機的操作系統版本為CentOS Linux release 7.9.2009 (Core),其他所有主機操作系統版本為CentOS Linux release 7.2.1511 (Core)。MySQL已經開啟主從復制,相關配置如下:

    log-bin=mysql-bin ? ?# 開啟 binlog binlog-format=ROW ? ?# 選擇 ROW 格式 server_id=126 ? ? ? ?# 主庫server_id,從庫為127 log_slave_updates ? ?# 開啟級聯binlog

    ? ? ? ? 我們還事先在MySQL中創建了maxwell用于連接數據庫的用戶,并授予了相關權限。

    -- 在126主庫執行 create user 'maxwell'@'%' identified by '123456'; grant all on maxwell.* to 'maxwell'@'%'; grant select, replication client, replication slave on *.* to 'maxwell'@'%';?

    ? ? ? ? MySQL主從復制相關配置參見“配置異步復制”,Greenplum安裝部署參見本專題上一篇“Greenplum 實時數據倉庫實踐(4)——Greenplum安裝部署”。

    5.5.2 maxwell安裝配置

    ? ? ? ? 我們在172.16.1.126上搭建maxwell服務。

    1. 下載并解壓

    wget https://github.com/zendesk/maxwell/releases/download/v1.34.1/maxwell-1.34.1.tar.gz tar -zxvf maxwell-1.34.1.tar.gz

    2. 修改配置文件
    (1)備份示例配置文件

    cd ~/maxwell-1.34.1 cp config.properties.example config.properties

    (2)編輯config.properties文件內容如下

    log_level=info metrics_type=http http_port=9090producer=kafka# Kafka配置 kafka_topic=mytopic kafka.bootstrap.servers=172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092 kafka.compression.type=snappy kafka.retries=0 kafka.acks=1 kafka.batch.size=16384 kafka_partition_hash=murmur3 producer_partition_by=primary_key# MySQL配置 host=172.16.1.127 port=3306 user=maxwell password=123456filter=exclude: *.*, include: tpcc_test.*, include: source.*, include: test.*

    ? ? ? ? 配置項說明:

    • log_level:日志級別,有效值debug、info、warn、error,默認info。
    • metrics_type:監控報告類型,有效值slf4j、jmx、http、datadog。
    • http_port:metrics_type為http時使用的端口號。
    • producer:生產者,有效值為stdout、file、kafka、kinesis、pubsub、sqs、rabbitmq、redis,默認stdout(控制臺輸出)。
    • kafka_topic:Kafka主題名,maxwell向該主題寫數據,默認值為maxwell。除了指定為靜態topic,還可以動態傳參,如namespace_%{database}_%{table},啟動maxwell時%{database}和%{table}將被具體的庫名和表名替換,并在Kafka中自動創建這些topic。namespace命名空間用于限制topic名稱,可以省略。
    • kafka.bootstrap.servers:Kafka broker列表。
    • kafka.compression.type:消息壓縮類型,默認不壓縮。使用壓縮可以降低網絡傳輸和存儲開銷,而這往往是向Kafka發送消息的瓶頸所在。
    • kafka.retries:生產者從服務器收到錯誤時重發消息的次數,如果到達該值,生產者將放棄重試并返回錯誤。
    • kafka.acks:生產者響應方式。
    • kafka.batch.size:一個批次使用的內存字節數,默認16KB。生產者會把發送到同一個分區的消息放到一個批次里,然后按批次發送消息。如果該值設置得太小,因為生產者需要更頻繁地發送消息,會增加一些額外開銷。
    • kafka_partition_hash:為消息選擇Kafka分區時使用的hash算法,有效值default、murmur3,默認default。
    • producer_partition_by:輸入到Kafka的分區函數,有效值database、table、primary_key、transaction_id、column、random,默認database。在很多業務系統中,不同數據庫的活躍度差異很大,主體業務的數據庫操作頻繁,產生的binlog也就很多,而maxwell默認使用數據庫名作為key進行hash,那么顯而易見,binlog的操作經常都被分到同一個分區里,造成數據傾斜。這里選擇了主鍵作為分區key,同一主鍵被分到同一分區,同時選用murmurhash3哈希算法,以獲得更好的效率和分布。用主鍵作為分區key還可以使得對同一主鍵行的更新將保持與數據庫同序。
    • filter:過濾規則,通過 exclude 排除,通過 include 包含,值可以為具體的數據庫、表、列,甚至用 Javascript 來定義復雜的過濾規則,可以用正則表達式描述。這里配置為接收MySQL源端tpcc_test、source、test三個庫里所有表的binlog。
    • host、port、user、password:連接MySQL實例所用的IP、端口、用戶名、密碼。

    ? ? ? ? maxwell完整的配置參數說明參見Reference - Maxwell's Daemon。

    3. 啟動maxwell
    ? ? ? ? maxwell用Java語言開發,啟動maxwell 1.34.1需要JDK 11運行環境,JDK 8報錯:

    Error: A JNI error has occurred, please check your installation and try again Exception in thread "main" java.lang.UnsupportedClassVersionError: com/zendesk/maxwell/Maxwell has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

    因此先安裝JDK 11:

    yum -y install jsvc rpm -ivh jdk-11.0.12_linux-x64_bin.rpm

    然后啟動maxwell:

    export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell --config config.properties --daemon

    ? ? ? ? --config選項指定配置文件,--daemon選項指定maxwell實例作為守護進程到后臺運行。maxwell啟動時,會在它所連接的MySQL實例中創建一個maxwell數據庫,其中包含如下7個表,保存maxwell的元數據。

    • bootstrap:用于數據初始化,5.5.4小節會介紹maxwell的bootstrap功能。
    • columns:記錄所有的列信息。
    • databases:記錄所有的數據庫信息。
    • heartbeats:記錄心跳信息。
    • positions:記錄binlog讀取位置,包括binlog文件及其偏移量。
    • schemas:記錄DDL的binlog信息。
    • tables:記錄所有的表信息。

    ? ? ? ? maxwell成功啟動后,將在日志文件中看到類似下面的信息:

    [mysql@node2~/maxwell-1.34.1]$tail /home/mysql/maxwell-1.34.1/bin/../logs/MaxwellDaemon.out 15:22:54,297 INFO ?BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:406400351 15:22:54,302 INFO ?MaxwellHTTPServer - Maxwell http server starting 15:22:54,306 INFO ?MaxwellHTTPServer - Maxwell http server started on port 9090 15:22:54,326 INFO ?BinaryLogClient - Connected to 172.16.1.127:3306 at mysql-bin.000001/406400351 (sid:6379, cid:5179862) 15:22:54,326 INFO ?BinlogConnectorReplicator - Binlog connected. 15:22:54,339 INFO ?log - Logging initialized @4344ms to org.eclipse.jetty.util.log.Slf4jLog 15:22:54,595 INFO ?Server - jetty-9.4.41.v20210516; built: 2021-05-16T23:56:28.993Z; git: 98607f93c7833e7dc59489b13f3cb0a114fb9f4c; jvm 11.0.12+8-LTS-237 15:22:54,710 INFO ?ContextHandler - Started o.e.j.s.ServletContextHandler@4ceb3f9a{/,null,AVAILABLE} 15:22:54,746 INFO ?AbstractConnector - Started ServerConnector@202c0dbd{HTTP/1.1, (http/1.1)}{0.0.0.0:9090} 15:22:54,747 INFO ?Server - Started @4755ms [mysql@node2~/maxwell-1.34.1]$

    ? ? ? ? 從http://172.16.1.126:9090/可以獲取所有監控指標,指標說明參見Reference - Maxwell's Daemon。

    5.5.3 bireme安裝配置

    ? ? ? ? 我們在172.16.1.126上搭建bireme服務。

    1. 下載并解壓

    wget https://github.com/HashDataInc/bireme/releases/download/v2.0.0-alpha-1/bireme-2.0.0-alpha-1.tar.gz tar -zxvf bireme-2.0.0-alpha-1.tar.gz

    2. 修改配置文件
    (1)備份示例配置文件

    cd ~/bireme-2.0.0-alpha-1/etc/ cp config.properties config.properties.bak

    (2)編輯config.properties文件內容如下

    target.url = jdbc:postgresql://114.112.77.198:5432/dw target.user = dwtest target.passwd = 123456data_source = mysqlmysql.type = maxwell mysql.kafka.server = 172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092 mysql.kafka.topic = mytopicpipeline.thread_pool.size = 3state.server.addr = 172.16.1.126 state.server.port = 9091

    ? ? ? ? 配置項說明:

    • target.url、target.user、target.passwd:目標Greenplum的URL、用戶名、密碼,bireme使用這些信息連接Greenplum數據庫并寫入數據。
    • data_source:指定數據源<source_name>,多個數據源用逗號分隔開,忽略空白字符。
    • <source_name>.type:指定數據源的類型。
    • <source_name>.kafka.server:數據源的Kafka地址。
    • <source_name>.kafka.topic:數據源在Kafka中對應的topic。
    • pipeline.thread_pool.size:pipeline線程數。每個數據源可以有多個pipeline,對于maxwell,每個Kafka topic分區對應一個pipeline。
    • state.server.addr、state.server.port:監控服務器的IP、端口。bireme啟動一個輕量級的HTTP服務器方便用戶獲取當前的數據裝載狀態。

    ? ? ? ? bireme完整的配置參數說明參見https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md。

    (3)編輯mysql.properties文件內容如下

    test.t1 = public.t1

    ? ? ? ? 在config.properties文件中指定數據源為mysql,因此需要新建文件~/bireme-2.0.0-alpha-1/etc/mysql.properties,在其中加入源表到目標表的映射。這里是將MySQL中test.t1表的數據同步到Greenplum的public.t1表中。(4)在源和目標庫創建表,注意都要有主鍵

    -- MySQL,126主庫執行 use test; create table t1 (a int primary key);-- Greenplum set search_path=public; create table t1 (a int primary key);

    ? ? ? ? 在MySQL主庫上建表的DDL語句會寫到binlog中,并在從庫上重放。同樣,因為我們建表前已經啟動了maxwell,該建表語句也會隨binlog傳遞到maxwell。maxwell可以通過啟用output_ddl支持DDL事件捕獲,該參數是boolean類型,默認為false。默認配置時不會將DLL的binlog事件發送到Kafka,只會記錄到日志文件和maxwell.schemas表中。

    ? ? ? ? 如果output_ddl設置為true,除了日志文件和maxwell.schemas表,DDL事件還會被寫到由ddl_kafka_topic參數指定的Kafka topic中,默認為kafka_topic。Kafka中的DDL消息需要由消費者實現消費邏輯,bireme不處理DDL。我們使用默認配置,DDL只記錄信息,不寫入消息,因此只要在目標Greenplum庫手工執行同構的DDL語句,使源和目標保持相同的表結構,MySQL中執行的DDL語句就不會影響后面的數據同步。

    3. 啟動bireme
    ? ? ? ? bireme用Java語言開發,啟動bireme 2.0.0需要JDK 8運行環境,用JDK 11報錯:

    Bireme JMX enabled by default Starting the bireme service... Cannot find any VM in Java Home /usr/java/jdk-11.0.12 Failed to start bireme service

    因此先安裝JDK 8:

    yum -y install java-1.8.0-openjdk.x86_64

    然后啟動bireme,我這里使用的是安裝CDH時自帶的JDK 8:

    export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera cd ~/bireme-2.0.0-alpha-1 bin/bireme start

    bireme成功啟動后,將在控制臺看到類似下面的信息:

    Bireme JMX enabled by default Starting the bireme service... The bireme service has started.

    從http://172.16.1.126:9091/?pretty可以獲取bireme狀態:

    {"source_name": "mysql","type": "MAXWELL","pipelines": [{"name": "mytopic-1","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"},{"name": "mytopic-2","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"},{"name": "mytopic-3","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"}] }
    • source_name:數據源名稱。
    • type:數據源類型。
    • pipelines:包含了一組pipeline的同步狀態,每一個數據源可能用多個pipeline同時工作。
    • name:pipeline名稱。
    • latest:最新的數據產生時間。
    • delay:從數據進入bireme到成功加載并返回的時間間隔。
    • state是pipeline的狀態。

    ? ? ? ? 現在所有服務都已正常,可以進行一些簡單的測試:

    -- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (1); insert into t1 values (2); insert into t1 values (3); update t1 set a=10 where a=1; delete from t1 where a=2; commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----310 (2 rows)

    ? ? ? ? MySQL中的數據變化被實時同步到Greenplum中。

    4. 如何保證數據的順序消費
    ? ? ? ? bireme實現中的一個pipeline就是Kafka中的一個消費者。我們建的topic有三個分區,maxwell在寫的時候指定一個主鍵作為hash key,那么同一主鍵的相關數據,一定會被分發到同一個分區中去,而單個分區中的數據一定是有序的。消費者從分區中取出數據的時候,也一定是有序的,到這里順序沒有錯亂。接著,我們在消費者里可能設置多個線程來并發處理消息(如配置pipeline.thread_pool.size=3),因為如果消費者是單線程,而處理又比較耗時的話,吞吐量太低。一個消費者多線程并發處理就可能出現亂序問題,如圖5-11所示。

    圖5-11 多線程消費造成亂序

    ?

    ? ? ? ? 解決該問題的方式是寫N個內存阻塞隊列,具有相同主鍵的數據都到同一個隊列,然后對于N個線程,每個線程分別消費一個隊列即可,這樣就能保證順序性,如圖5-12所示。bireme就是使用這個模型實現的。

    圖5-12 用內存阻塞隊列解決多線程消費亂序問題

    ?

    5.5.4 實時CDC

    ? ? ? ? 大多數情況下,數據同步被要求在不影響線上業務的情況下聯機執行,而且還要求對線上庫的影響越小越好。例如,同步過程中對主庫加鎖會影響對主庫的訪問,因此通常是不被允許的。本節演示如何在保持對線上庫正常讀寫的前提下,通過全量加增量的方式,完成MySQL到Greenplum的實時數據同步。

    ? ? ? ? 為展示完整過程,先做一些清理工作,然后對主庫執行tpcc-mysql壓測,模擬正在使用的線上業務數據庫,在壓測執行期間做全部九個測試用表的全量和增量數據同步。

    • maxwell、bireme清理:
    # 停止maxwell進程 ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill# 在127從庫刪除maxwell庫 drop database maxwell;# 停止bireme服務 export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera cd ~/bireme-2.0.0-alpha-1 bin/bireme stop# 清除bireme日志 cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.err cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.gc cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.out
    • bireme要求所有表都具有主鍵。tpcc-mysql測試中的history表沒有主鍵,因此在主庫為該表添加主鍵,構成主鍵的字段為表全部八個字段的聯合。
    alter table history add primary key (h_c_id,h_c_d_id,h_c_w_id,h_d_id,h_w_id,h_date,h_amount,h_data);
    • 主庫執行壓測模擬業務庫。后面的數據同步操作均在壓測期間執行。
    # 10倉庫,32并發線程,預熱5分鐘,執行半小時 ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 300 -l 1800

    1. 全量同步

    (1)在Greenplum中創建模式
    ? ? ? ? 模式(schema)是一個有趣的概念,不同數據庫系統中的模式代表完全不同的東西。如Oracle中,默認在創建用戶的時候,就建立了一個和用戶同名的模式,并且互相綁定,因此很多情況下Oracle的用戶和模式可以通用。MySQL中的schema是database的同義詞。而Greenplum中的模式是從PostgreSQL繼承來的,其概念與SQL Server的模式更為類似,是數據庫中的邏輯對象。

    ? ? ? ? Greenplum的模式是數據庫中對象和數據的邏輯組織。模式允許在一個數據庫中存在多個同名的對象,如果對象屬于不同的模式,同名對象之間不會沖突。使用schema有如下好處:

    • 方便管理多個用戶共享一個數據庫,但是又可以互相獨立。
    • 方便管理眾多對象,更有邏輯性。
    • 方便兼容某些第三方應用程序,如果創建對象時是帶schema的。

    ? ? ? ? 比如要設計一個復雜系統,由眾多模塊構成,有時候模塊間又需要具有獨立性。各模塊存放單獨的數據庫顯然是不合適的。此時就可使用schema來劃分各模塊間的對象,再對用戶進行適當的權限控制,這樣邏輯也非常清晰。執行以下操作在Greenplum中創建schema。

    # 連接master psql -d dw -U dwtest -h 127.0.0.1 # 創建schema create schema tpcc_test; # 修改用戶搜索路徑 alter database dw set search_path to public,pg_catalog,tpcc_test;

    (2)在tpcc_test模式中創建tpcc-mysql測試用表
    ? ? ? ? tpcc-mysql安裝目錄下的create_table.sql文件中包含MySQL里的建表腳本。將該SQL腳本改為Greenplum版:

    • 去掉Engine=InnoDB,這是MySQL用的。
    • 將tinyint改為smallint,Greenplum沒有tinyint數據類型。
    • 將datetime改為timestamp,Greenplum沒有datetime數據類型。
    • 為history表添加主鍵,構成主鍵的字段為該表全部八個字段的聯合。

    ? ? ? ? Greenplum是分布式數據庫,一般為提高查詢性能需要在建表時通過distributed by子句指定分布鍵。如果表有主鍵,同時沒有指定分布鍵,則Greenplum自動使用主鍵作為表的分布鍵,我們出于簡便使用這種方式。關于選擇分布鍵的最佳實踐,將在下一篇的建立示例數據倉庫環境中加以說明。執行以下操作在tpcc_test模式中建表。

    -- 設置當前模式 set search_path to tpcc_test;-- 創建tpcc-mysql測試用的9個表 ...?create table history ( h_c_id int,? h_c_d_id smallint,? h_c_w_id smallint, h_d_id smallint, h_w_id smallint, h_date timestamp, h_amount decimal(6,2),? h_data varchar(24), primary key (h_c_id,h_c_d_id,h_c_w_id,h_d_id,h_w_id,h_date,h_amount,h_data) );...?

    (3)修改bireme表映射配置
    ? ? ? ? 編輯~/bireme-2.0.0-alpha-1/etc/mysql.properties內容如下:

    tpcc_test.customer = tpcc_test.customer tpcc_test.district = tpcc_test.district tpcc_test.history = tpcc_test.history tpcc_test.item = tpcc_test.item tpcc_test.new_orders = tpcc_test.new_orders tpcc_test.order_line = tpcc_test.order_line tpcc_test.orders = tpcc_test.orders tpcc_test.stock = tpcc_test.stock tpcc_test.warehouse = tpcc_test.warehouse

    ? ? ? ? 等號左邊是MySQL庫表名,右邊為對應的Greenplum模式及表名。bireme雖然提供了表映射配置文件,但實際只支持Greenplum中的public模式,如果映射其他模式,bireme啟動時會報錯:

    Greenplum table and MySQL table size are inconsistent

    ? ? ? ? 通過查看GetPrimaryKeys.java的源代碼,發現它在查詢Greenplum表的元數據時,使用的是硬編碼:

    String tableList = sb.toString().substring(0, sb.toString().length() - 1) + ")"; String tableSql = "select tablename from pg_tables where schemaname='public' and tablename in "+ tableList + ""; String prSql = "SELECT NULL AS TABLE_CAT, "+ "n.nspname ?AS TABLE_SCHEM, "+ "ct.relname AS TABLE_NAME, "+ "a.attname ?AS COLUMN_NAME, "+ "(i.keys).n AS KEY_SEQ, "+ "ci.relname AS PK_NAME "+ "FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) "+ "JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) "+ "JOIN ( SELECT i.indexrelid, i.indrelid, i.indisprimary, information_schema._pg_expandarray(i.indkey) AS KEYS FROM pg_catalog.pg_index i) i ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid) "+ "JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE TRUE AND n.nspname = 'public' AND ct.relname in "+ tableList + " AND i.indisprimary ORDER BY TABLE_NAME, pk_name, key_seq";

    ? ? ? ? 可以簡單修改源碼解決此問題,但還有另一個表映射問題。在表映射配置文件中,目標端Greenplum只能使用一個模式而不能自由配置。該問題與bireme的整體實現架構有關,它使用二維數組存儲配置項,代碼倒是簡化了,可邏輯完全不對。這個問題可不像public那么好改,估計得重構才行。

    (4)停止127從庫復制

    -- 在127從庫執行 stop slave;

    ? ? ? ? 這么簡單的一句SQL卻是實現全量數據同步的關鍵所在。從庫停止復制,不影響主庫的正常使用,也就不會影響業務。此時從庫的數據處于靜止狀態,不會產生變化,這使得獲取全量數據變得輕而易舉。

    (5)執行全量數據同步
    ? ? ? ? maxwell提供了一個命令工具 maxwell-bootstrap 幫助我們完成數據初始化,它基于 SELECT * FROM table 的方式進行全量數據讀取,不會產生多余的binlog。啟動maxwell時,如果使用--bootstrapper=sync,則初始化引導和binlog接收使用同一線程,這意味著所有binlog事件都將被阻止,直到引導完成。如果使用--bootstrapper=async(默認配置),maxwell將產生一個用于引導的單獨線程。在這種異步模式下,非引導表將由主線程正常復制,而引導表的binlog事件將排隊,并在引導過程結束時發送到復制流。

    ? ? ? ? 如果maxwell在下次引導時崩潰,它將完全重新引導全量數據,不管之前的進度如何。如果不需要此行為,則需要手動更新bootstrap表。具體來說,是將未完成的引導程序行標記為“完成”(is_complete=1)或刪除該行。

    ? ? ? ? 雖然maxwell考慮到了全量數據初始化問題,但bireme卻處理不了全量數據消費,會報類似下面的錯誤:

    cn.hashdata.bireme.BiremeException: Not found. Record does not have a field named "w_id"

    ? ? ? ? 設置producer=stdout,從控制臺可以看到bootstrap和正常insert會生產不同的JSON輸出。執行maxwell-bootstrap:

    export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell-bootstrap --config config.properties --database test --table t1 --client_id maxwell

    控制臺輸出:

    {"database":"maxwell","table":"bootstrap","type":"insert","ts":1638778749,"xid":39429616,"commit":true,"data":{"id":1,"database_name":"tpcc_test","table_name":"t1","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":1,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}} {"database":"tpcc_test","table":"t1","type":"bootstrap-start","ts":1638778770,"data":{}} {"database":"tpcc_test","table":"t1","type":"bootstrap-insert","ts":1638778770,"data":{"a":1}} {"database":"tpcc_test","table":"t1","type":"bootstrap-complete","ts":1638778770,"data":{}}

    普通insert控制臺輸出:

    {"database":"tpcc_test","table":"t1","type":"insert","ts":1638778816,"xid":39429828,"commit":true,"data":{"a":2}}

    ? ? ? ? bireme不處理bootstrap相關類型,因此這里無法使用maxwell-bootstrap進行全量數據同步。我們執行以下操作,手工將源表的全量數據復制到目標表。

    • 在127從庫將源表數據導出成文本文件
    mkdir tpcc_test_bak mysqldump -u root -p123456 -S /data/mysql.sock -t -T ~/tpcc_test_bak tpcc_test customer district history item new_orders order_line orders stock warehouse --fields-terminated-by='|' --single-transaction
    • 復制到198目標服務器
    scp ~/tpcc_test_bak/*.txt gpadmin@114.112.77.198:/data/tpcc_test_bak/
    • 在198上將文本文件導入目標表
    # 用gpadmin用戶連接數據庫 psql -d dw-- 用copy命令執行導入 copy customer from '/data/tpcc_test_bak/customer.txt' with delimiter '|'; copy district from '/data/tpcc_test_bak/district.txt' with delimiter '|'; copy history from '/data/tpcc_test_bak/history.txt' with delimiter '|'; copy item from '/data/tpcc_test_bak/item.txt' with delimiter '|'; copy new_orders from '/data/tpcc_test_bak/new_orders.txt' with delimiter '|'; copy order_line from '/data/tpcc_test_bak/order_line.txt' with delimiter '|'; copy orders from '/data/tpcc_test_bak/orders.txt' with delimiter '|'; copy stock from '/data/tpcc_test_bak/stock.txt' with delimiter '|'; copy warehouse from '/data/tpcc_test_bak/warehouse.txt' with delimiter '|';-- 分析表 analyze customer; analyze district; analyze history; analyze item; analyze new_orders; analyze order_line; analyze orders; analyze stock; analyze warehouse;

    2. 增量同步

    ? ? ? ? maxwell是從從庫接收binlog,停止復制使得從庫的binlog不再發生變化,從而給maxwell提供了一個增量數據同步的初始binlog位點。只要此時啟動maxwell與bireme服務,然后開啟從庫的復制,增量數據就會自動執行同步。

    (1)啟動maxwell

    export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell --config config.properties --daemon

    (2)啟動bireme

    export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera cd ~/bireme-2.0.0-alpha-1 bin/bireme start

    (3)啟動MySQL從庫的數據復制

    -- 在127從庫執行 start slave;

    (4)查看Kafka消費情況

    export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/; kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group bireme

    ? ? ? ? 壓測結束時,三個消費者的LAG在幾秒后都變為0,說明此時源和目標已經實時同步,而且bireme方式的消費延遲很小,幾乎是同時完成數據同步。可以對比mysql主庫、從庫和Greenplum的表數據以確認三者的數據一致性。

    ? ? ? ? 聰明如你也許已經想到,如果數據量太大,導致全量同步執行時間過長,以至于MySQL從庫的復制停滯太久,在重新啟動復制后會不會延遲越拉越久,而永遠不能追上主庫呢?理論上可能發生這種情況,但實際上不太可能出現。首先,業務庫不可能永遠滿載工作,總有波峰波谷。其次,數據倉庫通常只需要同步部分業務數據,而不會應用全部binlog。最后,我們還能采取各種手段加快MySQL主從復制,使從庫在一個可接受的時間范圍內追上主庫,包括:

    • 在數據完整性允許的情況下,設置innodb_flush_log_at_trx_commit和sync_binlog雙0。
    • 使用MySQL 5.6以后版本的組提交和多線程復制。
    • 使用MySQL 5.7.22及其以后版本的基于WriteSet的多線程復制。

    ? ? ? ? maxwell + Kafka + bireme 方案的有點主要體現在兩點:一是容易上手,只需配置無需編程即可使用;二是消費速度快,這得益于bireme采用的 DELETE + COPY 方法,通過小批次準實時進行數據裝載的方式。這種方案的缺點也很明顯,bireme的實現比較糟糕。前面已經看到了幾處,如不支持DDL,不支持maxwell-bootstrap,不支持源表和目標表的自由映射等。而且,當pipeline.thread_pool.size值設置小于Kafka 分區數時,極易出現Consumer group 'xxx' is rebalancing問題,該問題還不能自行恢復,我極度懷疑這是由實現代碼的瑕疵所造成。也難怪,bireme這個個人作品在github上已經四年沒更新了。下面介紹更為流行,也是我們所采用的基于Canal的解決方案。

    5.6 Canal + Kafka + ClientAdapter

    ? ? ? ? 本節介紹的方法和上節類似,只是將Kafka的生產者與消費者換成了Canal Server和Canal Adapter。

    5.6.1 總體架構

    ? ? ? ? 本方案的總體架構如圖5-13所示。

    圖5-13 Canal + Kafka + ClientAdapter 架構

    ? ? ? ? Canal是阿里開源的一個的組件,無論功能還是實現上都與maxwell類似。其主要用途是基于MySQL數據庫增量日志解析,提供增量數據訂閱和消費,工作原理相對比較簡單:

  • Canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送 dump 協議。
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 Canal )。
  • Canal 解析 binary log 對象(原始字節流)。
  • ? ? ? ? 圖5-14顯示了Canal服務器的構成模塊。Server代表一個Canal運行實例,對應于一個jvm。Instance對應于一個數據隊列,1個Server對應1..n個Instance。Instance模塊中,EventParser完成數據源接入,模擬slave與master進行交互并解析協議。EventSink是Parser和Store的連接器,進行數據過濾、加工與分發。EventStore負責存儲數據。MetaManager是增量訂閱與消費信息管理器。

    圖5-14 Canal服務器構成模塊

    ? ? ? ? Canal 1.1.1版本之后默認支持將Canal Server接收到的binlog數據直接投遞到消息隊列,目前默認支持的消息系統有Kafka和RocketMQ。早期的Canal僅提供Client API,需要用戶自己編寫客戶端程序實現消費邏輯。Canal 1.1.1版本之后增加了client-adapter,提供客戶端數據落地的適配及啟動功能。

    ? ? ? ? 下面演示安裝配置Canal Server和Canal Adapter實現MySQL到Greenplum的實時數據同步。這里使用的環境與5.5.1的相同,MySQL已經配置好主從復制,CDH的Kafka服務正常。我們還事先在MySQL中創建了Canal用于連接數據庫的用戶,并授予了相關權限。

    -- 在126主庫執行 create user canal identified by 'canal'; ? grant select, replication slave, replication client on *.* to 'canal'@'%';

    ? ? ? ? 下面在Kafka中創建一個topic,在后面配置Canal時將使用該topic:

    kafka-topics.sh --create --topic example --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3

    example的分區如下:

    Topic: example?? ?Partition: 0?? ?Leader: 340?? ?Replicas: 340,339,330?? ?Isr: 340,339,330 Topic: example?? ?Partition: 1?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340 Topic: example?? ?Partition: 2?? ?Leader: 330?? ?Replicas: 330,340,339?? ?Isr: 330,340,339

    ? ? ? ? Client-Adapter在1.14版本為了解決對MySQL關鍵字的兼容問題引入了一個BUG,使它只能兼容MySQL,在向Greenplum插入數據時會報錯:

    canal 1.1.5 bug 2021-10-08 16:51:09.347 [pool-2-thread-1] ERROR com.alibaba.otter.canal.client.adapter.support.Util - ERROR: syntax error at or near "`"Position: 15 org.postgresql.util.PSQLException: ERROR: syntax error at or near "`"Position: 15

    該問題說明詳見https://github.com/alibaba/canal/pull/3020,直到Adapter 1.1.5依然沒有解決。1.1.3版本還沒有出現此問題,因此我們選擇使用Canal 1.1.3,要求JDK 1.8以上。

    5.6.2 Canal Server安裝配置

    ? ? ? ? 我們在172.16.1.126上運行Canal Server。

    1. 下載并解壓

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/

    2. 修改配置文件
    (1)編輯Canal配置文件/home/mysql/canal_113/deployer/conf/canal.properties,修改以下配置項。

    # Canal服務器模式 canal.serverMode = kafka # Kafka服務器地址 canal.mq.servers = 172.16.1.124:9092,172.16.1.125:9092:172.16.1.126:9092

    ? ? ? ? canal.properties是Canal服務器配置文件,其中包括三部分定義:

    • common argument:通用參數定義,可以將instance.properties的公用參數,抽取放置到這里,這樣每個實例啟動的時候就可以共享配置。instance.properties配置定義優先級高于canal.properties
    • destinations: Canal實例列表定義,列出當前服務器上有多少個實例,每個實例的加載方式是spring/manager等。
    • MQ:消息隊列相關配置。

    (2)編輯instance配置文件/home/mysql/canal_113/deployer/conf/example/instance.properties,修改以下配置項。

    # Canal實例對應的MySQL master canal.instance.master.address=172.16.1.127:3306 # 注釋canal.mq.partition配置項 # canal.mq.partition=0 # Kafka topic分區數 canal.mq.partitionsNum=3 # 哈希分區規則,指定所有正則匹配的表對應的哈希字段為表主鍵 canal.mq.partitionHash=.*\\..*:$pk$

    ? ? ? ? instance.properties是Canal實例配置文件,在canal.properties定義了canal.destinations后,需要在canal.conf.dir對應的目錄下建立同名目錄。例如缺省配置:

    canal.destinations = example canal.conf.dir = ../conf

    這時需要在canal.properties所在目錄中存在(或創建)example目錄,example目錄里有一個instance.properties文件。

    ? ? ? ? 與上節介紹的bireme類似,Canal同樣存在消息隊列的順序性問題。Canal目前選擇支持的Kafka/rocketmq,本質上都是基于本地文件的方式來支持分區級的順序消息能力,也就是binlog寫入消息隊列可以有一些順序性保障,這取決于用戶的參數選擇。

    ? ? ? ? Canal支持消息隊列數據的幾種路由方式:單topic單分區,單topic多分區、多topic單分區、多topic多分區。canal.mq.dynamicTopic參數主要控制是單topic還是多topic,針對命中條件的表可以發到表名對應的topic、庫名對應的topic,或默認topic。canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分區以及分區的partition的路由計算,針對命中條件的可以做到按表級做分區、主鍵級做分區等。

    ? ? ? ? Canal的消費順序性,主要取決于路由選擇,例如:

    • 單topic單分區,可以嚴格保證和binlog一樣的順序性,缺點就是性能比較慢,單分區的性能寫入大概在2~3k的TPS。
    • 多topic單分區,可以保證表級別的順序性,一張表或者一個庫的所有數據都寫入到一個topic的單分區中,可以保證有序性,針對熱點表也存在寫入分區的性能問題。
    • 單topic、多topic的多分區,如果用戶選擇的是指定table的方式,保障的是表級別的順序性(存在熱點表寫入分區的性能問題),如果用戶選擇的是指定pk hash的方式,那只能保障的是一個主鍵的多次binlog順序性。pk hash的方式性能最好,但需要權衡業務,如果業務上有主鍵變更或者對多主鍵數據有順序性依賴,就會產生業務處理錯亂的情況。如果有主鍵變更,主鍵變更前和變更后的值會落在不同的分區里,業務消費就會有先后順序的問題,需要注意。

    ? ? ? ? 如果事先創建好topic,canal.mq.partitionsNum參數值不能大于該topic的分區數。如果選擇在開始向Kafka發送消息時自動創建topic,則canal.mq.partitionsNum值不能大于Kafka的 num.partitions參數值。否則在Canal Server啟動時報錯:

    ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 1 is not in the range [0...1).

    3. 啟動Canal Server
    ? ? ? ? 執行下面的命令啟動Canal Server:

    ~/canal_113/deployer/bin/startup.sh

    Canal Server成功啟動后,將在日志文件/home/mysql/canal_113/deployer/logs/canal/canal.log中看到類似下面的信息:

    2021-12-14 16:29:42.724 [destination = example , address = /172.16.1.127:3306 , EventParser] WARN ?c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=333,serverId=127,gtid=,timestamp=1639466655000] cost : 688ms , the next step is binlog dump

    從MySQL可以看到canal用戶創建的dump線程:

    *************************** 4. row ***************************Id: 5739453User: canalHost: 172.16.1.126:22423db: NULL Command: Binlog DumpTime: 123State: Master has sent all binlog to slave; waiting for binlog to be updatedInfo: NULL

    5.6.3 Canal Adapter安裝配置

    ? ? ? ? 我們在172.16.1.126上運行Canal Adapter。

    1. 下載并解壓

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz tar -zxvf canal.adapter-1.1.3.tar.gz -C ~/canal_113/adapter/

    2. 修改配置文件
    (1)編輯啟動器配置文件/home/mysql/canal_113/adapter/conf/application.yml,內容如下。

    server:port: 8081?? ??? ??? ?# REST 端口號 spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: kafka?? ??? ??? ?# canal client的模式: tcp kafka rocketMQcanalServerHost: 127.0.0.1:11111?? ?# 對應單機模式下的canal server的ip:portmqServers: 172.16.1.124:9092,172.16.1.125:9092:172.16.1.126:9092?? ?# kafka或rocketMQ地址batchSize: 5000?? ??? ?# 每次獲取數據的批大小, 單位為KsyncBatchSize: 10000?? # 每次同步的批數量retries: 0?? ??? ??? ? # 重試次數, -1為無限重試timeout:?? ??? ??? ??? # 同步超時時間, 單位毫秒accessKey:secretKey:canalAdapters:?? ??? ? # 適配器列表- instance: example ?? # canal 實例名或者 MQ topic 名groups:?? ??? ??? ???# 消費分組列表- groupId: g1?? ??? ?# 分組id,如果是MQ模式將用到該值outerAdapters:?? ? # 分組內適配器列表- name: logger?? ? # 日志適配器- name: rdb?? ??? ?# 指定為rdb類型同步key: Greenplum?? # 適配器唯一標識,與表映射配置中outerAdapterKey對應properties:?? ???# 目標庫jdbc相關參數jdbc.driverClassName: org.postgresql.Driver?? ??? ??? # jdbc驅動名,部分jdbc的jar包需要自行放致lib目錄下jdbc.url: jdbc:postgresql://114.112.77.198:5432/dw?? ?# jdbc urljdbc.username: dwtest?? ??? ??? ??? ??? ??? ??? ??? ??# jdbc usernamejdbc.password: 123456?? ??? ??? ??? ??? ??? ??? ??? ??# jdbc passwordthreads: 10?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? # 并行執行的線程數, 默認為1commitSize: 30000?? ??? ??? ??? ??? ??? ??? ??? ??? ??# 批次提交的最大行數

    ? ? ? ? 1.1.3版本的ClientAdapter支持如下功能:

    • 客戶端啟動器
    • 同步管理REST接口
    • 日志適配器
    • 關系型數據庫的表對表數據同步
    • HBase的表對表數據同步
    • ElasticSearch多表數據同步

    ? ? ? ? 適配器將會自動加載 conf/rdb 下的所有.yml結尾的表映射配置文件。

    (2)編輯RDB表映射文件/home/mysql/canal_113/adapter/conf/rdb/t1.yml,內容如下。

    dataSourceKey: defaultDS?? ? # 源數據源的key destination: example?? ??? ? # cannal的instance或者MQ的topic groupId: g1?? ??? ??? ??? ???# 對應MQ模式下的groupId,只會同步對應groupId的數據?? ? outerAdapterKey: Greenplum?? # adapter key, 對應上面配置outAdapters中的key concurrent: true?? ??? ??? ? # 是否按主鍵hash并行同步,并行同步的表必須保證主鍵不會更改,及不存在依賴該主鍵的其他同步表上的外鍵約束。 dbMapping:database: test?? ??? ??? ? # 源數據源的database/shcematable: t1?? ??? ??? ??? ???# 源數據源表名targetTable: public.t1?? ? # 目標數據源的模式名.表名targetPk:?? ??? ??? ??? ???# 主鍵映射a: a?? ??? ??? ??? ??? ? # 如果是復合主鍵可以換行映射多個 # ?mapAll: true?? ??? ??? ???# 是否整表映射,要求源表和目標表字段名一模一樣。如果targetColumns也配置了映射,則以targetColumns配置為準。targetColumns:?? ??? ??? ? # 字段映射,格式: 目標表字段: 源表字段,如果字段名一樣源表字段名可不填。a: acommitBatch: 30000 ?? ??? ?# 批量提交的大小

    ? ? ? ? RDB adapter 用于適配MySQL到關系型數據庫(需支持jdbc)的數據同步及導入。

    3. 啟動Canal Adapter
    ? ? ? ? 執行下面的命令啟動Canal Adapter:

    ~/canal_113/adapter/bin/startup.sh

    Canal Adapter成功啟動后,將在日志文件/home/mysql/canal_113/adapter/logs/adapter/adapter.log中看到類似下面的信息:

    2021-12-14 17:40:37.206 [main] INFO ?c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed ... 2021-12-14 17:40:37.384 [Thread-5] INFO ?c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Start to subscribe topic: example <============= 2021-12-14 17:40:37.385 [Thread-5] INFO ?c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Subscribe topic: example succeed <============= ...

    ? ? ? ? 現在所有服務都已正常,可以進行一些簡單的測試:

    -- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (4),(5),(6); update t1 set a=30 where a=3; delete from t1 where a=10; commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----30654 (4 rows)

    ? ? ? ? MySQL中的數據變化被實時同步到Greenplum中。

    5.6.4 HA模式配置

    ? ? ? ? Canal的高可用不是服務器級別,而是基于實例的,一個實例對應一個MySQL實例。Canal通過將增量訂閱&消費的關系信息持久化存儲在Zookeeper中,保證數據集群共享,以支持HA模式。我們在172.16.1.127上再安裝一個Canal Server,然后對126、127上的Canal Server進行HA模式配置。配置中所使用的Zookeeper是Kafka同一CDH集群中的Zookeeper服務。

    1. 配置Canal Server

    (1)在127上安裝Canal Server

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/

    (2)修改canal.properties文件,加上zookeeper配置。

    # 兩個Canal Server的配置相同 canal.zkServers = 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml # 注釋下面行 # canal.instance.global.spring.xml = classpath:spring/file-instance.xml

    (3)修改example/instance.properties文件

    canal.instance.mysql.slaveId=1126?? ?# 另一臺機器改成1127,保證slaveId不重復即可

    ? ? ? ? 注意,兩臺機器上的instance目錄的名字需要保證完全一致,HA模式是依賴于instance name進行管理,同時必須都選擇default-instance.xml配置。

    (4)啟動兩臺機器的Canal Server

    ~/canal_113/deployer/bin/startup.sh

    ? ? ? ? 啟動后可以查看logs/example/example.log,只會看到一臺機器上出現了啟動成功的日志,如這里啟動成功的是126:

    2021-12-15 11:23:46.593 [main] INFO ?c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

    zookeeper中記錄了集群信息:

    [zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/cluster [172.16.1.127:11111, 172.16.1.126:11111]

    2. 配置Canal Adapter

    (1)修改Adapter啟動器配置文件application.yml,加上zookeeper配置。

    # 注釋下面一行 # canalServerHost: 127.0.0.1:11111 zookeeperHosts: 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181

    (2)重啟Adapter

    ~/canal_113/adapter/bin/stop.sh ~/canal_113/adapter/bin/startup.sh

    ? ? ? ? Adapter會自動從zookeeper中的running節點獲取當前服務的工作節點,然后與其建立連接。連接成功后,Canal Server會記錄當前正在工作的服務器信息:

    [zk: localhost:2181(CONNECTED) 9] get /otter/canal/destinations/example/running {"active":true,"address":"172.16.1.126:11111","cid":1}

    ? ? ? ? 現在進行一些簡單的測試驗證功能是否正常:

    -- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (7),(8),(9); update t1 set a=40 where a=4; delete from t1 where a in (5,6,7); commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----308940 (4 rows)

    ? ? ? ? MySQL中的數據變化被實時同步到Greenplum中,所有組件工作正常。

    ? ? ? ? 數據消費成功后,Canal Server會在zookeeper中記錄下當前最后一次消費成功的binlog位點,下次重啟客戶端時時,會從這最后一個位點繼續進行消費。

    [zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4270,"serverId":126,"timestamp":1639541811000}}

    3. 自動切換

    ? ? ? ? 停止正在工作的126的Canal Server:

    ~/canal_113/deployer/bin/stop.sh

    這時127會立馬啟動example instance,提供新的數據服務:

    [zk: localhost:2181(CONNECTED) 20] get /otter/canal/destinations/example/running {"active":true,"address":"172.16.1.127:11111","cid":1}

    驗證功能是否正常:

    -- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (1),(2); update t1 set a=3 where a=30; delete from t1 where a in (8,9,40); commit;-- 查詢Greenplum dw=> select * from public.t1;a? ---132 (3 rows)

    啟動126的Canal Server,它將再次被添加到集群中:

    [zk: localhost:2181(CONNECTED) 22] ls /otter/canal/destinations/example/cluster [172.16.1.127:11111, 172.16.1.126:11111]

    5.6.5 實時CDC

    ? ? ? ? 我們依然可以使用上節介紹的方法,進行全量加增量的實時數據同步。工作原理和操作步驟別無二致,只是實現的組件變了,maxwell替換為Canal Server,bireme替換為Canal Adapter,而這對于數據倉庫用戶來說完全透明。

    ? ? ? ? 也許在初始化數據同步之前需要進行必要的清理工作:

    ~/canal_113/deployer/bin/stop.sh ~/canal_113/adapter/bin/stop.sh rm ~/canal_113/deployer/conf/example/meta.dat rm ~/canal_113/deployer/conf/example/h2.mv.db cat /dev/null > ~/canal_113/adapter/logs/adapter/adapter.log cat /dev/null > ~/canal_113/deployer/logs/canal/canal.log? cat /dev/null > ~/canal_113/deployer/logs/example/example.log?

    ? ? ? ? meta.dat文件的內容是個json串,存儲實例最后獲取的binlog位點信息,例如:

    {"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":3707,"serverId":126,"timestamp":1639531036000}}}],"destination":"example"}

    Canal重啟時,從meta.dat文件中journalName的起始位置開始同步,如果此時獲取到的binlog信息在MySQL中已被清除,啟動將會失敗。通常在MySQL執行了reset master、purge binary logs等操作,或修改完Canal的instance.properties配置后,重啟Canal Server前需要刪除meta.dat文件。

    ? ? ? ? h2.mv.db是Canal的表元數據存儲數據庫。當MySQL修改了表結構,根據binlog的DDL語句,將該時刻表結構元數據信息在h2.mv.db的meta_snapshot、meta_history等表中。改設計主要為解決MySQL某一時刻發生DDL變更,如果回溯時間跨越DDL變更的時刻,產生解析字段不一致的問題。目前Canal Adapter不支持DDL。

    ? ? ? ? 清理后的Canal處于一個全新的初始狀態,此時可以在不影響業務數據庫正常訪問的前提下進行實時數據同步,主要操作步驟歸納如下:

  • 在目標Greenplum中創建需要同步的表,其結構與MySQL中的源表一致。
  • 配置Canal Adapter的表映射關系,為每個同步表生成一個yml文件。
  • 停止MySQL從庫的復制,使其數據靜止不變。從庫可以安全停止復制是本方案成立的關鍵因素。
  • 執行全量同步,將需要同步的MySQL表數據導入Greenplum的對應表中。這步可以采用多種方式實現,如執行mysqldump或select ... into outfile,將MySQL數據導出成文件,再用Greenplum的copy命令或gpfdist執行數據裝載。或者使用Kettle這樣的工具,直接以數據流的形式傳導數據,不需要再文件落盤。
  • 啟動Canal Server和Canal Adapter,從MySQL從庫獲取binlog,經Kafka中轉,將數據變化應用于目標庫。
  • 啟動MySQL從庫的復制,增量變化數據自動同步。
  • 5.6.6 消費延遲監控

    ? ? ? ? 有別于bireme的DELETE + COPY,Canal Adapter在Greenplum上逐條執行INSERT、UPDATE、DELETE語句。從日志中清晰可見:

    2021-12-15 12:45:37.597 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":3}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":[{"a":30}],"pkNames":null,"sql":"","table":"t1","ts":1639543512399,"type":"UPDATE"} 2021-12-15 12:45:38.308 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":1},{"a":2}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543512393,"type":"INSERT"} 2021-12-15 12:46:02.934 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":8},{"a":40}],"database":"test","destination":"example","es":1639543562000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543537814,"type":"DELETE"}

    ? ? ? ? 正如上節所見,壓測中bireme的批次方式與MySQL的執行速度大差不差,而Canal Adapter的這種方式在Greenplum中的執行速度會比MySQL中慢得多。下面我們還是使用tpcc-mysql壓測制造MySQL負載,然后執行腳本監控消費延遲。Greenplum中已經創建好tpcc-mysql測試所用的9張表,需要配置這些表的映射關系,為每張表生成一個yml文件:

    [mysql@node2~]$ls -l /home/mysql/canal_113/adapter/conf/rdb total 40 -rw-r--r-- 1 mysql mysql 825 Oct 11 11:13 customer.yml -rw-r--r-- 1 mysql mysql 538 Oct 11 11:18 district.yml -rw-r--r-- 1 mysql mysql 603 Oct 22 15:38 history.yml -rw-r--r-- 1 mysql mysql 379 Oct 11 11:23 item.yml -rw-r--r-- 1 mysql mysql 407 Oct 11 11:26 new_orders.yml -rw-r--r-- 1 mysql mysql 631 Oct 11 11:30 order_line.yml -rw-r--r-- 1 mysql mysql 506 Oct 11 11:33 orders.yml -rw-r--r-- 1 mysql mysql 720 Oct 11 11:44 stock.yml -rw-r--r-- 1 mysql mysql 275 Dec 14 17:37 t1.yml -rw-r--r-- 1 mysql mysql 473 Oct 11 11:50 warehouse.yml [mysql@node2~]$

    ? ? ? ? history.yml文件內容如下,其他表映射配置文件類似:

    dataSourceKey: defaultDS destination: example groupId: g1 outerAdapterKey: Greenplum concurrent: true dbMapping:database: tpcc_testtable: historytargetTable: tpcc_test.historytargetPk:h_c_id: h_c_idh_c_d_id: h_c_d_idh_c_w_id: h_c_w_idh_d_id: h_d_idh_w_id: h_w_idh_date: h_dateh_amount: h_amounth_data: h_data # ?mapAll: truetargetColumns:h_c_id: h_c_idh_c_d_id: h_c_d_idh_c_w_id: h_c_w_idh_d_id: h_d_idh_w_id: h_w_idh_date: h_dateh_amount: h_amounth_data: h_datacommitBatch: 30000

    ? ? ? ? 消費延遲監控腳本lag.sh內容如下:

    #!/bin/bash export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/;~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 60 -l 540 > tpcc_test.log 2>&1rate1=0 for ((i=1; i<=10; i++)) dostartTime=`date +%Y%m%d-%H:%M:%S`startTime_s=`date +%s`lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')lag1=$lagsleep 60lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')endTime=`date +%Y%m%d-%H:%M:%S`endTime_s=`date +%s`lag2=$laglag=$(($lag1 - $lag2))sumTime=$[ $endTime_s - $startTime_s ]rate=`expr $lag / $sumTime`rate1=$(($rate1 + $rate))echo "$startTime ---> $endTime" "Total:$sumTime seconds, consume $rate messages per second." doneecho; avg_rate=`expr $rate1 / 10`; left=`expr $lag2 / $avg_rate` echo "It will take about $left seconds to complete the consumption."

    ? ? ? ? 說明:

    • 首先執行10分鐘的tpcc壓測制造MySQL負載。
    • 壓測結束后查看消費延遲,以位移(OFFSET)差作為度量。查看10次,每次相隔1分鐘。
    • 每次計算每秒消費的消息數,取10次的平均值估算還需要多長時間完成消費。
    • example topic中有三個分區,消費時會創建三個消費者,每個消費者用多線程(threads參數指定)進行消費,因此需要累加三個消費者的延遲。

    ? ? ? ? 在數據同步使用的所有組件工作正常的情況下執行腳本,命令行參數是消費組名稱(groupId參數指定):

    ./lag.sh g1

    ? ? ? ? 輸出結果如下:

    20211216-11:33:05 ---> 20211216-11:34:11 Total:66 seconds, consume 1509 messages per second. 20211216-11:34:11 ---> 20211216-11:35:17 Total:66 seconds, consume 1488 messages per second. 20211216-11:35:17 ---> 20211216-11:36:23 Total:66 seconds, consume 1508 messages per second. 20211216-11:36:23 ---> 20211216-11:37:29 Total:66 seconds, consume 1511 messages per second. 20211216-11:37:29 ---> 20211216-11:38:35 Total:66 seconds, consume 1465 messages per second. 20211216-11:38:35 ---> 20211216-11:39:41 Total:66 seconds, consume 1510 messages per second. 20211216-11:39:41 ---> 20211216-11:40:48 Total:67 seconds, consume 1444 messages per second. 20211216-11:40:48 ---> 20211216-11:41:54 Total:66 seconds, consume 1511 messages per second. 20211216-11:41:54 ---> 20211216-11:43:00 Total:66 seconds, consume 1465 messages per second. 20211216-11:43:00 ---> 20211216-11:44:06 Total:66 seconds, consume 1465 messages per second.It will take about 387 seconds to complete the consumption.

    ? ? ? ? 在本實驗環境中,MySQL執行10分鐘的壓測負載,Greenplum大約需要執行約27分半(kafka-consumer-groups.sh命令本身需要約6秒的執行時間),兩者的QPS相差2.75倍。由此也可以看出,Greenplum作為分布式數據庫,專為分析型數據倉庫場景所設計,單條DML的執行效率遠沒有MySQL這種主機型數據庫高,并不適合高并發小事務的OLTP型應用。

    ? ? ? ? 雖然性能上比bireme的微批處理慢不少,但Canal Adapter的INSERT、UPDATE、DELETE處理方式,使得用類似于數據庫觸發器的功能實現自動實時ETL成為可能,這也是下一篇所要討論的主題。

    小結

    • 時間戳、觸發器、快照表、日志是常用的四種變化數據捕獲方法。使用日志不會侵入數據庫,適合做實時CDC。
    • Maxwell和Canal本質都是MySQL binlog解析器,工作方式是把自己偽裝成Slave,實現MySQL復制協議。
    • Maxwell和Canal可作為生產者,將binlog解析結果以消息形式輸出到Kafka中。
    • Kafka在數據同步方案中用于消息中轉和持久化。使用Kafka時要注意多分區的消息順序問題,通常可以將表主鍵作為哈希分區鍵,保證主鍵行的更新與源同序。
    • bireme是一個Greenplum數據倉庫的增量同步工具,支持將maxwell + Kafka 作為數據源,特點是采用DELETE + COPY方式,數據同步速度快。
    • Canal Adapter提供客戶端數據落地的適配及啟動功能,可將MySQL binlog事件在目標數據庫中回放,使用的是INSERT、UPDATE、DELETE方式。

    總結

    以上是生活随笔為你收集整理的Greenplum 实时数据仓库实践(5)——实时数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    av无码不卡在线观看免费 | 免费看男女做好爽好硬视频 | 波多野42部无码喷潮在线 | 亚洲精品国偷拍自产在线观看蜜桃 | 秋霞成人午夜鲁丝一区二区三区 | 激情国产av做激情国产爱 | 欧美日韩亚洲国产精品 | 亚洲一区av无码专区在线观看 | 最近免费中文字幕中文高清百度 | 国产精品对白交换视频 | 欧美35页视频在线观看 | 亚洲精品综合一区二区三区在线 | 亚洲中文无码av永久不收费 | 夜夜躁日日躁狠狠久久av | 久久亚洲a片com人成 | www国产精品内射老师 | 亚洲熟妇色xxxxx欧美老妇y | 国产区女主播在线观看 | 日日橹狠狠爱欧美视频 | 亚洲精品欧美二区三区中文字幕 | www成人国产高清内射 | 国内综合精品午夜久久资源 | 亚洲欧美日韩国产精品一区二区 | 久久精品国产日本波多野结衣 | 在线精品国产一区二区三区 | 欧美人妻一区二区三区 | 欧美xxxxx精品 | 欧美精品无码一区二区三区 | 一个人看的www免费视频在线观看 | 亚洲乱码日产精品bd | 国产精品办公室沙发 | 又大又紧又粉嫩18p少妇 | 一本久久伊人热热精品中文字幕 | 激情内射日本一区二区三区 | 亚洲の无码国产の无码影院 | 成人一在线视频日韩国产 | 久久午夜夜伦鲁鲁片无码免费 | 综合网日日天干夜夜久久 | 色偷偷人人澡人人爽人人模 | 亚洲色欲色欲欲www在线 | 久久天天躁夜夜躁狠狠 | 精品国产aⅴ无码一区二区 | 天堂久久天堂av色综合 | 少妇人妻大乳在线视频 | 377p欧洲日本亚洲大胆 | 亚洲 激情 小说 另类 欧美 | 伊人久久婷婷五月综合97色 | 久久人人爽人人爽人人片av高清 | 强伦人妻一区二区三区视频18 | 久久人人爽人人爽人人片av高清 | 丰满少妇熟乱xxxxx视频 | 日本乱偷人妻中文字幕 | 无码国产激情在线观看 | 无码吃奶揉捏奶头高潮视频 | 亚洲男人av香蕉爽爽爽爽 | 久久亚洲中文字幕精品一区 | 一二三四社区在线中文视频 | 亚洲精品无码国产 | 欧美亚洲国产一区二区三区 | 牲欲强的熟妇农村老妇女 | 图片区 小说区 区 亚洲五月 | 动漫av网站免费观看 | 性做久久久久久久久 | 俺去俺来也www色官网 | 亚洲a无码综合a国产av中文 | 在线精品国产一区二区三区 | 国産精品久久久久久久 | 日本成熟视频免费视频 | 久久综合给合久久狠狠狠97色 | 精品日本一区二区三区在线观看 | 久精品国产欧美亚洲色aⅴ大片 | av无码久久久久不卡免费网站 | 午夜福利不卡在线视频 | 中文字幕精品av一区二区五区 | 偷窥日本少妇撒尿chinese | 久久精品国产亚洲精品 | 精品国精品国产自在久国产87 | 国产在线一区二区三区四区五区 | 噜噜噜亚洲色成人网站 | 国产精品高潮呻吟av久久4虎 | 亚洲小说图区综合在线 | 亚洲国产午夜精品理论片 | 少妇性l交大片欧洲热妇乱xxx | 国产一区二区不卡老阿姨 | 亚洲另类伦春色综合小说 | 丰满少妇高潮惨叫视频 | 老熟妇乱子伦牲交视频 | 99久久精品国产一区二区蜜芽 | 丰满岳乱妇在线观看中字无码 | 水蜜桃亚洲一二三四在线 | 福利一区二区三区视频在线观看 | www国产精品内射老师 | 日日天干夜夜狠狠爱 | 欧美野外疯狂做受xxxx高潮 | 任你躁在线精品免费 | 国产亲子乱弄免费视频 | 亚洲精品午夜国产va久久成人 | 麻豆md0077饥渴少妇 | 国产精品亚洲一区二区三区喷水 | 国产精品美女久久久 | 狠狠色噜噜狠狠狠7777奇米 | 妺妺窝人体色www在线小说 | av无码电影一区二区三区 | 男女下面进入的视频免费午夜 | 任你躁国产自任一区二区三区 | 天天拍夜夜添久久精品 | 久久天天躁狠狠躁夜夜免费观看 | 久久精品国产99精品亚洲 | 人人妻人人澡人人爽人人精品浪潮 | 免费乱码人妻系列无码专区 | 日日碰狠狠丁香久燥 | 日日干夜夜干 | 中文无码精品a∨在线观看不卡 | 亚洲国产午夜精品理论片 | 国产精品香蕉在线观看 | 无码一区二区三区在线 | 亚洲精品久久久久avwww潮水 | 久久久久久国产精品无码下载 | 一本色道婷婷久久欧美 | 对白脏话肉麻粗话av | 亚洲男人av香蕉爽爽爽爽 | 欧美亚洲国产一区二区三区 | 无码国产激情在线观看 | 成人免费视频一区二区 | 色婷婷av一区二区三区之红樱桃 | 亚洲国产一区二区三区在线观看 | 丰满岳乱妇在线观看中字无码 | 日本一卡二卡不卡视频查询 | 欧美老人巨大xxxx做受 | 亚洲成色www久久网站 | 欧美放荡的少妇 | 在线天堂新版最新版在线8 | 亚洲成a人片在线观看无码 | 国产精品a成v人在线播放 | 亚洲精品美女久久久久久久 | 女高中生第一次破苞av | 无码人中文字幕 | 曰本女人与公拘交酡免费视频 | 一本久久a久久精品亚洲 | 丰满少妇弄高潮了www | 成人一在线视频日韩国产 | 97久久精品无码一区二区 | 中文字幕久久久久人妻 | 窝窝午夜理论片影院 | 亚洲 a v无 码免 费 成 人 a v | 亚洲综合色区中文字幕 | 黑人粗大猛烈进出高潮视频 | 国产av久久久久精东av | 日本爽爽爽爽爽爽在线观看免 | 亚洲va欧美va天堂v国产综合 | 高潮喷水的毛片 | 久久99热只有频精品8 | 国产av无码专区亚洲a∨毛片 | 久久综合激激的五月天 | 国产又爽又黄又刺激的视频 | 狠狠色色综合网站 | 久久久久久国产精品无码下载 | 免费网站看v片在线18禁无码 | 永久免费观看美女裸体的网站 | 成在人线av无码免费 | 精品 日韩 国产 欧美 视频 | 久久久久人妻一区精品色欧美 | 福利一区二区三区视频在线观看 | 国产亚洲人成a在线v网站 | 亚洲一区二区三区含羞草 | 亚洲人成网站在线播放942 | 亚洲码国产精品高潮在线 | 中文字幕日韩精品一区二区三区 | 国产成人精品优优av | aⅴ在线视频男人的天堂 | v一区无码内射国产 | 亚洲成a人片在线观看无码 | 波多野42部无码喷潮在线 | 99精品国产综合久久久久五月天 | 狂野欧美性猛xxxx乱大交 | 黄网在线观看免费网站 | 国产精品久久久 | 人人澡人人透人人爽 | 图片小说视频一区二区 | 国产成人无码av片在线观看不卡 | 国产亚洲人成a在线v网站 | 中文字幕乱码人妻无码久久 | 自拍偷自拍亚洲精品被多人伦好爽 | 亚洲国产精品成人久久蜜臀 | 装睡被陌生人摸出水好爽 | 国语精品一区二区三区 | 熟妇人妻激情偷爽文 | 国产香蕉97碰碰久久人人 | 欧美三级不卡在线观看 | 欧美一区二区三区视频在线观看 | 女人被爽到呻吟gif动态图视看 | 欧美黑人性暴力猛交喷水 | 久久久成人毛片无码 | 欧美性生交xxxxx久久久 | 欧美激情一区二区三区成人 | 影音先锋中文字幕无码 | 日韩av无码中文无码电影 | 久久精品国产99精品亚洲 | 无码精品人妻一区二区三区av | 亚洲精品一区二区三区在线观看 | 婷婷丁香六月激情综合啪 | 国产精品久久久av久久久 | 永久免费观看国产裸体美女 | 日本熟妇大屁股人妻 | 国产区女主播在线观看 | 亚洲国产成人a精品不卡在线 | 欧美激情内射喷水高潮 | 亚洲精品国偷拍自产在线观看蜜桃 | 日本护士xxxxhd少妇 | 乱中年女人伦av三区 | 一本精品99久久精品77 | 久久国产精品_国产精品 | 综合网日日天干夜夜久久 | 国内精品人妻无码久久久影院蜜桃 | 狂野欧美激情性xxxx | 无码av最新清无码专区吞精 | 国内少妇偷人精品视频 | 呦交小u女精品视频 | 日韩少妇内射免费播放 | 99久久精品无码一区二区毛片 | 国产手机在线αⅴ片无码观看 | 成人性做爰aaa片免费看不忠 | 内射后入在线观看一区 | 影音先锋中文字幕无码 | 又大又硬又爽免费视频 | 乱码av麻豆丝袜熟女系列 | 国产高潮视频在线观看 | 中文字幕av日韩精品一区二区 | 欧美 日韩 亚洲 在线 | 久久久久免费看成人影片 | 人人妻人人澡人人爽人人精品 | 久久亚洲a片com人成 | 无码精品人妻一区二区三区av | 久久99精品国产麻豆蜜芽 | 国产精品毛片一区二区 | 日本爽爽爽爽爽爽在线观看免 | 夜精品a片一区二区三区无码白浆 | 免费网站看v片在线18禁无码 | 国产熟妇另类久久久久 | 99久久久国产精品无码免费 | 高中生自慰www网站 | 两性色午夜免费视频 | 精品水蜜桃久久久久久久 | 国产成人av免费观看 | 人妻少妇被猛烈进入中文字幕 | 55夜色66夜色国产精品视频 | 蜜桃av抽搐高潮一区二区 | 99在线 | 亚洲 | 国产成人精品一区二区在线小狼 | 午夜精品久久久内射近拍高清 | 1000部啪啪未满十八勿入下载 | 中文字幕av伊人av无码av | 久久久无码中文字幕久... | 国产成人无码a区在线观看视频app | 成人无码精品一区二区三区 | 亚洲国产精品毛片av不卡在线 | 免费国产黄网站在线观看 | 曰本女人与公拘交酡免费视频 | 久久久久久久女国产乱让韩 | 精品一区二区不卡无码av | 欧美zoozzooz性欧美 | 日本在线高清不卡免费播放 | 亚洲天堂2017无码 | 国产亚洲精品久久久久久久久动漫 | 午夜男女很黄的视频 | 久久久婷婷五月亚洲97号色 | yw尤物av无码国产在线观看 | 午夜福利电影 | 国产熟女一区二区三区四区五区 | 日韩视频 中文字幕 视频一区 | 日本在线高清不卡免费播放 | 男女超爽视频免费播放 | 亚洲乱码日产精品bd | 人人爽人人澡人人高潮 | 欧洲精品码一区二区三区免费看 | 国产69精品久久久久app下载 | 97久久精品无码一区二区 | 丝袜人妻一区二区三区 | 亚洲综合在线一区二区三区 | 丝袜足控一区二区三区 | 欧美日本免费一区二区三区 | 精品欧洲av无码一区二区三区 | √天堂中文官网8在线 | 久久亚洲a片com人成 | 亚洲va中文字幕无码久久不卡 | 亚洲精品欧美二区三区中文字幕 | 妺妺窝人体色www在线小说 | 日韩无码专区 | 夜夜影院未满十八勿进 | 亚洲 激情 小说 另类 欧美 | 国产av久久久久精东av | 久久久精品成人免费观看 | 欧美xxxx黑人又粗又长 | 亚洲国产精华液网站w | 亚洲综合另类小说色区 | 女人高潮内射99精品 | 日韩人妻无码中文字幕视频 | 亚洲熟妇自偷自拍另类 | 国产精品视频免费播放 | 亚洲精品综合一区二区三区在线 | 牲欲强的熟妇农村老妇女视频 | 久久亚洲日韩精品一区二区三区 | 日本高清一区免费中文视频 | 日产精品99久久久久久 | 中文字幕无码免费久久9一区9 | 在线观看国产午夜福利片 | 色综合久久88色综合天天 | 真人与拘做受免费视频 | 欧美精品一区二区精品久久 | 18精品久久久无码午夜福利 | 无码人妻出轨黑人中文字幕 | 国产超碰人人爽人人做人人添 | 国产精品高潮呻吟av久久4虎 | 午夜精品久久久久久久久 | 蜜臀aⅴ国产精品久久久国产老师 | 男人的天堂2018无码 | 少妇邻居内射在线 | 欧美日韩综合一区二区三区 | 午夜成人1000部免费视频 | 给我免费的视频在线观看 | 欧美变态另类xxxx | 无码人妻精品一区二区三区下载 | 亚洲色欲色欲天天天www | 图片区 小说区 区 亚洲五月 | 亚洲综合色区中文字幕 | 久久国产精品_国产精品 | 亚洲精品中文字幕乱码 | 日本乱人伦片中文三区 | 日本va欧美va欧美va精品 | 丰满妇女强制高潮18xxxx | 丝袜 中出 制服 人妻 美腿 | 国产精品无码一区二区三区不卡 | 亚洲成在人网站无码天堂 | 国产超级va在线观看视频 | 4hu四虎永久在线观看 | 亚洲精品成人av在线 | 欧美午夜特黄aaaaaa片 | 国产超级va在线观看视频 | 亚洲中文字幕无码中文字在线 | 久久 国产 尿 小便 嘘嘘 | 极品尤物被啪到呻吟喷水 | 中文字幕精品av一区二区五区 | 无人区乱码一区二区三区 | 久久精品中文字幕大胸 | 人妻aⅴ无码一区二区三区 | 久久精品中文字幕大胸 | 无码国模国产在线观看 | 精品少妇爆乳无码av无码专区 | 少妇无码av无码专区在线观看 | 亚洲精品中文字幕乱码 | 成人欧美一区二区三区黑人 | 欧美国产日产一区二区 | 国产熟妇高潮叫床视频播放 | 麻豆国产人妻欲求不满谁演的 | 色综合久久中文娱乐网 | 欧美喷潮久久久xxxxx | 欧美怡红院免费全部视频 | 午夜福利试看120秒体验区 | aa片在线观看视频在线播放 | 午夜精品久久久久久久久 | 77777熟女视频在线观看 а天堂中文在线官网 | 免费人成网站视频在线观看 | 天天做天天爱天天爽综合网 | 日欧一片内射va在线影院 | 一本色道婷婷久久欧美 | 亚洲熟妇色xxxxx亚洲 | 欧美日韩一区二区综合 | 国产激情无码一区二区app | 一区二区三区乱码在线 | 欧洲 | 欧美日本精品一区二区三区 | 国产成人无码区免费内射一片色欲 | 久久zyz资源站无码中文动漫 | 红桃av一区二区三区在线无码av | 国产农村乱对白刺激视频 | 内射巨臀欧美在线视频 | 永久黄网站色视频免费直播 | 婷婷五月综合激情中文字幕 | 成人精品一区二区三区中文字幕 | 亚洲精品久久久久中文第一幕 | 任你躁在线精品免费 | 国产av一区二区三区最新精品 | 亚洲爆乳精品无码一区二区三区 | 国产精品国产三级国产专播 | 亚洲人成网站免费播放 | 国产精品无码mv在线观看 | 精品熟女少妇av免费观看 | 国产明星裸体无码xxxx视频 | 四虎国产精品免费久久 | 麻豆av传媒蜜桃天美传媒 | 99久久久无码国产aaa精品 | 国产精品久久国产精品99 | 一本久道久久综合婷婷五月 | 国产亚洲美女精品久久久2020 | 国产成人精品三级麻豆 | 99精品国产综合久久久久五月天 | 四虎永久在线精品免费网址 | 亚洲日本va中文字幕 | 丰满人妻翻云覆雨呻吟视频 | 夜先锋av资源网站 | 久久久久免费精品国产 | 中文精品久久久久人妻不卡 | 国产乱人伦偷精品视频 | 大地资源网第二页免费观看 | 国产亚洲人成在线播放 | 精品日本一区二区三区在线观看 | 中文字幕无线码 | 国内少妇偷人精品视频 | 久久久亚洲欧洲日产国码αv | 天堂а√在线地址中文在线 | 偷窥日本少妇撒尿chinese | 久久亚洲精品成人无码 | 国产无遮挡又黄又爽免费视频 | 亚洲第一网站男人都懂 | 国产av一区二区三区最新精品 | 国产精品第一区揄拍无码 | 人人超人人超碰超国产 | 日日躁夜夜躁狠狠躁 | 国产一区二区三区日韩精品 | 蜜桃无码一区二区三区 | 国内精品一区二区三区不卡 | 亚洲中文无码av永久不收费 | 无码吃奶揉捏奶头高潮视频 | 亚洲熟悉妇女xxx妇女av | 性生交片免费无码看人 | 久青草影院在线观看国产 | 国产熟女一区二区三区四区五区 | 久久久久久av无码免费看大片 | 18禁黄网站男男禁片免费观看 | 精品一区二区三区波多野结衣 | 中文字幕人妻无码一区二区三区 | 国产农村乱对白刺激视频 | 日本精品久久久久中文字幕 | 97久久精品无码一区二区 | 亚洲国产精品无码久久久久高潮 | 人人妻人人澡人人爽人人精品 | 日本乱偷人妻中文字幕 | 精品久久久中文字幕人妻 | 亚洲热妇无码av在线播放 | 99久久无码一区人妻 | 丰满人妻精品国产99aⅴ | 精品国偷自产在线 | 伊人久久大香线蕉午夜 | 精品无码一区二区三区爱欲 | 亚洲人亚洲人成电影网站色 | 天天av天天av天天透 | 国产精品丝袜黑色高跟鞋 | 国产卡一卡二卡三 | 一本色道久久综合亚洲精品不卡 | 久久精品人妻少妇一区二区三区 | 国产一区二区三区四区五区加勒比 | 亚洲男人av天堂午夜在 | 鲁大师影院在线观看 | 丰满诱人的人妻3 | 两性色午夜免费视频 | 人妻中文无码久热丝袜 | 欧美 丝袜 自拍 制服 另类 | 少妇久久久久久人妻无码 | 日本大乳高潮视频在线观看 | 欧美日韩久久久精品a片 | 青青草原综合久久大伊人精品 | 99久久久国产精品无码免费 | 少妇人妻偷人精品无码视频 | 久久综合给合久久狠狠狠97色 | 欧美国产亚洲日韩在线二区 | 国产口爆吞精在线视频 | 亚洲国产欧美日韩精品一区二区三区 | 十八禁真人啪啪免费网站 | 一本无码人妻在中文字幕免费 | 少妇人妻偷人精品无码视频 | 在线观看免费人成视频 | 少妇激情av一区二区 | 综合人妻久久一区二区精品 | 欧美精品免费观看二区 | 欧美熟妇另类久久久久久不卡 | 澳门永久av免费网站 | 夜精品a片一区二区三区无码白浆 | 天天av天天av天天透 | 丰满少妇弄高潮了www | 亚洲欧洲无卡二区视頻 | 国产亚洲精品久久久久久国模美 | 亚洲精品综合一区二区三区在线 | 国产精品久久久久久亚洲毛片 | 久久久久se色偷偷亚洲精品av | 国产女主播喷水视频在线观看 | 18无码粉嫩小泬无套在线观看 | 欧美xxxx黑人又粗又长 | 亚洲区小说区激情区图片区 | 色综合久久久无码网中文 | 性欧美videos高清精品 | 正在播放东北夫妻内射 | 狠狠亚洲超碰狼人久久 | 人妻无码久久精品人妻 | 亚洲一区二区三区国产精华液 | 99re在线播放 | 熟妇女人妻丰满少妇中文字幕 | 熟女少妇在线视频播放 | 色欲人妻aaaaaaa无码 | 欧美国产亚洲日韩在线二区 | 成年女人永久免费看片 | 色一情一乱一伦一视频免费看 | 欧美人与禽猛交狂配 | 又色又爽又黄的美女裸体网站 | 国产色精品久久人妻 | 亚洲成色www久久网站 | 97色伦图片97综合影院 | 色婷婷综合中文久久一本 | 又黄又爽又色的视频 | 国产精品久久久午夜夜伦鲁鲁 | 人妻无码久久精品人妻 | 国色天香社区在线视频 | а√天堂www在线天堂小说 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 国产手机在线αⅴ片无码观看 | 婷婷丁香五月天综合东京热 | 天堂在线观看www | 国产成人精品视频ⅴa片软件竹菊 | 欧美精品一区二区精品久久 | 免费看少妇作爱视频 | 好男人社区资源 | 亚洲人成人无码网www国产 | 大肉大捧一进一出视频出来呀 | 日本精品少妇一区二区三区 | 丰满岳乱妇在线观看中字无码 | 久久久久99精品国产片 | 精品国产一区av天美传媒 | 国产欧美亚洲精品a | 精品偷自拍另类在线观看 | 日本欧美一区二区三区乱码 | 久久久久久av无码免费看大片 | 波多野结衣乳巨码无在线观看 | 国产三级久久久精品麻豆三级 | 在教室伦流澡到高潮hnp视频 | 性欧美videos高清精品 | 成人性做爰aaa片免费看 | 国产无遮挡又黄又爽免费视频 | 日韩在线不卡免费视频一区 | 天干天干啦夜天干天2017 | 亚洲欧美日韩成人高清在线一区 | 纯爱无遮挡h肉动漫在线播放 | 久久久久99精品国产片 | 亚洲一区二区观看播放 | 色五月五月丁香亚洲综合网 | 偷窥村妇洗澡毛毛多 | 国产一区二区三区日韩精品 | 国产精品毛片一区二区 | 成熟女人特级毛片www免费 | 亚洲乱亚洲乱妇50p | 性开放的女人aaa片 | 国产精品国产三级国产专播 | 国产成人无码午夜视频在线观看 | 18黄暴禁片在线观看 | 极品尤物被啪到呻吟喷水 | 97夜夜澡人人双人人人喊 | 天堂久久天堂av色综合 | 天堂а√在线地址中文在线 | 国产精品久久国产三级国 | 精品成人av一区二区三区 | 久久久成人毛片无码 | 亚洲成av人影院在线观看 | 国产超级va在线观看视频 | 亚洲va中文字幕无码久久不卡 | 国产人妖乱国产精品人妖 | 国产口爆吞精在线视频 | 亚洲欧洲无卡二区视頻 | 久久久精品欧美一区二区免费 | 成 人 免费观看网站 | 国产热a欧美热a在线视频 | 无码吃奶揉捏奶头高潮视频 | 国产精品久久国产精品99 | 国精产品一品二品国精品69xx | 乌克兰少妇性做爰 | 色欲久久久天天天综合网精品 | 精品国产一区二区三区四区在线看 | 亚洲精品一区二区三区在线观看 | 在线 国产 欧美 亚洲 天堂 | 亚洲国产欧美日韩精品一区二区三区 | 乱码av麻豆丝袜熟女系列 | 欧美三级a做爰在线观看 | 亚洲色成人中文字幕网站 | 久久久久久久久888 | 人人妻人人澡人人爽欧美一区九九 | 亚洲精品美女久久久久久久 | 欧美黑人性暴力猛交喷水 | 成人免费视频视频在线观看 免费 | 久久精品中文字幕一区 | 亚洲熟妇自偷自拍另类 | 国产农村妇女高潮大叫 | 精品无码国产自产拍在线观看蜜 | 天下第一社区视频www日本 | 国内精品人妻无码久久久影院蜜桃 | 波多野结衣av一区二区全免费观看 | 国产性生大片免费观看性 | 88国产精品欧美一区二区三区 | 天堂一区人妻无码 | 大肉大捧一进一出视频出来呀 | 国产高潮视频在线观看 | 国产精品人妻一区二区三区四 | 天天综合网天天综合色 | 国产国产精品人在线视 | 精品久久8x国产免费观看 | 2019nv天堂香蕉在线观看 | 国产在线精品一区二区高清不卡 | 99国产精品白浆在线观看免费 | 免费国产成人高清在线观看网站 | 999久久久国产精品消防器材 | 无套内谢的新婚少妇国语播放 | 少妇性俱乐部纵欲狂欢电影 | 欧美xxxx黑人又粗又长 | 九九热爱视频精品 | 最新版天堂资源中文官网 | yw尤物av无码国产在线观看 | 国产人妻精品一区二区三区 | 久久久婷婷五月亚洲97号色 | 国产午夜福利亚洲第一 | 亚洲の无码国产の无码步美 | 欧美日韩在线亚洲综合国产人 | 日韩av无码一区二区三区 | 国产精品亚洲lv粉色 | 亚洲精品综合五月久久小说 | 亚洲性无码av中文字幕 | 中文字幕无码av波多野吉衣 | 激情五月综合色婷婷一区二区 | 好爽又高潮了毛片免费下载 | 亚洲一区二区三区在线观看网站 | 在线a亚洲视频播放在线观看 | 无码人妻少妇伦在线电影 | 亚洲国产日韩a在线播放 | 波多野结衣一区二区三区av免费 | 疯狂三人交性欧美 | 在线亚洲高清揄拍自拍一品区 | 亚洲a无码综合a国产av中文 | 天堂无码人妻精品一区二区三区 | 日本精品人妻无码77777 天堂一区人妻无码 | 国产精品内射视频免费 | 亚洲熟妇色xxxxx欧美老妇 | 性欧美videos高清精品 | 疯狂三人交性欧美 | 欧美日韩视频无码一区二区三 | 麻豆av传媒蜜桃天美传媒 | 亚洲欧美日韩综合久久久 | 国产精品理论片在线观看 | 欧美精品一区二区精品久久 | 欧美精品无码一区二区三区 | 午夜精品久久久久久久 | 亚洲中文无码av永久不收费 | 欧美性色19p | 人人爽人人澡人人高潮 | 国产成人无码专区 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 国产色xx群视频射精 | 国产婷婷色一区二区三区在线 | 在线视频网站www色 | 丰满诱人的人妻3 | 免费人成在线观看网站 | 人人爽人人澡人人高潮 | 天天拍夜夜添久久精品大 | 国产av人人夜夜澡人人爽麻豆 | 九九久久精品国产免费看小说 | 在线成人www免费观看视频 | 噜噜噜亚洲色成人网站 | 国产精品久久久久久亚洲毛片 | 欧洲极品少妇 | 在线看片无码永久免费视频 | 无码福利日韩神码福利片 | 国产精品99爱免费视频 | 日本高清一区免费中文视频 | 九九综合va免费看 | 蜜桃视频韩日免费播放 | 国内综合精品午夜久久资源 | 性生交大片免费看女人按摩摩 | 亚洲国产精品无码一区二区三区 | 国产真实夫妇视频 | 欧美zoozzooz性欧美 | 一本久久a久久精品亚洲 | 日韩少妇白浆无码系列 | 亚洲中文字幕在线观看 | 中文久久乱码一区二区 | 欧美xxxx黑人又粗又长 | 又紧又大又爽精品一区二区 | 中国女人内谢69xxxxxa片 | 久久国产自偷自偷免费一区调 | 亚洲精品成人av在线 | 精品偷拍一区二区三区在线看 | 少妇的肉体aa片免费 | 成人无码影片精品久久久 | 大肉大捧一进一出视频出来呀 | 欧美黑人性暴力猛交喷水 | 色婷婷欧美在线播放内射 | 桃花色综合影院 | 永久黄网站色视频免费直播 | 亚洲欧洲无卡二区视頻 | 亚洲一区二区三区含羞草 | 国产精品欧美成人 | 97精品人妻一区二区三区香蕉 | 鲁大师影院在线观看 | 男人和女人高潮免费网站 | 亚洲国产欧美在线成人 | 亚洲天堂2017无码中文 | 国产又爽又黄又刺激的视频 | 桃花色综合影院 | 国产在线精品一区二区高清不卡 | 蜜桃臀无码内射一区二区三区 | 国产成人久久精品流白浆 | 日本一本二本三区免费 | 捆绑白丝粉色jk震动捧喷白浆 | 又紧又大又爽精品一区二区 | 久久综合九色综合欧美狠狠 | 国产精品亚洲а∨无码播放麻豆 | 成人影院yy111111在线观看 | 欧美丰满熟妇xxxx | 欧美国产日产一区二区 | 国产精品.xx视频.xxtv | 俺去俺来也www色官网 | 亚洲国产精品成人久久蜜臀 | a国产一区二区免费入口 | 俄罗斯老熟妇色xxxx | 国产一精品一av一免费 | 日日摸天天摸爽爽狠狠97 | 一本久道高清无码视频 | 天堂а√在线中文在线 | 黑人巨大精品欧美黑寡妇 | 人人澡人人透人人爽 | 国产亚洲人成a在线v网站 | 18精品久久久无码午夜福利 | 成人无码视频在线观看网站 | 蜜桃av抽搐高潮一区二区 | 亚洲成色www久久网站 | 国产精品无码久久av | 日韩精品无码免费一区二区三区 | 欧美 丝袜 自拍 制服 另类 | 人人妻人人藻人人爽欧美一区 | 亚洲精品国产a久久久久久 | 在线天堂新版最新版在线8 | 午夜福利一区二区三区在线观看 | 性欧美疯狂xxxxbbbb | 亚洲国产精品久久久久久 | 国产真实夫妇视频 | 国产人妻精品一区二区三区 | 日本丰满熟妇videos | 国产成人综合色在线观看网站 | 国产亚洲人成在线播放 | 久久午夜无码鲁丝片秋霞 | 377p欧洲日本亚洲大胆 | 免费人成在线观看网站 | 亚洲另类伦春色综合小说 | 亚洲啪av永久无码精品放毛片 | 欧美日韩在线亚洲综合国产人 | 天干天干啦夜天干天2017 | 色偷偷人人澡人人爽人人模 | 激情内射亚州一区二区三区爱妻 | 伊人色综合久久天天小片 | 午夜福利电影 | 日本一区二区三区免费播放 | 欧美成人午夜精品久久久 | 超碰97人人做人人爱少妇 | 久久人人爽人人爽人人片av高清 | 在线看片无码永久免费视频 | 午夜熟女插插xx免费视频 | 精品无码国产自产拍在线观看蜜 | 精品国偷自产在线 | 在线看片无码永久免费视频 | 精品一二三区久久aaa片 | 一本无码人妻在中文字幕免费 | 久久久久亚洲精品中文字幕 | 国产综合久久久久鬼色 | 在线看片无码永久免费视频 | 午夜福利试看120秒体验区 | 人妻人人添人妻人人爱 | 久久国语露脸国产精品电影 | 色五月丁香五月综合五月 | 国精品人妻无码一区二区三区蜜柚 | 黑人玩弄人妻中文在线 | 精品国产福利一区二区 | 国产亚洲人成在线播放 | 国产一精品一av一免费 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 2020久久超碰国产精品最新 | 青草视频在线播放 | 亲嘴扒胸摸屁股激烈网站 | 国产熟妇另类久久久久 | 巨爆乳无码视频在线观看 | 亚洲精品国产精品乱码不卡 | 国产口爆吞精在线视频 | 欧美黑人乱大交 | 搡女人真爽免费视频大全 | 亚欧洲精品在线视频免费观看 | 久久久久av无码免费网 | 大肉大捧一进一出视频出来呀 | 成人亚洲精品久久久久 | 动漫av网站免费观看 | 国产福利视频一区二区 | 亚洲男人av香蕉爽爽爽爽 | 沈阳熟女露脸对白视频 | 77777熟女视频在线观看 а天堂中文在线官网 | 内射后入在线观看一区 | 黑人巨大精品欧美一区二区 | 少妇久久久久久人妻无码 | 久久久久久九九精品久 | 性欧美疯狂xxxxbbbb | 沈阳熟女露脸对白视频 | 婷婷五月综合激情中文字幕 | 国产精品资源一区二区 | 国产精品办公室沙发 | 久久久久成人精品免费播放动漫 | 青青久在线视频免费观看 | 女人被男人躁得好爽免费视频 | 亚洲熟悉妇女xxx妇女av | 亚洲日本va中文字幕 | 永久免费观看美女裸体的网站 | 久9re热视频这里只有精品 | 午夜福利试看120秒体验区 | 精品无码国产一区二区三区av | 国内揄拍国内精品少妇国语 | 国产av久久久久精东av | 亚洲人成人无码网www国产 | 无码av中文字幕免费放 | 无套内谢的新婚少妇国语播放 | 久久久久人妻一区精品色欧美 | 婷婷六月久久综合丁香 | 99精品国产综合久久久久五月天 | 曰韩无码二三区中文字幕 | 免费人成在线观看网站 | 少妇一晚三次一区二区三区 | 国产精品视频免费播放 | 少妇人妻av毛片在线看 | 国产av久久久久精东av | 亚洲aⅴ无码成人网站国产app | 99精品无人区乱码1区2区3区 | 亲嘴扒胸摸屁股激烈网站 | 人妻少妇被猛烈进入中文字幕 | 99久久精品日本一区二区免费 | 国产香蕉尹人视频在线 | 国产人妻大战黑人第1集 | 亚洲色在线无码国产精品不卡 | 沈阳熟女露脸对白视频 | 国产精品久久久久久久影院 | 日本一区二区三区免费播放 | 一本一道久久综合久久 | 天堂а√在线地址中文在线 | 日本大香伊一区二区三区 | 野外少妇愉情中文字幕 | 日本爽爽爽爽爽爽在线观看免 | www成人国产高清内射 | 国产精品欧美成人 | 久久天天躁狠狠躁夜夜免费观看 | 国产超碰人人爽人人做人人添 | 成 人影片 免费观看 | 国产婷婷色一区二区三区在线 | 国产一区二区三区精品视频 | 欧美熟妇另类久久久久久多毛 | 欧美黑人性暴力猛交喷水 | 精品无人区无码乱码毛片国产 | 亚洲一区二区观看播放 | 国产人成高清在线视频99最全资源 | 精品国产aⅴ无码一区二区 | 亚洲精品午夜国产va久久成人 | 亚洲狠狠婷婷综合久久 | 熟妇人妻无码xxx视频 | 亚洲精品一区二区三区四区五区 | 久久精品国产亚洲精品 | 又湿又紧又大又爽a视频国产 | 久久精品国产精品国产精品污 | 午夜福利不卡在线视频 | 纯爱无遮挡h肉动漫在线播放 | 99久久精品日本一区二区免费 | 精品国产麻豆免费人成网站 | 日韩精品无码免费一区二区三区 | 乱码午夜-极国产极内射 | 久久综合九色综合97网 | aⅴ在线视频男人的天堂 | 装睡被陌生人摸出水好爽 | 东京热男人av天堂 | 无套内谢的新婚少妇国语播放 | 2019午夜福利不卡片在线 | 国产偷国产偷精品高清尤物 | 妺妺窝人体色www婷婷 | 日韩精品乱码av一区二区 | 日本又色又爽又黄的a片18禁 | 国产无遮挡又黄又爽又色 | 无码成人精品区在线观看 | 人人澡人人妻人人爽人人蜜桃 | 国产乱人无码伦av在线a | 大屁股大乳丰满人妻 | 国产成人综合美国十次 | 亚洲人成影院在线无码按摩店 | 性史性农村dvd毛片 | 久久久久久久久888 | 高潮毛片无遮挡高清免费视频 | 少妇性l交大片欧洲热妇乱xxx | 男女性色大片免费网站 | 少妇性l交大片 | 纯爱无遮挡h肉动漫在线播放 | 亚洲色偷偷男人的天堂 | 国产激情一区二区三区 | 性生交大片免费看l | 人人澡人人透人人爽 | 久久精品丝袜高跟鞋 | 免费国产成人高清在线观看网站 | 欧美人妻一区二区三区 | 在线精品国产一区二区三区 | 精品无码av一区二区三区 | 精品国精品国产自在久国产87 | 97夜夜澡人人双人人人喊 | 中文字幕人妻无码一夲道 | 麻豆蜜桃av蜜臀av色欲av | 美女极度色诱视频国产 | 亚洲精品鲁一鲁一区二区三区 | 性生交大片免费看女人按摩摩 | 97夜夜澡人人双人人人喊 | yw尤物av无码国产在线观看 | 熟妇人妻无码xxx视频 | 久久国语露脸国产精品电影 | 夜夜夜高潮夜夜爽夜夜爰爰 | 51国偷自产一区二区三区 | 国产在线精品一区二区三区直播 | 性欧美牲交在线视频 | 久久综合九色综合欧美狠狠 | 久久97精品久久久久久久不卡 | 亚洲一区二区三区香蕉 | 无码播放一区二区三区 | 免费男性肉肉影院 | 久久久久成人片免费观看蜜芽 | 一本一道久久综合久久 | 亚洲中文无码av永久不收费 | 熟女体下毛毛黑森林 | 国产一区二区三区四区五区加勒比 | 国产成人av免费观看 | 国产精品亚洲а∨无码播放麻豆 | 精品无码av一区二区三区 | 极品尤物被啪到呻吟喷水 | 高潮毛片无遮挡高清免费 | 亚洲国产欧美国产综合一区 | 国产亚洲精品久久久久久久久动漫 | 国产尤物精品视频 | 97久久精品无码一区二区 | 夫妻免费无码v看片 | 午夜丰满少妇性开放视频 | 偷窥村妇洗澡毛毛多 | 久久99精品久久久久久动态图 | 一本久久伊人热热精品中文字幕 | 久久www免费人成人片 | 老太婆性杂交欧美肥老太 | 成人精品一区二区三区中文字幕 | 高中生自慰www网站 | 久久久久久久女国产乱让韩 | 国产超级va在线观看视频 | 欧美性猛交xxxx富婆 | 欧美激情一区二区三区成人 | 久久99精品久久久久婷婷 | 国产av一区二区精品久久凹凸 | 99久久亚洲精品无码毛片 | 精品久久久久久亚洲精品 | 无码乱肉视频免费大全合集 | 一本加勒比波多野结衣 | 国产乱人伦av在线无码 | 国产免费久久久久久无码 | 天堂无码人妻精品一区二区三区 | 亚洲区小说区激情区图片区 | 俺去俺来也在线www色官网 | 人妻插b视频一区二区三区 | 国内少妇偷人精品视频 | 国色天香社区在线视频 | av无码久久久久不卡免费网站 | 日本大香伊一区二区三区 | 人人爽人人澡人人高潮 | 亚洲欧美日韩综合久久久 | 国产午夜福利100集发布 | 国产亚洲欧美日韩亚洲中文色 | а√天堂www在线天堂小说 | 免费网站看v片在线18禁无码 | 狠狠噜狠狠狠狠丁香五月 | 性欧美疯狂xxxxbbbb | 纯爱无遮挡h肉动漫在线播放 | av无码电影一区二区三区 | 人人爽人人爽人人片av亚洲 | 国产肉丝袜在线观看 | 欧美日韩色另类综合 | 一本色道久久综合亚洲精品不卡 | 99riav国产精品视频 | 日韩av激情在线观看 | 中文字幕无码日韩专区 | 久久精品国产99久久6动漫 | 国产av无码专区亚洲awww | 国产激情一区二区三区 | 国产精品久久久久久亚洲影视内衣 | 六十路熟妇乱子伦 | 国产艳妇av在线观看果冻传媒 | 亚洲熟妇色xxxxx亚洲 | 中文字幕无码热在线视频 | 国产成人精品视频ⅴa片软件竹菊 | 亚洲va中文字幕无码久久不卡 | 亚洲精品成人av在线 | 精品日本一区二区三区在线观看 | 亚洲精品国偷拍自产在线麻豆 | 暴力强奷在线播放无码 | 久久久久av无码免费网 | 99精品久久毛片a片 | 日韩av无码一区二区三区 | 欧洲美熟女乱又伦 | 人妻中文无码久热丝袜 | 自拍偷自拍亚洲精品10p | 国产精品亚洲а∨无码播放麻豆 | 小鲜肉自慰网站xnxx | 久久午夜无码鲁丝片 | 欧美老妇交乱视频在线观看 | 黄网在线观看免费网站 | 一本色道婷婷久久欧美 | 丰满妇女强制高潮18xxxx | 亚洲精品一区二区三区四区五区 | 在线视频网站www色 | 蜜桃av抽搐高潮一区二区 | 九月婷婷人人澡人人添人人爽 | 亚洲国产日韩a在线播放 | 领导边摸边吃奶边做爽在线观看 | 国产莉萝无码av在线播放 | 在线欧美精品一区二区三区 | 又大又硬又爽免费视频 | 日本丰满护士爆乳xxxx | 伊人久久大香线焦av综合影院 | 99久久精品无码一区二区毛片 | 日日天干夜夜狠狠爱 | 白嫩日本少妇做爰 | 伦伦影院午夜理论片 | 成在人线av无码免费 | 精品一二三区久久aaa片 | 永久免费观看国产裸体美女 | 无码av中文字幕免费放 | 亚洲熟妇色xxxxx亚洲 | 一本色道婷婷久久欧美 | 夜夜夜高潮夜夜爽夜夜爰爰 | 99久久精品日本一区二区免费 | 精品乱码久久久久久久 | 高清不卡一区二区三区 | 无码毛片视频一区二区本码 | 国产精品99爱免费视频 | 一区二区三区高清视频一 | 久久久亚洲欧洲日产国码αv | 亚洲大尺度无码无码专区 | 久久久精品成人免费观看 | 国产精品成人av在线观看 | 国产亚洲精品精品国产亚洲综合 | 波多野结衣av在线观看 | 国产精品久久久久无码av色戒 | 伦伦影院午夜理论片 | 国产精品人妻一区二区三区四 | 激情综合激情五月俺也去 | 中文字幕人妻丝袜二区 | 亚洲欧美中文字幕5发布 | 国产乱人伦av在线无码 | 国产亲子乱弄免费视频 | 亚洲自偷自拍另类第1页 | 国产在线一区二区三区四区五区 | 国产美女极度色诱视频www | 巨爆乳无码视频在线观看 | 国产人妖乱国产精品人妖 | 天下第一社区视频www日本 | 亚洲国产精品美女久久久久 | 免费无码午夜福利片69 | 国产精品无码一区二区桃花视频 | 国产精品无码永久免费888 | 老司机亚洲精品影院无码 | 亚洲欧美精品aaaaaa片 | 亚洲欧美国产精品专区久久 | 少妇性俱乐部纵欲狂欢电影 | 人妻与老人中文字幕 | 成 人 网 站国产免费观看 | 四虎4hu永久免费 | 日本肉体xxxx裸交 | 伊在人天堂亚洲香蕉精品区 | 国产人妻精品一区二区三区 | 女高中生第一次破苞av | 亚洲国精产品一二二线 | 全黄性性激高免费视频 | 无码人妻av免费一区二区三区 | 欧美精品在线观看 | 欧美亚洲国产一区二区三区 | 国产精品第一国产精品 | 国产疯狂伦交大片 | 荫蒂添的好舒服视频囗交 | 亚洲国产综合无码一区 | 国产在热线精品视频 | 亚洲综合无码久久精品综合 | 亚洲欧洲无卡二区视頻 | 人妻少妇精品视频专区 | 自拍偷自拍亚洲精品被多人伦好爽 | 夜先锋av资源网站 | 日本爽爽爽爽爽爽在线观看免 | 国产免费观看黄av片 | 国产国产精品人在线视 | 久久久久国色av免费观看性色 | 在线亚洲高清揄拍自拍一品区 | 性生交大片免费看女人按摩摩 | 女人被男人躁得好爽免费视频 | 久久综合久久自在自线精品自 | a国产一区二区免费入口 | 无码国产激情在线观看 | 国产乱子伦视频在线播放 | 老太婆性杂交欧美肥老太 | 在线视频网站www色 | 亚洲日韩乱码中文无码蜜桃臀网站 | 在线观看国产一区二区三区 | 成人精品一区二区三区中文字幕 | 欧美人妻一区二区三区 | 精品国产福利一区二区 | 亚洲无人区一区二区三区 | 55夜色66夜色国产精品视频 | 亚洲成av人综合在线观看 | 国产精品人人爽人人做我的可爱 | 狠狠色丁香久久婷婷综合五月 | 亚洲国产av美女网站 | 国产精品久久国产三级国 | 欧美第一黄网免费网站 | 亚洲a无码综合a国产av中文 | 久久精品中文字幕大胸 | 377p欧洲日本亚洲大胆 | 国产成人精品无码播放 | 日韩少妇白浆无码系列 | 鲁大师影院在线观看 | 国产亚洲精品久久久闺蜜 | 国产成人一区二区三区别 | 桃花色综合影院 | 国产精品-区区久久久狼 | 51国偷自产一区二区三区 | 色一情一乱一伦一视频免费看 | 久久综合激激的五月天 | 国产明星裸体无码xxxx视频 | 国产成人精品视频ⅴa片软件竹菊 | 国产人妖乱国产精品人妖 | 人妻人人添人妻人人爱 | 亚洲 激情 小说 另类 欧美 | 伊在人天堂亚洲香蕉精品区 | 国产超碰人人爽人人做人人添 | 欧美怡红院免费全部视频 | 鲁一鲁av2019在线 | 台湾无码一区二区 | 成人影院yy111111在线观看 | 日韩av无码一区二区三区不卡 | 国产人妻久久精品二区三区老狼 | 图片小说视频一区二区 | 天堂亚洲2017在线观看 | 久久久www成人免费毛片 | 久久99精品久久久久久动态图 | 亚洲自偷自偷在线制服 | 国产在线无码精品电影网 | 欧美黑人性暴力猛交喷水 | 欧美zoozzooz性欧美 | 欧美三级a做爰在线观看 | 欧美亚洲国产一区二区三区 | 一个人看的www免费视频在线观看 | 国产乱人无码伦av在线a | 亚洲精品一区二区三区大桥未久 | 亚洲中文字幕乱码av波多ji | 窝窝午夜理论片影院 | 日日碰狠狠躁久久躁蜜桃 | 免费观看的无遮挡av | 国产偷国产偷精品高清尤物 | 图片小说视频一区二区 | 久久视频在线观看精品 | 国内精品久久久久久中文字幕 | 久久亚洲国产成人精品性色 | 亚洲狠狠色丁香婷婷综合 | 性欧美牲交xxxxx视频 | 九九久久精品国产免费看小说 | 国产舌乚八伦偷品w中 | 国产精品久免费的黄网站 | 夜夜高潮次次欢爽av女 | 国产成人人人97超碰超爽8 | 日本精品人妻无码77777 天堂一区人妻无码 | 日韩欧美中文字幕公布 | 婷婷五月综合缴情在线视频 | 精品偷拍一区二区三区在线看 | 日本护士毛茸茸高潮 | 中文字幕精品av一区二区五区 | 丝袜人妻一区二区三区 | 中文字幕亚洲情99在线 | 波多野结衣aⅴ在线 | 国产又爽又猛又粗的视频a片 | 国产亲子乱弄免费视频 | 红桃av一区二区三区在线无码av | 国产精品第一国产精品 | 妺妺窝人体色www婷婷 | 国产小呦泬泬99精品 | 日本肉体xxxx裸交 | 香港三级日本三级妇三级 | 国产亚洲精品久久久久久 | 国产麻豆精品一区二区三区v视界 | 国产精品毛多多水多 | 国产尤物精品视频 | 欧美激情内射喷水高潮 | 欧美午夜特黄aaaaaa片 | 国产亚洲精品久久久久久久 | 高清不卡一区二区三区 | 久久久久久久久888 | 亚洲娇小与黑人巨大交 | 欧美精品无码一区二区三区 | 午夜精品久久久久久久久 | 亚洲 欧美 激情 小说 另类 | 国产农村乱对白刺激视频 | 波多野结衣高清一区二区三区 | 亚洲区欧美区综合区自拍区 | 国产极品视觉盛宴 | 国产内射爽爽大片视频社区在线 | 一本大道伊人av久久综合 | 亚洲另类伦春色综合小说 | 98国产精品综合一区二区三区 | 麻豆人妻少妇精品无码专区 | 亚洲精品国产精品乱码视色 | 夜夜高潮次次欢爽av女 | 国产精品久久久久久久9999 | 国产超级va在线观看视频 | 国产真人无遮挡作爱免费视频 | 国产人妻大战黑人第1集 | 波多野结衣乳巨码无在线观看 | 国产精品国产三级国产专播 | 国产激情综合五月久久 | 国产色视频一区二区三区 | 天天av天天av天天透 | 国产精品嫩草久久久久 | 女人被男人爽到呻吟的视频 | 俄罗斯老熟妇色xxxx | а√资源新版在线天堂 | 嫩b人妻精品一区二区三区 | 99久久精品无码一区二区毛片 | 亚洲 高清 成人 动漫 | 久久99热只有频精品8 | 亚洲欧美日韩成人高清在线一区 | 国产精品自产拍在线观看 | 色婷婷综合中文久久一本 | 国产真人无遮挡作爱免费视频 | 久激情内射婷内射蜜桃人妖 | 欧美日韩综合一区二区三区 | 国产亚洲精品久久久闺蜜 | 西西人体www44rt大胆高清 | 熟妇激情内射com | 欧美三级不卡在线观看 | 国产精品自产拍在线观看 | 无码人妻出轨黑人中文字幕 | 亚洲日韩中文字幕在线播放 | 日产精品99久久久久久 | 精品无码国产一区二区三区av | 亚洲熟妇自偷自拍另类 | 亚洲色大成网站www | 一个人看的www免费视频在线观看 | 理论片87福利理论电影 | 国产一区二区不卡老阿姨 | 精品午夜福利在线观看 | 无码午夜成人1000部免费视频 | 骚片av蜜桃精品一区 | 色综合久久久无码中文字幕 | 国产亚洲精品久久久久久久久动漫 | 欧美人与善在线com | 人妻互换免费中文字幕 | 乱人伦人妻中文字幕无码 | 99久久精品日本一区二区免费 | 久久亚洲精品中文字幕无男同 | 精品夜夜澡人妻无码av蜜桃 | 东京热男人av天堂 | 全黄性性激高免费视频 | 熟妇人妻中文av无码 | 全黄性性激高免费视频 | 无遮挡国产高潮视频免费观看 | 日韩精品成人一区二区三区 | 天天躁夜夜躁狠狠是什么心态 | 国产午夜手机精彩视频 | 亚洲中文无码av永久不收费 | 国产精品亚洲lv粉色 | 亚洲中文无码av永久不收费 | 一本久久a久久精品亚洲 | 麻豆国产丝袜白领秘书在线观看 | 蜜桃臀无码内射一区二区三区 | 国产亚洲精品精品国产亚洲综合 | 久久精品国产一区二区三区肥胖 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 亚洲成熟女人毛毛耸耸多 | 亚洲午夜久久久影院 | 色噜噜亚洲男人的天堂 | 国产午夜手机精彩视频 | 丰满少妇熟乱xxxxx视频 | 日本一本二本三区免费 | 亚洲国产精品一区二区第一页 | 国产成人一区二区三区在线观看 | 国产亚洲精品久久久久久 | 一本无码人妻在中文字幕免费 | 欧美丰满熟妇xxxx性ppx人交 | 欧美精品无码一区二区三区 | 乱中年女人伦av三区 | 日本一区二区三区免费播放 | 久久久久av无码免费网 | 女人被男人躁得好爽免费视频 | 成人女人看片免费视频放人 | 男人和女人高潮免费网站 | 亚洲综合在线一区二区三区 | 久久人人爽人人爽人人片av高清 | 亚洲国产精华液网站w | 中文字幕日产无线码一区 | 国产精品香蕉在线观看 | 免费观看黄网站 | 精品厕所偷拍各类美女tp嘘嘘 | 欧美高清在线精品一区 | 激情综合激情五月俺也去 | 人人澡人摸人人添 | www一区二区www免费 | а√资源新版在线天堂 | 日日碰狠狠躁久久躁蜜桃 | 蜜桃臀无码内射一区二区三区 | 免费乱码人妻系列无码专区 | 久久精品国产大片免费观看 | av无码久久久久不卡免费网站 | 双乳奶水饱满少妇呻吟 | 国产精品亚洲一区二区三区喷水 | 欧美自拍另类欧美综合图片区 | 99riav国产精品视频 | 国产片av国语在线观看 | 理论片87福利理论电影 | av人摸人人人澡人人超碰下载 | 小泽玛莉亚一区二区视频在线 | 亚洲综合无码一区二区三区 | 亚洲人交乣女bbw | 亚洲无人区午夜福利码高清完整版 | 国产特级毛片aaaaaa高潮流水 | 97精品国产97久久久久久免费 | 日本丰满护士爆乳xxxx | 丰满岳乱妇在线观看中字无码 | 精品水蜜桃久久久久久久 | 福利一区二区三区视频在线观看 | 久久五月精品中文字幕 | 国产精品久久久久久亚洲毛片 | 亚洲日本一区二区三区在线 | 宝宝好涨水快流出来免费视频 | 国产av剧情md精品麻豆 | 久久久av男人的天堂 | 国产午夜亚洲精品不卡 | 少妇的肉体aa片免费 | 精品偷自拍另类在线观看 | 国产成人综合在线女婷五月99播放 | 女人高潮内射99精品 | 国产精品鲁鲁鲁 | 国产综合久久久久鬼色 | 国产精品无码永久免费888 | 永久免费观看美女裸体的网站 | 青草青草久热国产精品 | 国产9 9在线 | 中文 | 精品厕所偷拍各类美女tp嘘嘘 | 久久99国产综合精品 | 我要看www免费看插插视频 | 九九久久精品国产免费看小说 | 天堂а√在线中文在线 | a在线亚洲男人的天堂 | 亚洲成a人片在线观看日本 | 自拍偷自拍亚洲精品10p | 欧美日韩在线亚洲综合国产人 | 久久久久免费看成人影片 | 波多野结衣一区二区三区av免费 | 97精品人妻一区二区三区香蕉 | 中文亚洲成a人片在线观看 | 国产无遮挡又黄又爽免费视频 | 1000部夫妻午夜免费 | 日本精品久久久久中文字幕 | 国产成人精品三级麻豆 | 亚洲国产一区二区三区在线观看 | 久久精品女人的天堂av | 狠狠色丁香久久婷婷综合五月 | 国产性生大片免费观看性 | 夜夜夜高潮夜夜爽夜夜爰爰 | 国产人成高清在线视频99最全资源 | 亲嘴扒胸摸屁股激烈网站 | 国产偷国产偷精品高清尤物 | 影音先锋中文字幕无码 | 亚洲欧美日韩国产精品一区二区 | 欧美老妇与禽交 | 国产肉丝袜在线观看 | 特级做a爰片毛片免费69 | 一本大道伊人av久久综合 | 亚洲伊人久久精品影院 | 国产精品国产自线拍免费软件 | 国产成人综合色在线观看网站 | 精品午夜福利在线观看 | 伊人久久大香线焦av综合影院 | 野外少妇愉情中文字幕 | 色一情一乱一伦 | 嫩b人妻精品一区二区三区 | 午夜时刻免费入口 | 亚洲色在线无码国产精品不卡 | 日韩亚洲欧美中文高清在线 | 中文字幕无码人妻少妇免费 | 国内精品人妻无码久久久影院蜜桃 | 男人的天堂av网站 | 成人精品视频一区二区三区尤物 | 粗大的内捧猛烈进出视频 | 国产精品亚洲专区无码不卡 | 国产av一区二区三区最新精品 | 欧美日本精品一区二区三区 | 亚洲精品国产精品乱码视色 | 久久无码人妻影院 | 欧美性黑人极品hd | 午夜理论片yy44880影院 | 国产av一区二区精品久久凹凸 | 狠狠噜狠狠狠狠丁香五月 | 中文字幕亚洲情99在线 | 人人妻人人澡人人爽欧美一区 | av人摸人人人澡人人超碰下载 | 日产精品99久久久久久 | 少妇性l交大片 | 国产电影无码午夜在线播放 | 日本熟妇人妻xxxxx人hd | 亚洲精品国偷拍自产在线观看蜜桃 | 任你躁在线精品免费 | 亚洲s码欧洲m码国产av | 久久亚洲日韩精品一区二区三区 | 三上悠亚人妻中文字幕在线 | www国产亚洲精品久久久日本 | 国产免费久久精品国产传媒 | 亚洲大尺度无码无码专区 | 成人精品视频一区二区 | 国产成人精品三级麻豆 | 给我免费的视频在线观看 | 国产一区二区三区影院 | 乱人伦人妻中文字幕无码久久网 | 伦伦影院午夜理论片 | 强开小婷嫩苞又嫩又紧视频 | 免费观看的无遮挡av | 任你躁国产自任一区二区三区 | 久久精品中文闷骚内射 | 免费播放一区二区三区 | 精品国产福利一区二区 | 麻花豆传媒剧国产免费mv在线 | 久久亚洲中文字幕精品一区 | 2019nv天堂香蕉在线观看 | 国产 浪潮av性色四虎 | 亚洲精品一区二区三区婷婷月 | 亚无码乱人伦一区二区 | 国产香蕉97碰碰久久人人 | 亚洲人成网站在线播放942 | 亚洲大尺度无码无码专区 | 精品人妻人人做人人爽 | 欧美阿v高清资源不卡在线播放 | 成人亚洲精品久久久久 | 久久国产劲爆∧v内射 | 漂亮人妻洗澡被公强 日日躁 | 日本熟妇乱子伦xxxx | 色欲av亚洲一区无码少妇 | 爽爽影院免费观看 | 未满小14洗澡无码视频网站 | 婷婷五月综合缴情在线视频 | 乱码av麻豆丝袜熟女系列 | 中文字幕亚洲情99在线 | 国产精品办公室沙发 | 国内揄拍国内精品人妻 | 精品日本一区二区三区在线观看 | 亚洲中文字幕成人无码 | 亚洲gv猛男gv无码男同 | 亚洲日韩一区二区三区 | 99精品无人区乱码1区2区3区 | 成人精品天堂一区二区三区 | 亚洲国产精品无码久久久久高潮 | 亚洲大尺度无码无码专区 | 国产熟女一区二区三区四区五区 | 真人与拘做受免费视频 | 天天躁夜夜躁狠狠是什么心态 | 噜噜噜亚洲色成人网站 | 欧美日韩人成综合在线播放 | 成人免费视频一区二区 | 国语精品一区二区三区 | 亚洲欧美日韩国产精品一区二区 | 撕开奶罩揉吮奶头视频 | 日日摸天天摸爽爽狠狠97 | 最近的中文字幕在线看视频 | 色综合久久88色综合天天 | 欧美日韩一区二区综合 | 欧美丰满少妇xxxx性 | 在线观看国产午夜福利片 | 久久久精品456亚洲影院 | 无码国模国产在线观看 | 久久精品国产精品国产精品污 | 精品一二三区久久aaa片 | 国产精品第一国产精品 | 鲁鲁鲁爽爽爽在线视频观看 | 四虎永久在线精品免费网址 | 一本精品99久久精品77 | 丁香花在线影院观看在线播放 | 国产精品久久久久无码av色戒 | 国产色xx群视频射精 | 国产精品无套呻吟在线 | 玩弄少妇高潮ⅹxxxyw | 中文字幕av无码一区二区三区电影 | 亚洲欧洲日本综合aⅴ在线 | 俄罗斯老熟妇色xxxx | 一区二区三区高清视频一 | 亚洲欧美日韩成人高清在线一区 | 又紧又大又爽精品一区二区 | 国产精品嫩草久久久久 | 久久久久99精品成人片 | 久久久www成人免费毛片 | 好爽又高潮了毛片免费下载 | 国产乱人伦av在线无码 | 亚洲熟悉妇女xxx妇女av | 久久国产精品萌白酱免费 | 男人的天堂av网站 | а√天堂www在线天堂小说 | 老太婆性杂交欧美肥老太 | 国产高清av在线播放 | 99在线 | 亚洲 | 亚洲欧美国产精品专区久久 | 蜜臀av无码人妻精品 | 日韩成人一区二区三区在线观看 | 特黄特色大片免费播放器图片 | 无码人妻少妇伦在线电影 | 亚洲日韩一区二区三区 | 自拍偷自拍亚洲精品10p | 精品久久久久久亚洲精品 | 性欧美熟妇videofreesex | 狠狠色噜噜狠狠狠狠7777米奇 | 久久久久人妻一区精品色欧美 | 国产内射老熟女aaaa | 久久精品丝袜高跟鞋 | www国产亚洲精品久久网站 | 国产成人综合在线女婷五月99播放 | 特大黑人娇小亚洲女 | 精品一区二区不卡无码av | 亚洲国产一区二区三区在线观看 | 娇妻被黑人粗大高潮白浆 | 国产亚洲人成在线播放 | 十八禁视频网站在线观看 | 亚洲国产成人av在线观看 | 国产电影无码午夜在线播放 | 欧美人与禽猛交狂配 | 国产亚洲人成a在线v网站 | 国语自产偷拍精品视频偷 | 亚洲春色在线视频 | 国产精品欧美成人 | 熟妇人妻无乱码中文字幕 | 伊人久久婷婷五月综合97色 | 亚洲精品久久久久中文第一幕 | 久久伊人色av天堂九九小黄鸭 | 国产无遮挡又黄又爽免费视频 | 精品久久久中文字幕人妻 | 中文精品久久久久人妻不卡 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 成人免费无码大片a毛片 | 日日夜夜撸啊撸 | 精品国产麻豆免费人成网站 | 亚洲精品欧美二区三区中文字幕 | 亚洲中文字幕成人无码 | 东京无码熟妇人妻av在线网址 | 国产无av码在线观看 | 少妇性l交大片欧洲热妇乱xxx | 美女扒开屁股让男人桶 | 精品亚洲成av人在线观看 | 两性色午夜免费视频 | 亚洲区小说区激情区图片区 | 亚洲s色大片在线观看 | 老子影院午夜伦不卡 | 大色综合色综合网站 | 精品人妻人人做人人爽 | 夜夜夜高潮夜夜爽夜夜爰爰 | 狠狠噜狠狠狠狠丁香五月 | 双乳奶水饱满少妇呻吟 | 三上悠亚人妻中文字幕在线 | www国产亚洲精品久久久日本 | 荫蒂被男人添的好舒服爽免费视频 | 伊人久久婷婷五月综合97色 | 国产精品99爱免费视频 | www国产亚洲精品久久网站 | 黑人玩弄人妻中文在线 | 麻豆果冻传媒2021精品传媒一区下载 | 欧美人与动性行为视频 | 国内揄拍国内精品人妻 | 精品欧美一区二区三区久久久 | 99久久99久久免费精品蜜桃 | 亚洲综合在线一区二区三区 | 久久久www成人免费毛片 | 99国产精品白浆在线观看免费 | 国产亚洲人成a在线v网站 | 国产精品无套呻吟在线 | 人人爽人人爽人人片av亚洲 | 日韩精品久久久肉伦网站 | 久久午夜无码鲁丝片秋霞 | 丰满人妻一区二区三区免费视频 | 一本色道婷婷久久欧美 | 最近中文2019字幕第二页 | 亚洲精品久久久久久久久久久 | 少妇愉情理伦片bd | 亚欧洲精品在线视频免费观看 | 少妇被粗大的猛进出69影院 | 久久精品人妻少妇一区二区三区 | 99久久亚洲精品无码毛片 | 一个人免费观看的www视频 | 亚洲国产精品一区二区美利坚 | 亚洲s色大片在线观看 | 5858s亚洲色大成网站www | 在线精品亚洲一区二区 | 97无码免费人妻超级碰碰夜夜 | 国产在热线精品视频 | 色综合久久中文娱乐网 | 成人精品视频一区二区 | 免费无码一区二区三区蜜桃大 | 久久亚洲日韩精品一区二区三区 | 色综合久久网 | 图片区 小说区 区 亚洲五月 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 精品少妇爆乳无码av无码专区 | 国产精品高潮呻吟av久久 | 装睡被陌生人摸出水好爽 | 国产人妻精品一区二区三区 | 天天燥日日燥 | 疯狂三人交性欧美 | 国产精品美女久久久久av爽李琼 | 国产亚洲美女精品久久久2020 | 免费国产成人高清在线观看网站 | 午夜丰满少妇性开放视频 | 国产成人无码a区在线观看视频app | 国产疯狂伦交大片 | 免费看少妇作爱视频 | 任你躁在线精品免费 | 亚洲中文字幕成人无码 | 国产亚洲视频中文字幕97精品 | 5858s亚洲色大成网站www | 一区二区传媒有限公司 | 欧美 亚洲 国产 另类 | 鲁大师影院在线观看 | 久久精品女人天堂av免费观看 | 国内少妇偷人精品视频免费 | 中文字幕无码av波多野吉衣 | 国产精品18久久久久久麻辣 | 国产人成高清在线视频99最全资源 | 国产午夜视频在线观看 | 精品乱子伦一区二区三区 | 国产欧美熟妇另类久久久 | 成人精品视频一区二区 | 午夜福利不卡在线视频 | 啦啦啦www在线观看免费视频 | 午夜无码人妻av大片色欲 | 欧美日本免费一区二区三区 | 亚洲狠狠色丁香婷婷综合 | 99久久精品午夜一区二区 | 亚洲经典千人经典日产 | 天天摸天天碰天天添 | 美女极度色诱视频国产 | 亚洲日韩av一区二区三区四区 | 国产精品沙发午睡系列 | 国产婷婷色一区二区三区在线 | 性生交大片免费看女人按摩摩 | 无遮挡国产高潮视频免费观看 | 小sao货水好多真紧h无码视频 | 国产精品国产三级国产专播 | 亚洲日韩精品欧美一区二区 | 久久人人爽人人爽人人片av高清 | 少妇无码av无码专区在线观看 | 黑人大群体交免费视频 | 欧美丰满少妇xxxx性 | 久久精品无码一区二区三区 | 久激情内射婷内射蜜桃人妖 | 伊人久久大香线蕉亚洲 | 国产成人综合美国十次 | 久久久久久国产精品无码下载 | 亚洲gv猛男gv无码男同 | 日日摸天天摸爽爽狠狠97 | 国产色视频一区二区三区 | 成人一区二区免费视频 | 少女韩国电视剧在线观看完整 | 精品无码av一区二区三区 | 国产成人无码av在线影院 | 亚洲精品无码人妻无码 | 亚洲精品一区二区三区大桥未久 | 亚洲区小说区激情区图片区 | 国产精品久久久午夜夜伦鲁鲁 | 中文无码精品a∨在线观看不卡 | 久久国语露脸国产精品电影 | 波多野结衣一区二区三区av免费 | 亚洲日本在线电影 | 国产精品无码成人午夜电影 | 国产手机在线αⅴ片无码观看 | 亚洲日韩av一区二区三区四区 | 国精产品一区二区三区 | 全黄性性激高免费视频 | 无码人妻出轨黑人中文字幕 |