3atv精品不卡视频,97人人超碰国产精品最新,中文字幕av一区二区三区人妻少妇,久久久精品波多野结衣,日韩一区二区三区精品

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Flume知识点全面总结教程

發布時間:2023/12/1 综合教程 28 生活家
生活随笔 收集整理的這篇文章主要介紹了 Flume知识点全面总结教程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?目錄

1.前言

1.1什么是flume?

1.2Flume特性

2.Flume核心概念

2.1agent

2.2Event:flume內部數據傳輸的封裝形式

2.3Transaction:事務控制機制

2.4攔截器

3.Flume安裝部署

3.1參數配置

3.2啟動命令

4.Flume入門案例

4.1數據流

?4.2 組件選擇

?4.3 部署配置實現

5.Flume常用內置組件詳解

6.Flume常用組件詳解:Source

6.1netcat?source

6.1.1工作機制:

6.1.2配置文件:

6.2 exec?source

6.2.1工作機制:

?6.2.2參數詳解:

6.2.3配置文件:

6.3 spooldir?source

6.3.1工作機制:

6.3.2參數詳解:

6.3.3配置文件:

6.3.4啟動測試:

6.4 avro?source

6.4.1工作機制

6.4.2參數詳解

6.4.3配置文件

6.4.4啟動測試

6.4.5啟動測試利用avro?source和avro sink實現agent級聯

6.5 kafka source

6.5.1工作機制

6.5.2參數詳解

6.5.3配置文件

6.5.4啟動測試

6.6?taildir?source

6.6.1?工作機制

???????6.6.2?參數詳解

6.6.3配置文件

??????????????6.6.4啟動測試

7Flume常用組件詳解:Interceptor攔截器

7.1timestamp?攔截器

7.1.1作用

7.1.2參數

7.1.3配置示例

7.1.4??????????????測試

7.2static攔截器

7.2.1作用

???????7.2.2參數

???????7.2.3配置示例

???????7.2.4測試

7.3Host 攔截器

7.3.1作用

???????7.3.2參數

7.3.3配置示例

7.3.4測試

7.4?UUID 攔截器

7.4.1作用

7.4.2參數

7.4.3配置

??????????????7.4.4測試

8?Flume常用組件詳解:channel

8.1 memory channel

8.1.1特性

??????????????8.1.2參數

??????????????8.1.3配置示例

??????????????8.1.4測試

??????????????8.1.5擴展了解

???????????8.2 file channel

8.2.1特性

???????8.2.3參數

???????8.2.4配置示例

???????8.2.5??????????????測試

???????8.3kafka channel

?????????????????????8.3.1特性

???????????????????????????????????8.3.2參數

?????????????????????????????????????????????????8.3.3配置測試

9 Flume常用組件詳解:sink

9.1.1特性

??????????????9.1.2參數

??????????????9.1.3配置示例

9.1.4測試

9.2?kafka sink

9.2.1特性

9.2.2參數

9.3 avro?sink

9.3.1 特性

10 Flume常用組件詳解:Selector

10.1實踐一:replicating selector(復制選擇器)

????????????10.1.1目標場景

????????????????????????????10.1???????.2Flume agent配置

10.1???????.3?Collector1 配置

10.1.4?Collector2 配置

10.1.5?測試驗證

10.2?實踐二:multiplexing selector(多路選擇器)

10.2.1目標場景

?10.2.2?第一級1 / 2配置

????????10.2.3?第二級配置

??????????????????????10.2.4?測試驗證

11 Flume常用組件詳解:grouping processor

12Flume自定義擴展組件

12.1自定義Source

12.1.1需求場景

??????????????12.1.2實現思路

12.1.3代碼架構

?????12.1.4?具體實現

12.2?自定義攔截器

12.2.2實現思路

12.2.3?自定義攔截器的開發

13?綜合案例

13.1?案例場景

13.2?實現思路

13.4?配置文件

13.5?啟動測試

14?面試加強

14.1?flume事務機制

14.2flume agent內部機制

14.3?ganglia及flume監控

14.4?Flume調優


1.前言

flume是由cloudera軟件公司產出的可分布式日志收集系統,后與2009年被捐贈了apache軟件基金會,為hadoop相關組件之一。尤其近幾年隨著flume的不斷被完善以及升級版本的逐一推出,特別是flume-ng;同時flume內部的各種組件不斷豐富,用戶在開發的過程中使用的便利性得到很大的改善,現已成為apache top項目之一.

補充:cloudera公司的主打產品是CDH(hadoop的一個企業級商業發行版)

1.1什么是flume?

???Apache Flume 是一個從可以收集例如日志,事件等數據資源,并將這些數量龐大的數據從各項數據資源中集中起來存儲的工具/服務。flume具有高可用,分布式和豐富的配置工具,其結構如下圖所示:

Flume: 是一個數據采集工具;可以從各種各樣的數據源(服務器)上采集數據傳輸(匯聚)到大數據生態的各種存儲系統中(Hdfs、hbase、hive、kafka);

開箱即用!(安裝部署、修改配置文件)

1.2Flume特性

Flume是一個分布式、可靠、和高可用的海量日志采集、匯聚和傳輸的系統。

Flume可以采集文件,socket數據包(網絡端口)、文件夾、kafka、mysql數據庫等各種形式源數據,又可以將采集到的數據(下沉sink)輸出到HDFS、hbase、hive、kafka等眾多外部存儲系統中

一般的采集、傳輸需求,通過對flume的簡單配置即可實現;不用開發一行代碼!

Flume針對特殊場景也具備良好的自定義擴展能力,因此,flume可以適用于大部分的日常數據采集場景

2.Flume核心概念

2.1agent

Flume中最核心的角色是agent,flume采集系統就是由一個個agent連接起來所形成的一個或簡單或復雜的數據傳輸通道。

對于每一個Agent來說,它就是一個獨立的守護進程(JVM),它負責從數據源接收數據,并發往下一個目的地,如下圖所示:

每一個agent相當于一個數據(被封裝成Event對象)傳遞員,內部有三個組件:

Source:采集組件,用于跟數據源對接,以獲取數據;它有各種各樣的內置實現;

Sink:下沉組件,用于往下一級agent傳遞數據或者向最終存儲系統傳遞數據

Channel:傳輸通道組件,用于從source將數據傳遞到sink

單個agent采集數據

?多級agent之間串聯

2.2Eventflume內部數據傳輸的封裝形式

數據在Flum內部中數據以Event的封裝形式存在。

因此,Source組件在獲取到原始數據后,需要封裝成Event放入channel;

Sink組件從channel中取出Event后,需要根據配置要求,轉成其他形式的數據輸出。

Event封裝對象主要有兩部分組成: Headers和 ?Body

Header是一個集合 ?Map[String,String],用于攜帶一些KV形式的元數據(標志、描述等)

Boby: 就是一個字節數組;裝載具體的數據內容

2018-11-03 18:44:44,913 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 20 61 20 61 61 61 20 61 20 0D ???????????????a a aaa a . }

2.3Transaction事務控制機制

Flume的事務機制(類似數據庫的事務機制):

Flume使用兩個獨立的事務分別負責從Soucrce到Channel,以及從Channel到Sink的event傳遞。比如spooling directory source 為文件的每一個event?batch創建一個事務,一旦事務中所有的事件全部傳遞到Channel且提交成功,那么Soucrce就將event batch標記為完成。

同理,事務以類似的方式處理從Channel到Sink的傳遞過程,如果因為某種原因使得事件無法記錄,那么事務將會回滾,且所有的事件都會保持到Channel中,等待重新傳遞。

事務機制涉及到如下重要參數:

a1.sources.s1.batchSize?=100

a1.sinks.k1.batchSize = 200

a1.channels.c1.transactionCapacity?= 300 (應該大于source或者sink的批次大小)

<?transactionCapacity 是說,channel中保存的事務的個數>

跟channel的數據緩存空間容量區別開來:

a1.channels.c1.capacity = 10000

那么事務是如何保證數據的端到端完整性的呢?看下面有兩個agent的情況:

數據流程:

  1. source 1產生Event,通過“put”、“commit”操作將Event放到Channel 1中
  2. sink 1通過“take”操作從Channel 1中取出Event,并把它發送到Source 2中
  3. source 2通過“put”、“commit”操作將Event放到Channel 2中
  4. source 2向sink 1發送成功信號,sink 1“commit”步驟2中的“take”操作(其實就是刪除Channel 1中的Event)

說明:在任何時刻,Event至少在一個Channel中是完整有效的

2.4攔截器

攔截器工作在source組件之后,source產生的event會被傳入攔截器根據需要進行攔截處理

而且,攔截器可以組成攔截器鏈!

攔截器在flume中有一些內置的功能比較常用的攔截器

用戶也可以根據自己的數據處理需求,自己開發自定義攔截器!

這也是flume的一個可以用來自定義擴展的接口!

3.Flume安裝部署

3.1參數配置

Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境

1.上傳安裝包到數據源所在節點上

然后解壓 ?tar -zxvf apache-flume-1.8.0-bin.tar.gz

2、根據數據采集的需求配置采集方案,描述在配置文件中(文件名可任意自定義)

3、指定采集方案配置文件,在相應的節點上啟動flume agent

3.2啟動命令

bin/flume-ng agent?-c?./conf ………….

commands:

??help ?????????????????????顯示本幫助信息

??agent ????????????????????啟動一個agent進程

??avro-client ????????????????啟動一個用于測試avro?source的客戶端(能夠發送avro序列化流)

??version ???????????????????顯示當前flume的版本信息

global options: ??全局通用選項

??--conf,-c <conf> ?????????指定flume的系統配置文件所在目錄

??--classpath,-C <cp> ???????添加額外的jar路徑

??--dryrun,-d ??????????????不去真實啟動flume?agent,而是打印當前命令

??--plugins-path <dirs> ??????指定插件(jar)所在路徑

??-Dproperty=value ?????????傳入java環境參數

??-Xproperty=value ?????????傳入所需的JVM配置參數

agent options:

??--name,-n <name> ?????????agent的別名(在用戶采集方案配置文件中)

??--conf-file,-f <file> ?????????指定用戶采集方案配置文件的路徑

??--zkConnString,-z <str> ?????指定zookeeper的連接地址

??--zkBasePath,-p <path> ?????指定用戶配置文件所在的zookeeper?path,比如:/flume/config

??--no-reload-conf ???????????關閉配置文件動態加載

??--help,-h ??????????????????display help text

avro-client options:

??--rpcProps,-P <file> ??RPC client properties file with server connection params

??--host,-H <host> ????avro序列化數據所要發往的目標主機(avro?source所在機器)

??--port,-p <port> ?????avro序列化數據所要發往的目標主機的端口號

??--dirname <dir> ?????需要被序列化發走的數據所在目錄(提前準備好測試數據放在一個文件中)

??--filename,-F <file> ??需要被序列化發走的數據所在文件(default: std input)

??--headerFile,-R <file> ?存儲header?key-value的文件

??--help,-h ????????????幫助信息

??Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first

in the classpath.

開啟內置監控功能

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

4.Flume入門案例

先用一個最簡單的例子來測試一下程序環境是否正常

4.1數據流

?4.2
組件選擇

  • Source組件 NetCat:

  • Channel組件:

Memory?Channel

capacity: 緩存的容量 ,可緩存的event的數量

transactionCapacity: 事務容量。支持出錯情況下的event回滾事件數量。

  • Sink組件: logger Sink

?4.3
部署配置實現

  • 創建部署配置文件

在flume的安裝目錄下,新建一個文件夾,myconf

#?cd?myconf

#?vi??netcat-logger.conf

# 定義這個agent中各組件的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 描述和配置source組件:r1

a1.sources.r1.type =?netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 9999

# source 和 channel關聯

a1.sources.r1.channels = c1??

# 描述和配置sink組件:k1

a1.sinks.k1.type = logger

#?sink也要關聯channel

a1.sinks.k1.channel = c1

# 描述和配置channel組件,此處使用是內存緩存的方式

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

flume-ng?命令的模式:

  • ?啟動一個采集器:

[root@hdp-01 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf ???-Dflume.root.logger=INFO,console

agent ???運行一個采集器

-n a1 ?指定我們這個agent的名字

-c conf ??指定flume自身的配置文件所在目錄

-f conf/netcat-logger.conf ?指定自定義的采集方案

在工作環境中的命令為:

nohup bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf 1>/dev/null 2>&1 &

  • 測試

往agent的source所監聽的端口上發送數據,讓agent有數據可采。

通過telnet命令向端口發送消息:

[root@hdp-01 ~]#?telnet?hdp-01 9999

如果沒有telnet命令,用yum安裝一個即可: ?

yum?-y install?telnet

就可以通過日志查看:

注意: 注釋不能寫在配置的后面,只能單獨一行寫。

5.Flume常用內置組件詳解

Flume支持眾多的source和sink類型,詳細手冊可參考官方文檔

Flume 1.9.0 User Guide — Apache Flume

6.Flume常用組件詳解Source

6.1netcat?source

6.1.1工作機制:

啟動一個socket服務,監聽一個端口;

將端口上收到的數據,轉成event寫入channel;

6.1.2配置文件:

a1.sources = s1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 44444

a1.sources.s1.channels = c1

6.2
exec?source

6.2.1工作機制:

啟動一個用戶所指定的linux?shell命令;

采集這個linux shell命令的標準輸出,作為收集到的數據,轉為event寫入channel;

?6.2.2參數詳解:

channels

本source要發往的channel

type

本source的類別名稱:exec

command

本source所要運行的linux命令,比如: tail?-F /path/file

shell

指定運行上述命令所用shell

restartThrottle

10000

命令die了以后,重啟的時間間隔

restart

false

命令die了以后,是否要重啟

logStdErr

false

是否收集命令的錯誤輸出stderr

batchSize

20

提交的event批次大小

batchTimeout

3000

發往下游沒完成前,等待的時間

selector.type

replicating

指定channel選擇器:replicating or multiplexing

selector.*

選擇器的具體參數

interceptors

指定攔截器

interceptors.*

?指定的攔截器的具體參數

6.2.3配置文件:

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

啟動測試:

1.準備一個日志文件

2.寫一個腳本模擬往日志文件中持續寫入數據

for i in {1..10000}; do echo ${i}--------------------------- >> access.log ; sleep 0.5; done

3.創建一個flume自定義配置文件

4.啟動flume采集

注意:通過人為破壞測試,發現這個exec?source,不會記錄宕機前所采集數據的偏移量位置,重啟后可能會造成數據丟失!

6.3
spooldir?source

6.3.1工作機制:

監視一個指定的文件夾,如果文件夾下有沒采集過的新文件,則將這些新文件中的數據采集,并轉成event寫入channel;

注意:spooling目錄中的文件必須是不可變的,而且是不能重名的!否則,source會loudly?fail!

6.3.2參數詳解:

Property Name

Default

Description

channels

type

The component type name, needs to be?spooldir.

spoolDir

The directory from which to read files from.

fileSuffix

.COMPLETED

采集完成的文件,添加什么后綴名

deletePolicy

never

是否刪除采完的文件:?never?or?immediate

fileHeader

false

是否將所采集文件的絕對路徑添加到header中

fileHeaderKey

file

上述header的key名稱

basenameHeader

false

是否將文件名添加到header

basenameHeaderKey

basename

上述header的key名稱

includePattern

^.*$

指定需要采集的文件名的正則表達式

ignorePattern

^$

指定要排除的文件名的正則表達式

如果一個文件名即符合includePattern又匹配ignorePattern,則該文件不采

trackerDir

.flumespool

記錄元數據的目錄所在路徑,可以用絕對路徑也可以用相對路徑(相對于采集目錄)

trackingPolicy

rename

采集進度跟蹤策略,有兩種:?“rename”和?“tracker_dir”. 本參數只在deletePolicy=never時才生效

?“rename”- 采完的文件根據filesuffix重命名

?“tracker_dir” - 采完的文件會在trackerDir目錄中生成一個同名的空文件

consumeOrder

oldest

采集順序:?oldest,?youngest?and?random.

oldest和youngest情況下,可能會帶來一定效率的損失;(需要對文件夾中所有文件進行一次掃描以尋找最old或最young的)

pollDelay

500

Delay (in milliseconds) used when polling for new files.

recursiveDirectorySearch

false

Whether to monitor sub directories for new files to read.

maxBackoff

4000

The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.

batchSize

100

一次傳輸到channel的event條數(一批)

inputCharset

UTF-8

Character set used by deserializers that treat the input file as text.

decodeErrorPolicy

FAIL

What to do when we see a non-decodable character in the input file.?FAIL: Throw an exception and fail to parse the file.?REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD.?IGNORE: Drop the unparseable character sequence.

deserializer

LINE

Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implementEventDeserializer.Builder.

deserializer.*

Varies per event deserializer.

bufferMaxLines

(Obselete) This option is now ignored.

bufferMaxLineLength

5000

(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.

selector.type

replicating

replicating or multiplexing

selector.*

Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*

6.3.3配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir = /root/weblog

a1.sources.s1.batchSize = 200

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.3.4啟動測試

bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console

注意:spooldir?source?與exec?source不同,spooldir?source本身是可靠的!會記錄崩潰之前的采集位置!

6.4 avro?source

Avro?source 是通過監聽一個網絡端口來接受數據,而且接受的數據必須是使用avro序列化框架序列化后的數據;

Avro是一種序列化框架,跨語言的;

擴展:什么是序列化,什么是序列化框架?

序列化: 是將一個有復雜結構的數據塊(對象)變成扁平的(線性的)二進制序列

序列化框架: 一套現成的軟件,可以按照既定策略,將對象轉成二進制序列

比如: jdk就有: ObjectOutputStream

???????hadoop就有: Writable

???????跨平臺的序列化框架: avro

6.4.1工作機制

啟動一個網絡服務,監聽一個端口,收集端口上收到的avro序列化數據流!

該source中擁有avro的反序列化器,能夠將收到的二進制流進行正確反序列化,并裝入一個event寫入channel!??????

6.4.2參數詳解

Property Name

Default

Description

channels

type

本source的別名:?avro

bind

要綁定的地址

port

要綁定的端口號

threads

服務的最大線程數

selector.type

selector.*

interceptors

Space-separated list of interceptors

interceptors.*

compression-type

none

壓縮類型:跟發過來的數據是否壓縮要匹配:none | deflate

ssl

false

Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see?SSL/TLS support?section).

keystore

This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).

keystore-password

The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).

keystore-type

JKS

The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).

exclude-protocols

SSLv3

Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

include-protocols

Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols.

exclude-cipher-suites

Space-separated list of cipher suites to exclude.

include-cipher-suites

Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites.

ipFilter

false

Set this to true to enable ipFiltering for netty

ipFilterRules

Define N netty ipFilter pattern rules with this config.

6.4.3配置文件

a1.sources = r1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.4.4啟動測試

啟動agent:

bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,consol

用一個客戶端去給啟動好的source發送avro序列化數據:

bin/flume-ng avro-client --host c703 --port 4141

6.4.5啟動測試利用avro?source和avro sink實現agent級聯

6.4.5.1需求說明

6.4.5.2配置文件

  • 上游配置文件

vi ?exec-m-avro.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/logs/access.log

a1.sources.r1.batchSize = 100

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = h3

a1.sinks.k1.port = 4455

  • 下游配置文件

vi ?avro-m-log.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4455

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger

6.4.5.3啟動測試

  • 先啟動下游:

bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console

  • 再啟動上游:

bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf

  • 然后寫一個腳本在h1上模擬生成數據

while true

do

echo "hello " ?>> /tmp/logs/access.log

sleep 0.1

done

6.5 kafka source

6.5.1???????工作機制

Kafka source的工作機制:就是用kafka?consumer連接kafka,讀取數據,然后轉換成event,寫入channel

6.5.2??????????????參數詳解

Property Name

Default

Description

channels

?數據發往的channel

type

本source的名稱:

org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Kafka?broker服務器列表,逗號分隔

kafka.consumer.group.id

flume

Kafka消費者組id

kafka.topics

Kafka消息主題列表,逗號隔開

kafka.topics.regex

用正則表達式來指定一批topic;本參數的優先級高于kafka.topics

batchSize

1000

寫入channel的event?批,最大消息條數

batchDurationMillis

1000

批次寫入channel的最大時長

backoffSleepIncrement

1000

Kafka?Topic?顯示為空時觸發的初始和增量等待時間。

maxBackoffSleep

5000

Kafka?Topic?顯示為空時觸發的最長等待時間

useFlumeEventFormat

false

默認情況下,event 將從Kafka Topic 直接作為字節直接進入event 主體。設置為true以讀取event 作為Flume Avro二進制格式。與Kafka Sink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這將保留在生成端發送的任何Flume標頭。

setTopicHeader

true

是否要往header中加入一個kv:topic信息

topicHeader

topic

應上面開關的需求,加入kv:topic?=>topic名稱

kafka.consumer.security.protocol

PLAINTEXT

Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.

more consumer security props

If using SASL_PLAINTEXT, SASL_SSL or SSL refer to?Kafka security?for additional properties that need to be set on consumer.

Other Kafka Consumer Properties

本source,允許直接配置任意的kafka消費者參數,格式如下:

For example:?kafka.consumer.auto.offset.reset

(就是在消費者參數前加統一前綴:?kafka.consumer.)

??????????????6.5.3配置文件

a1.sources = s1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.s1.channels = c1

a1.sources.s1.batchSize = 100

a1.sources.s1.batchDurationMillis = 2000

a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sources.s1.kafka.topics = TAOGE

a1.sources.s1.kafka.consumer.group.id = g1

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

??????????????6.5.4啟動測試

1. 首先,操作kafka,準備好topic

#?查看當前kafka集群中的topic:

bin/kafka-topics.sh ?--list --zookeeper c701:2181

#?創建一個新的topic

bin/kafka-topics.sh ?--create --topic TAOGE --partitions 3 --replication-factor 2 --zookeeper c701:2181

#?查看topic的詳細信息

bin/kafka-topics.sh --describe --topic TAOGE --zookeeper c701:2181

#?控制臺生產者,向topic中寫入數據

bin/kafka-console-producer.sh --broker-list c701:9092,c702:9092,c703:9092 --topic TAOGE

2. 啟動flume?agent來采集kafka中的數據

bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf ?-Dflume.root.logger=INFO,console

注意:

Source往channel中寫入數據的批次大小 ?<= ?channel的事務控制容量大小

6.6?taildir?source

???????6.6.1?工作機制

監視指定目錄下的一批文件,只要某個文件中有新寫入的行,則會被tail到

它會記錄每一個文件所tail到的位置,記錄到一個指定的positionfile保存目錄中,格式為json(如果需要的時候,可以人為修改,就可以讓source從任意指定的位置開始讀取數據)

所以,這個source真的像官網所吹的,是可靠的reliable!

它對采集完成的文件,不會做任何修改(比如重命名,刪除…..)

taildir source會把讀到的數據成功寫入channel后,再更新記錄偏移量

這種機制,能保證數據不會漏采(丟失),但是有可能會產生數據重復!

?????????????????????6.6.2?參數詳解

Property Name

Default

Description

channels

所要寫往的channel

type

本source的別名:?TAILDIR.

filegroups

空格分割的組名,每一組代表著一批文件

g1 g2

filegroups.<filegroupName>

每個文件組的絕路路徑,文件名可用正則表達式

positionFile

~/.flume/taildir_position.json

記錄偏移量位置的文件所在路徑

headers.<filegroupName>.<headerKey>

Header value which is the set with header key. Multiple headers can be specified for one file group.

byteOffsetHeader

false

Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.

skipToEnd

false

Whether to skip the position to EOF in the case of files not written on the position file.

idleTimeout

120000

關閉非活動文件的時延。如果被關閉的這個文件又在某個時間有了新增行,會被此source檢測到,并重新打開

writePosInterval

3000

3s 記錄一次偏移量到positionfile

batchSize

100

提交event到channel的批次最大條數

maxBatchCount

Long.MAX_VALUE

控制在一個文件上連續讀取的最大批次個數(如果某個文件正在被高速寫入,那就應該讓這個參數調為最大值,以讓source可以集中精力專采這個文件)

backoffSleepIncrement

1000

The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.

maxBackoffSleep

5000

The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.

cachePatternMatching

true

Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader

false

Whether to add a header storing the absolute path filename.

fileHeaderKey

file

Header key to use when appending absolute path filename to event header.

??????????????6.6.3配置文件

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/flumedata/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.ri.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

????????????????????????????6.6.4啟動測試

bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console

經過人為破壞測試,發現, this source還是真正挺reliable的!

不會丟失數據,但在極端情況下可能會產生重復數據!

7Flume常用組件詳解Interceptor攔截器

攔截器是什么?

就是工作在source之后,它可以從source獲得event,做一個邏輯處理,然后再返回處理之后的event

這樣一來,就可以讓用戶不需要改動source代碼的情況下,就可以插入一些數據處理邏輯;

Flume supports chaining of interceptors.

閱讀源碼,獲取的知識:

攔截器的調用順序:

SourceRunner

ExecSource

ChannelProcessor

SourceRunner -》 source 的start( )方法 --》讀到一批數據,調channelProcessor.processEventBatch(events) --> 調攔截器進行攔截處理 ?--> 調選擇器selector獲取要發送的channle --> 提交數據

7.1timestamp?攔截器

???????7.1.1作用

向event中,寫入一個kv到header里

k名稱可配置;v就是當前的時間戳(毫秒)

???????7.1.2參數

Property Name

Default

Description

type

本攔截器的名稱:timestamp

headerName

timestamp

要插入header的key名

preserveExisting

false

如果header中已存在同名key,是否要覆蓋

??????????????7.1.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

??????????????7.1.4??????????????測試

2019-06-09 10:24:21,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1560047061012} body: 31 30 32 34 34 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 10244........... }

7.2static攔截器

???????7.2.1作用

讓用戶往event中添加一個自定義的header??key-value,當然,這個key-value是在配置文件中配死的;

?????????????????????7.2.2參數

Property Name

Default

Description

type

別名:?static

preserveExisting

true

是否覆蓋同名kv

key

key

你要插入的key名

value

value

你要插入的value

?????????????????????7.2.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = hero

a1.sources.r1.interceptors.i3.value = TAOGE

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

?????????????????????7.2.4測試

7.3Host 攔截器

???????7.3.1作用

往event的header中插入主機名(ip)信息

?????????????????????7.3.2參數

Property Name

Default

Description

type

本攔截器的別名:?host

preserveExisting

false

是否覆蓋已存在的hader key-value

useIP

true

插入ip還是主機名

hostHeader

host

要插入header的key名

7.3.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1?i2

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.3.4測試

??????????????7.4?UUID 攔截器

7.4.1作用

生成uuid放入event的header中

??????????????7.4.2參數

Property Name

Default

Description

type

全名:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

id

Key名稱

preserveExisting

true

是否覆蓋同名key

prefix

""

Uuid前的前綴

??????????????7.4.3配置

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3 i4

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = hero

a1.sources.s1.interceptors.i3.value = TAOGE

a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

a1.sources.s1.interceptors.i4.headName = duanzong

a1.sources.s1.interceptors.i4.prefix = ?666_

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

????????????????????????????7.4.4測試

8?Flume常用組件詳解channel

channel是agent中用來緩存event的repository(池,倉庫)

source往channel中添加event

sink從channel中取并移除event

channel跟事務控制有極大關系;

channel?有容量大小、可靠性級別、事務容量等特性;

8.1 memory channel

??????????????8.1.1特性

事件被存儲在實現配置好容量的內存(隊列)中。

速度快,但可靠性較低,有可能會丟失數據

????????????????????????????8.1.2參數

Property Name

Default

Description

type

別名:?memory

capacity

100

能存儲的最大事件event數

transactionCapacity

100

最大事務控制容量

keep-alive

3

添加或移除event的超時時間

byteCapacityBufferPercentage

20

除了body以外的字節所能占用的容量百分比

byteCapacity

see description

channel中最大的總byte數(只計算body)

????????????????????????????8.1.3配置示例

a1.channels?=?c1

a1.channels.c1.type?=?memory

a1.channels.c1.capacity?=?10000

a1.channels.c1.transactionCapacity?=?10000

a1.channels.c1.byteCapacityBufferPercentage?=?20

a1.channels.c1.byteCapacity?=?800000

????????????????????????????8.1.4測試

????????????????????????????8.1.5擴展了解

Memory channel源碼閱讀

// lock to guard queue, mainly needed to keep it locked down during resizes// it should never be held through a blocking operation

private?Object queueLock = new?Object();

//queue為Memory Channel中存放Event的地方,這里用了LinkedBlockingDeque來實現

@GuardedBy(value = "queueLock")p

rivate?LinkedBlockingDeque<Event> queue;

//下面的兩個信號量用來做同步操作,queueRemaining表示queue中的剩余空間,queueStored表示queue中的使用空間

// invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted)

// we maintain the remaining permits = queue.remaining - takeList.size()// this allows local threads waiting for space in the queue to commit without denying access to the

// shared lock to threads that would make more space on the queue

private?Semaphore queueRemaining;

// used to make "reservations" to grab data from the queue.

// by using this we can block for a while to get data without locking all other threads out

// like we would if we tried to use a blocking call on queue

private?Semaphore queueStored;

//下面幾個變量為配置文件中Memory Channel的配置項

// 一個事務中Event的最大數目

private?volatile?Integer transCapacity;

// 向queue中添加、移除Event的等待時間

private?volatile?int?keepAlive;

// queue中,所有Event所能占用的最大空間

private?volatile?int?byteCapacity;

private?volatile?int?lastByteCapacity;

// queue中,所有Event的header所能占用的最大空間占byteCapacity的比例

private?volatile?int?byteCapacityBufferPercentage;

// 用于標示byteCapacity中剩余空間的信號量

private?Semaphore bytesRemaining;

// 用于記錄Memory Channel的一些指標,后面可以通過配置監控來觀察Flume的運行情況

private?ChannelCounter channelCounter;

然后重點說下MemoryChannel里面的MemoryTransaction,它是Transaction類的子類,從其文檔來看,一個Transaction的使用模式都是類似的:

?Channel ch = ...Transaction tx = ch.getTransaction();try {tx.begin();...// ch.put(event) or ch.take()...tx.commit();} catch (ChannelException ex) {tx.rollback();...} finally {tx.close();}

可以看到一個Transaction主要有、put、take、commit、rollback這四個方法,我們在實現其子類時,主要也是實現著四個方法。

Flume官方為了方便開發者實現自己的Transaction,定義了BasicTransactionSemantics,這時開發者只需要繼承這個輔助類,并且實現其相應的、doPut、doTake、doCommit、doRollback方法即可,MemoryChannel就是繼承了這個輔助類。

private class MemoryTransaction extends BasicTransactionSemantics {//和MemoryChannel一樣,內部使用LinkedBlockingDeque來保存沒有commit的Eventprivate LinkedBlockingDeque<Event> takeList;private LinkedBlockingDeque<Event> putList;private final ChannelCounter channelCounter;//下面兩個變量用來表示put的Event的大小、take的Event的大小private int putByteCounter = 0;private int takeByteCounter = 0;public MemoryTransaction(int transCapacity, ChannelCounter counter) {//用transCapacity來初始化put、take的隊列putList = new LinkedBlockingDeque<Event>(transCapacity);takeList = new LinkedBlockingDeque<Event>(transCapacity);channelCounter = counter;}@Overrideprotected void doPut(Event event) throws InterruptedException {//doPut操作,先判斷putList中是否還有剩余空間,有則把Event插入到該隊列中,同時更新putByteCounter//沒有剩余空間的話,直接報ChannelExceptionchannelCounter.incrementEventPutAttemptCount();int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);if (!putList.offer(event)) {throw new ChannelException("Put queue for MemoryTransaction of capacity " +putList.size() + " full, consider committing more frequently, " +"increasing capacity or increasing thread count");}putByteCounter += eventByteSize;}@Overrideprotected Event doTake() throws InterruptedException {//doTake操作,首先判斷takeList中是否還有剩余空間channelCounter.incrementEventTakeAttemptCount();if(takeList.remainingCapacity() == 0) {throw new ChannelException("Take list for MemoryTransaction, capacity " +takeList.size() + " full, consider committing more frequently, " +"increasing capacity, or increasing thread count");}//然后判斷,該MemoryChannel中的queue中是否還有空間,這里通過信號量來判斷if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;//從MemoryChannel中的queue中取出一個eventsynchronized(queueLock) {event = queue.poll();}Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +"signalling existence of entry");//放到takeList中,然后更新takeByteCounter變量takeList.put(event);int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);takeByteCounter += eventByteSize;return event;}@Overrideprotected void doCommit() throws InterruptedException {//該對應一個事務的提交//首先判斷putList與takeList的相對大小int remainingChange = takeList.size() - putList.size();//如果takeList小,說明向該MemoryChannel放的數據比取的數據要多,所以需要判斷該MemoryChannel是否有空間來放if(remainingChange < 0) {// 1. 首先通過信號量來判斷是否還有剩余空間if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,TimeUnit.SECONDS)) {throw new ChannelException("Cannot commit transaction. Byte capacity " +"allocated to store event body " + byteCapacity * byteCapacitySlotSize +"reached. Please increase heap space/byte capacity allocated to " +"the channel as the sinks may not be keeping up with the sources");}// 2. 然后判斷,在給定的keepAlive時間內,能否獲取到充足的queue空間if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("Space for commit to queue couldn't be acquired." +" Sinks are likely not keeping up with sources, or the buffer size is too tight");}}int puts = putList.size();int takes = takeList.size();//如果上面的兩個判斷都過了,那么把putList中的Event放到該MemoryChannel中的queue中。synchronized(queueLock) {if(puts > 0 ) {while(!putList.isEmpty()) {if(!queue.offer(putList.removeFirst())) {throw new RuntimeException("Queue add failed, this shouldn't be able to happen");}}}//清空本次事務中用到的putList與takeList,釋放資源putList.clear();takeList.clear();}//更新控制queue大小的信號量bytesRemaining,因為把takeList清空了,所以直接把takeByteCounter加到bytesRemaining中。bytesRemaining.release(takeByteCounter);takeByteCounter = 0;putByteCounter = 0;//因為把putList中的Event放到了MemoryChannel中的queue,所以把puts加到queueStored中去。queueStored.release(puts);//如果takeList比putList大,說明該MemoryChannel中queue的數量應該是減少了,所以把(takeList-putList)的差值加到信號量queueRemainingif(remainingChange > 0) {queueRemaining.release(remainingChange);}if (puts > 0) {channelCounter.addToEventPutSuccessCount(puts);}if (takes > 0) {channelCounter.addToEventTakeSuccessCount(takes);}channelCounter.setChannelSize(queue.size());}@Overrideprotected void doRollback() {//當一個事務失敗時,會進行回滾,即調用本方法//首先把takeList中的Event放回到MemoryChannel中的queue中。int takes = takeList.size();synchronized(queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +"queue to rollback takes. This should never happen, please report");while(!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}//然后清空putListputList.clear();}//因為清空了putList,所以需要把putList所占用的空間大小添加到bytesRemaining中bytesRemaining.release(putByteCounter);putByteCounter = 0;takeByteCounter = 0;//因為把takeList中的Event回退到queue中去了,所以需要把takeList的大小添加到queueStored中queueStored.release(takes);channelCounter.setChannelSize(queue.size());}}

MemoryChannel的邏輯相對簡單,主要是通過MemoryTransaction中的putList、takeList與MemoryChannel中的queue打交道,這里的queue相當于持久化層,只不過放到了內存中,如果是FileChannel的話,會把這個queue放到本地文件中。下面表示了Event在一個使用了MemoryChannel的agent中數據流向:

source ---> putList ---> queue ---> takeList ---> sink

還需要注意的一點是,這里的事務可以嵌套使用,如下圖:

?當有兩個agent級連時,sink的事務中包含了一個source的事務,這也應證了前面所說的:

在任何時刻,Event至少在一個Channel中是完整有效的

?????????????????????????8.2 file channel

???????8.2.1特性

event被緩存在本地磁盤文件中

可靠性高,不會丟失

但在極端情況下可能會重復數據

?????????????????????8.2.3參數

Property Name Default

Description

?

type

別名:?file.

checkpointDir

~/.flume/file-channel/checkpoint

Checkpoint信息保存目錄

useDualCheckpoints

false

Checkpoint是否雙重checkpoint機制

backupCheckpointDir

備份checkpoint的保存目錄

dataDirs

~/.flume/file-channel/data

Event數據緩存目錄

transactionCapacity

10000

事務管理容量

checkpointInterval

30000

記錄checkpoint信息的時間間隔

maxFileSize

2146435071

控制一個數據文件的大小規格

minimumRequiredSpace

524288000

所需的最低磁盤空間,低于則停止接收新數據

capacity

1000000

最大event緩存數

keep-alive

3

等待添加數據的最大時間

?????????????????????8.2.4配置示例

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.ri.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

?????????????????????8.2.5??????????????測試

在使用taildir?source??和 ?file?channel的情況下,經過反復各種人為破壞,發現,沒有數據丟失的現象發生;

但是,如果時間點掐的比較好(sink 取了一批數據寫出,但還沒來得及向channel提交事務),會產生數據重復的現象!

?????????????????????8.3kafka channel

???????????????????????????????????8.3.1特性

agent利用kafka作為channel數據緩存

kafka channel要跟 kafka?source、 kafka?sink區別開來

kafka?channel在應用時,可以沒有source?|??或者可以沒有sink

如果是需要把kafka作為最終采集存儲,那么就只要 ?source?+?kafka?channel

如果是把kafka作為數據源,要將kafka中的數據寫往hdfs,那么就只要 kafka?channel?+?hdfs?sink

?????????????????????????????????????????????????8.3.2參數

Property Name

Default

Description

type

名字:?org.apache.flume.channel.kafka.KafkaChannel

kafka.bootstrap.servers

Kafka服務器地址

kafka.topic

flume-channel

所使用的topic

kafka.consumer.group.id

flume

消費者組id

parseAsFlumeEvent

true

跟上、下游匹配,是否需要將數據解析為Flume的Event格式

pollTimeout

500

從kafka取數據的超時時間

defaultPartitionId

默認指派的partitionid

partitionIdHeader

將一個event指派到某個分區時所使用的header 的key

kafka.consumer.auto.offset.reset

latest

初始化讀取偏移量的策略

???????????????????????????????????????????????????????????????8.3.3配置測試

配置示例1:用exec?soure讀文件,往kafka?channel中寫

不帶sink!自己用消費者去kafka集群中讀取采集到的數據!

a1.sources = r1

# 配置兩個channel,為了便于觀察

# c1 是kafkachannel ,c2是一個內存channel

a1.channels = c1 c2

a1.sinks = k1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1 c2

# kafka-channel具體配置,該channel沒有sink

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

# 內存channel 配置,并對接一個logger sink來觀察

a1.channels.c2.type = memory

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c2

運行測試:

1.準備好a.log文件

2.啟動好kafka

3.啟動agent

4.往a.log寫入數據

5.用kafka的控制臺消費者消費主題,看是否拿到數據

配置示例2:用logger?sink,從kafka?channel中取數據

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

運行測試:

2.啟動好kafka

3.啟動flume agent

4.用kafka的控制臺生產者向topic中寫入數據

5.在flume?agent的控制臺上觀察是否取到數據

9 Flume常用組件詳解sink

sink是從channel中獲取、移除數據,并輸出到下游(可能是下一級agent,也可能是最終目標存儲系統)

9.1hdfs sink

???????9.1.1特性

數據被最終發往hdfs

可以生成text文件或 sequence?文件,而且支持壓縮;

支持生成文件的周期性roll機制:基于文件size,或者時間間隔,或者event數量;

目標路徑,可以使用動態通配符替換,比如用%D代表當前日期;

當然,它也能從event的header中,取到一些標記來作為通配符替換;

header:{type=acb}

/weblog/%{type}/%D/ ?就會被替換成: /weblog/abc/19-06-09/

????????????????????????????9.1.2參數

Name

Default

Description

channel

從哪個channel取數據

type

別名:?hdfs

hdfs.path

目標hdfs存儲路徑(URI)

hdfs.filePrefix

FlumeData

指定生成的文件名前綴

hdfs.fileSuffix

后綴

hdfs.inUsePrefix

正在寫入的文件的前綴標識

hdfs.inUseSuffix

.tmp

正在寫入的文件的后綴標識

hdfs.rollInterval

30

切換文件的條件:間隔時間;為0則不生效

hdfs.rollSize

134217728

切換文件的條件:文件大小;為0則不生效

hdfs.rollCount

10

切換文件的條件:event條數;為0則不生效

hdfs.idleTimeout

0

不活躍文件的關閉超時時長;0則不自動關閉

hdfs.batchSize

100

從channel中取一批數據的最大大小;

hdfs.codeC

壓縮編碼: gzip, bzip2, lzo, lzop, snappy

hdfs.fileType

SequenceFile

目標文件格式: SequenceFile,?DataStream?or?CompressedStream?

注意:DataStream?不能支持壓縮

CompressedStream?必須設置壓縮編碼

SequenceFile 可壓縮可不壓縮

hdfs.maxOpenFiles

5000

允許同時最多打開的文件數;如果超出,則會關閉最早打開的

hdfs.minBlockReplicas

目標文件的block副本數

hdfs.writeFormat

Writable

指定sequence?file中的對象類型;支持Text和Writable

同時請使用Text,否則后續數據處理平臺可能無法解析

hdfs.threadsPoolSize

10

操作HDFS時的線程池大小

hdfs.rollTimerPoolSize

1

檢查文件是否需要被roll的線程數

hdfs.kerberosPrincipal

Kerberos user principal for accessing secure HDFS

hdfs.kerberosKeytab

Kerberos keytab for accessing secure HDFS

hdfs.proxyUser

hdfs.round

false

目錄通配符切換是是否需要切掉尾數

hdfs.roundValue

10

時間尾數切掉多少

hdfs.roundUnit

minute

時間尾數切掉大小的單位-?second,?minute?or?hour.

hdfs.timeZone

Local Time

時間通配符所使用的時區

hdfs.useLocalTimeStamp

false

所用的時間是否要從agent?sink本地獲取

hdfs.closeTries

0

重命名已完成文件的重試次數;0則一直嘗試重命名

hdfs.retryInterval

180

關閉一個文件的重試時間間隔

serializer

TEXT

將channel中的event?body解析成什么格式:Text|?avro_event ; 也可以使用自定義的序列化器

serializer.*

小提示:什么叫做URI

????????????????????????????9.1.3配置示例

## 定義

a1.sources = r1

a1.sinks = k1

a1.channels = c1

## source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

## channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100000000

## sink

a1.sinks.k1.channel = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://h1:8020/doitedu/%Y-%m-%d/%H-%M

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.filePrefix = doit_

a1.sinks.k1.hdfs.fileSuffix = .log.gz

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 102400

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.writeFormat = Text

??????????????9.1.4測試

1. 啟動hdfs

2. 清除以前的taildirsource產生的偏移量記錄文件、filechannel緩存的數據目錄和checkpoint目錄

3. 啟動agent

4. 用for循環腳本往日志文件中不斷寫入新的數據

5. 到hdfs中觀察結果

???????9.2?kafka sink

有了kafka?channel后, ?kafka?sink的必要性就降低了。因為我們可以用kafka作為channel來接收source產生的數據!

9.2.1特性

9.2.2參數

Property Name

Default

Description

type

名稱:?org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers

Kafka服務器列表

kafka.topic

default-flume-topic

Kafka的topic

flumeBatchSize

100

從channel中取event的批次大小

kafka.producer.acks

1

Kafka生產者消息推送應答級別:

1?: Leader接收到即回應

0 :不等回應?

-1:副本同步完成,再回應

useFlumeEventFormat

false

是否使用avro序列化

defaultPartitionId

指定所有event默認發往的分區id;不指定則按kafka生產者的分區器,均勻分發

partitionIdHeader

通過在header中指定分區id,來約束這個event發往的分區

allowTopicOverride

true

允許在event的header中指定要寫入的topic

topicHeader

topic

如果上一條開關開啟,則在header中放入的key的名稱

Other Kafka Producer Properties

可以用kafka.producer.xxx來配置任何kafka?producer的參數

??????????????9.2.3配置示例

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.r1.maxBatchCount = 1000

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

??????????????9.2.4測試

1. 清掉 taildir的偏移量記錄文件;清掉filechannel的數據緩存和checkpoint記錄;

2. 準備一個日志文件,并不斷寫入數據

3. 啟動flume的agent

4. 用kafka的客戶端去觀察數據結果

9.3 avro?sink

9.3.1 特性

avro?sink用來向avro?source發送avro序列化數據,這樣就可以實現agent之間的級聯

??????????????9.3.2參數

Property Name

Default

Description

channel

type

The component type name, needs to be?avro.

hostname

目標avro source的主機

port

目標avro source的綁定端口

batch-size

100

number of event to batch together for send.

connect-timeout

20000

連接超時時間

request-timeout

20000

請求超時時間

reset-connection-interval

none

Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.

compression-type

none

This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource

compression-level

6

The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression

ssl

false

Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.

trust-all-certs

false

If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.

truststore

The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.

truststore-password

The password for the truststore. If not specified, then the global keystore password will be used (if defined).

truststore-type

JKS

The type of the Java truststore. This can be “JKS” or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS).

exclude-protocols

SSLv3

Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

maxIoWorkers

2 * the number of available processors in the machine

The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

????????????????????????????9.3.3配置示例

級聯配置,需要至少兩個flume?agent來演示

在C703上,配置avro?sink?發送者

## c703 ##

a1.sources = s1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.channels = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = c701

a1.sinks.k1.port = 4545

在C701上,配置avro?source?接收者

## c701 ##

a1.sources = s1

a1.sources.s1.type = avro

a1.sources.s1.hostname = 0.0.0.0

a1.sources.s1.port = 4545

a1.sources.s1.channel = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

????????????????????????????9.3.4啟動測試

先在C701上啟動接受者avro?source(服務)

bin/flume-ng agent -n a1 -c conf/ -f myconf/avro-mem-logger.conf -Dflume.root.logger=INFO,console

再在C703上啟動發送者avro?sink(客戶端)

bin/flume-ng agent -n a1 -c conf/ -f myconf/tail-mem-avro.conf -Dflume.root.logger=INFO,console

10 Flume常用組件詳解:Selector

一個source可以對接多個channel

那么,source的數據如何在多個channel之間傳遞,就由selector來控制

配置應該掛載到source組件上

???????10.1實踐一:replicating selector復制選擇器

replicating selector就是默認的選擇器

官網配置參考

??????????????????????????10.1.1目標場景

selector將event復制,分發給所有下游節點

??????????????????????????????????????????10.1???????.2Flume agent配置

# Name the components on this agent ?a1.sources = r1 ?a1.sinks = k1 k2 ?a1.channels = c1 c2 ?# http source, with replicating selectora1.sources.r1.type = httpa1.sources.r1.port = 6666a1.sources.r1.bind = mastera1.sources.r1.selector.type = replicating ?# Describe the sink ?a1.sinks.k1.type = avro ?a1.sinks.k1.hostname = slave1 ?# bind to remote host,RPCa1.sinks.k1.port = 6666a1.sinks.k2.type = avro# bind to remote host,PRCa1.sinks.k2.hostname = slave2a1.sinks.k2.port = 6666# 2 channels in selector testa1.channels.c1.type = memory ?a1.channels.c1.capacity = 1000 ?a1.channels.c1.transactionCapacity = 100 ?a1.channels.c2.type = memory ?a1.channels.c2.capacity = 1000 ?a1.channels.c2.transactionCapacity = 100 ?# bind source ,sink to channelsa1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1 ?a1.sinks.k2.channel = c2

??????????????10.1???????.3?Collector1 配置

# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave1a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

???????10.1.4?Collector2 配置

# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave2a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

???????10.1.5?測試驗證

???????10.2?實踐二:multiplexing selector多路選擇器

multiplexing selector可以根據event中的一個指定key的value來決定這條消息會寫入哪個channel,具體在選擇時,需要配置一個映射關系,比如

a1.sources.r1.selector.mapping.CZ=c1 ?; 就意味著header中的value為CZ的話,這條消息就會被寫入c1這個channel

multiplexing selector官方配置參考

???????10.2.1目標場景

????????10.2.2?第一級1 / 2配置

a1.sources = r1a1.channels = c1a1.sinks = k1a1.sources.r1.type = execa1.sources.r1.command = tail -F /root/logs/a.loga1.sources.r1.channels = c1a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = flag# 第一臺value=1,另一臺value=2a1.sources.r1.interceptors.i1.value = 1a1.channels.c1.type = memorya1.sinks.k1.type = avroa1.sinks.k1.hostname = h3a1.sinks.k1.port = 44444a1.sinks.k1.channel = c1

??????????????????????10.2.3?第二級配置

a1.sources = r1a1.channels = c1 c2a1.sinks = k1 k2# source配置a1.sources.r1.channels = c1 c2a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 44444# source的選擇器配置a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = flaga1.sources.r1.selector.default = c2a1.sources.r1.selector.mapping.1 = c1a1.sources.r1.selector.mapping.2 = c2# channle 配置a1.channels.c1 = memorya1.channels.c2 = memory# 兩個sink分別對接兩個channel的配置a1.sinks.k1.type = loggera1.sinks.k1.channel = c1a1.sinks.k2.type = loggera1.sinks.k2.channel = c2

????????????????????????????????????10.2.4?測試驗證

11 Flume常用組件詳解:grouping processor

一個agent中,多個sink可以被組裝到一個組,而數據在組內多個sink之間發送,有兩種模式:

模式1: Failover Sink Processor??失敗切換

一組中只有優先級高的那個sink在工作,另一個是等待中

如果高優先級的sink發送數據失敗,則專用低優先級的sink去工作!并且,在配置時間penalty之后,還會嘗試用高優先級的去發送數據!

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

## 對兩個sink分配不同的優先級

a1.sinkgroups.g1.processor.priority.k1 = 200

a1.sinkgroups.g1.processor.priority.k2 = 100

## 主sink失敗后,停用懲罰時間

a1.sinkgroups.g1.processor.maxpenalty = 5000

模式2: Load balancing Sink Processor ?負載均衡

允許channel中的數據在一組sink中的多個sink之間進行輪轉,策略有:

round-robin(輪著發)

random(隨機挑)

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = random

12Flume自定義擴展組件

12.1自定義Source

??????????????12.1.1需求場景

什么情況下需要自定義source:

一般是某種數據源,用flume內置的source組件無法解析,比如XML文檔

而本教程中的例子:實現文本日志的采集,并能記住偏移量!

????????????????????????????12.1.2實現思路

首先,找到自定義source所要實現或繼承的父類/接口

然后,重寫方法(插入自己的需求邏輯)

然后,將代碼打成jar包,傳入flume的lib目錄

最后,寫配置文件調用自定義的source

??????????????12.1.3代碼架構

source是由 SourceRunner—》EventDrivenSourceRunner來調用

sourceRunner 拿到?source實例對象

然后調?source的start方法

source讀數據,然后將數據轉成event

然后將event傳給channelprocessor.processEvent(event)

processEvent中第一個動作就是調用攔截器攔截這個event,然后再往channel中寫入

?????12.1.4?具體實現

  • 線程池實現版:

package cn.doitedu.flume.custom;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
?* 能夠記錄讀取位置偏移量的自定義source
?*/
public class HoldOffesetSource extends AbstractSource implements EventDrivenSource, Configurable {
????private static final Logger logger = LoggerFactory.getLogger(HoldOffesetSource.class);

????private String positionfilepath;
????private String logfile;
????private int batchsize;

????private ExecutorService exec;

????/**
?????* 框架調用本方法,開始采集數據
?????* 自定義代碼去讀取數據,轉為event
?????* 用getChannelProcessor()方法(定義在父類中)去獲取框架的channelprocessor(channel處理器)
?????* 調用這個channelprocessor將event提交給channel
?????*/
????@Override
????public synchronized void start() {

????????super.start();
????????// 用于向channel提交數據的一個處理器
????????ChannelProcessor channelProcessor = getChannelProcessor();

????????// 獲取歷史偏移量
????????long offset = 0;
????????try {
????????????File positionfile = new File(this.positionfilepath);
????????????String s = FileUtils.readFileToString(positionfile);
????????????offset = Long.parseLong(s);
????????} catch (IOException e) {
????????????e.printStackTrace();
????????}

????????// 構造一個線程池
????????exec = Executors.newSingleThreadExecutor();
????????// 向線程池提交數據采集任務
????????exec.execute(new HoldOffsetRunnable(offset, logfile, channelProcessor, batchsize, positionfilepath));

????}

????/**
?????* 停止前要調用的方法
?????* 可以在這里做一些資源關閉清理工作
?????*/
????@Override
????public synchronized void stop() {
????????super.stop();

???????try{
???????????exec.shutdown();
???????}catch (Exception e){
???????????exec.shutdown();
???????}
????}

????/**
?????* 獲取配置文件中的參數,來配置本source實例
?????* <p>
?????* 要哪些參數:
?????* 偏移量記錄文件所在路徑
?????* 要采集的文件所在路徑
?????*
?????* @param context
?????*/
????public void configure(Context context) {

????????// 這是我們source用來記錄偏移量的文件路徑
????????this.positionfilepath = context.getString("positionfile", "./");

????????// 這是我們source要采集的日志文件的路徑
????????this.logfile = context.getString("logfile");

????????// 這是用戶配置的采集事務批次最大值
????????this.batchsize = context.getInteger("batchsize", 100);

????????// 如果日志文件路徑沒有指定,則拋異常
????????if (StringUtils.isBlank(logfile)) throw new RuntimeException("請配置需要采集的文件路徑");

????}

????/**
?????* 采集文件的具體工作線程任務類
?????*/
????private static class HoldOffsetRunnable implements Runnable {

????????long offset;
????????String logfilepath;
????????String positionfilepath;
????????ChannelProcessor channelProcessor; ?// channel提交器 (里面會調攔截器,會開啟寫入channel的事務)
????????int batchsize; // 批次大小
????????List<Event> events = new ArrayList<Event>(); ?// 用來保存一批事件
????????SystemClock systemClock = new SystemClock();

????????public HoldOffsetRunnable(long offset, String logfilepath, ChannelProcessor channelProcessor, int batchsize, String positionfilepath) {
????????????this.offset = offset;
????????????this.logfilepath = logfilepath;
????????????this.channelProcessor = channelProcessor;
????????????this.batchsize = batchsize;
????????????this.positionfilepath = positionfilepath;
????????}

????????public void run() {

????????????try {
????????????????// 先定位到指定的offset
????????????????RandomAccessFile raf = new RandomAccessFile(logfilepath, "r");
????????????????raf.seek(offset);

????????????????// 循環讀數據
????????????????String line = null;

????????????????// 記錄上一批提交的時間
????????????????long lastBatchTime = System.currentTimeMillis();
????????????????while (true) {
????????????????????line = raf.readLine();
????????????????????if(line == null ){
????????????????????????Thread.sleep(2000);
????????????????????????continue;
????????????????????}

????????????????????// 將數據轉成event
????????????????????Event event = EventBuilder.withBody(line.getBytes());
????????????????????// 裝入list batch
????????????????????synchronized (HoldOffesetSource.class) {
????????????????????????events.add(event);
????????????????????}

????????????????????// 判斷批次大小是否滿 或者 時間到了沒有
????????????????????if (events.size() >= batchsize || timeout(lastBatchTime)) {
????????????????????????// 滿足,則提交
????????????????????????channelProcessor.processEventBatch(events);

????????????????????????// 記錄提交時間
????????????????????????lastBatchTime = systemClock.currentTimeMillis();

????????????????????????// 記錄偏移量
????????????????????????long offset = raf.getFilePointer();
????????????????????????FileUtils.writeStringToFile(new File(positionfilepath), offset + "");

????????????????????????// 清空本批event
????????????????????????events.clear();

????????????????????}

????????????????????// 不滿足,繼續讀
????????????????}
????????????} catch (FileNotFoundException e) {
????????????????logger.error("要采集的文件不存在");
????????????} catch (IOException e) {
????????????????logger.error("我也不知道怎么搞的,不好意思,我罷工了");
????????????} catch (InterruptedException e) {
????????????????logger.error("線程休眠出問題了");
????????????}
????????}

????????// 判斷是否批次間隔超時
????????private boolean timeout(long lastBatchTime) {
????????????return systemClock.currentTimeMillis() - lastBatchTime > 2000;
????????}
????}
}

  • 單線程實現版

12.1.5 啟動測試

代碼打成jar包,上傳flume的lib

然后寫agent配置文件

a1.sources = s1

a1.channels = c1

a1.sinks = k1

# source 配置

a1.sources.s1.type = cn.doitedu.flume.custom.HoldOffesetSource

a1.sources.s1.position_file_path= /root/myposition

a1.sources.s1.data_file_path = /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.batchTime = 5000

a1.sources.s1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

然后啟動flume-agent即可

12.2?自定義攔截器

12.2.1需求場景

公司的點擊流日志數據在一個目錄中不斷生成: /var/log/click_streaming.log

日志文件會隨著文件的大小達到一定閾值(128M)而被重命名,比如:click_streaming.log.1,并且會生成一個新的click_streaming.log文件繼續寫入日志流;

日志文件中數據格式如下:

13888776677,click,/baoming,張飛,湖北省

13889976655,click,/xuexi,關羽,山西省

……..

現在需要用flume去日志服務器上采集數據寫入HDFS,并且要求,對數據中的手機號、姓名字段進行加密(MD5加密,并將加密結果變成BASE64編碼);

??????????????12.2.2實現思路

采集的目標目錄中,會不斷生成新文件

那么,我們的source組件可以選擇 ?taildir:

1.可以監控到新文件 ?

2.可以記錄采集的偏移量

channel,為了保證可靠性,可以選擇 ?filechannel?:

1.會在磁盤上緩存event??

2.會在磁盤上記錄事務狀態

目標存儲是HDFS,sink自然是選擇hdfs?sink;

加密需求的解決:

如果從source上解決,那只能修改 taildir組件的源碼;

如果從sink上解決,那只能修改hdfs?sink組件的源碼;

上述兩種,都需要修改源碼,不是最佳選擇!

最佳選擇:通過攔截器來實現對數據的加工!而flume中沒有現成的內置攔截器可以實現字段加密,我們可以自定義自己的攔截器;

??????????????12.2.3?自定義攔截器的開發

????????????????????????????12.2.3.1基本套路

框架中,自定義擴展接口的套路:

1. 要實現或者繼承框架中提供的接口或父類,實現、重寫其中的方法

2. 寫好的代碼要打成jar包,并放入flume的lib目錄

3. 要將自定義的類,寫入相關agent配置文件

??????????????????????????????????????????12.2.3.2攔截器設計

應對本場景使用自定義攔截器,還要考慮幾個參數的問題:

用戶要加密的字段,可能會變化,代碼的可配置性需要匹配

1. 可以在配置文件中設計一個參數,來指定要加密的字段:??

indices (要加密的字段索引)

以及索引的切割符idxSplitBy

--》 比如: ?a1.sources.interceptors.i1.indices = 0:3

a1.sources.interceptors.i1.idxSplitBy = :

2.為了能夠正確切分數據中的字段,還需要一個參數:字段的分隔符dataSplitBy ?

--》 比如: a1.sources.interceptors.i1.dataSplitBy= ,

??????????????????????????????????????????12.2.3.3flume中的攔截器接口規范

首先,引入flume的開發依賴

<dependency>
????<groupId>org.apache.flume</groupId>
????<artifactId>flume-ng-core</artifactId>
????<version>1.9.0</version>
????<scope>provided</scope>
</dependency>

自定義攔截器的工作機制,其實很簡單:

flume先調自定義攔截器中的一個內部Builder類的config()方法進行參數配置

flume再調Builder類的build()方法獲取自定義攔截器的實例對象(可以在構造過程中傳遞參數)

flume再反復調用攔截器對象的intercept(List<Event> events)方法來修改event

????????????????????????????????????????????????????????12.2.3.4?代碼實現

package cn.doitedu.flume.custom;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class EncryptInterceptor ?implements Interceptor {
????// 要加密的字段索引s
????String indices;
????// 索引之間的分隔符
????String idxSplitBy;
????// 數據體字段之間的分隔符
????String dataSplitBy;

????/**
?????* 構造方法
?????* @param indices
?????* @param idxSplitBy
?????* @param dataSplitBy
?????*/
????public EncryptInterceptor(String indices, String idxSplitBy, String dataSplitBy) {
????????// 0,3
????????this.indices = indices;
????????this.idxSplitBy = idxSplitBy;
????????this.dataSplitBy = dataSplitBy;
????}

????// 這個方法會被框架調用一次,用來做一些初始化工作
????public void initialize() {

????}

????// 攔截方法--對一個event進行處理
????public Event intercept(Event event) {

????????byte[] body = event.getBody();
????????String dataStr = new String(body);

????????// 數據的字段數組
????????String[] dataFieldsArr = dataStr.split(dataSplitBy);

????????// 需要加密的索引的數組
????????String[] idxArr = indices.split(idxSplitBy);

????????for (String s : idxArr) {

????????????int index = Integer.parseInt(s);
????????????// 取出要加密的字段的內容
????????????String field = dataFieldsArr[index];
????????????// MD5加密這個字段
????????????String encryptedField = DigestUtils.md5Hex(field);
????????????// BASE64編碼
????????????byte[] bytes = Base64.decodeBase64(encryptedField);
????????????// 替換掉原來的未加密內容
????????????dataFieldsArr[index] = new String(bytes);
????????}

????????// 將加密過的字段重新拼接成一條數據,并使用原來的分隔符
????????StringBuilder sb = new StringBuilder();
????????for (String field : dataFieldsArr) {
????????????sb.append(field).append(dataSplitBy);
????????}

????????sb.deleteCharAt(sb.lastIndexOf(dataSplitBy));

????????// 返回加密后的字段所封裝的event對象
????????return EventBuilder.withBody(sb.toString().getBytes());
????}

????// 攔截方法--對一批event進行處理
????public List<Event> intercept(List<Event> events) {

????????ArrayList<Event> lst = new ArrayList<Event>();

????????for (Event event : events) {
????????????Event eventEncrpt = intercept(event);
????????????lst.add(eventEncrpt);
????????}

????????return lst;
????}

????// agent退出前,會調一次該方法,進行需要的清理、關閉操作
????public void close() {

????}

????/**
?????* 攔截器的構造器
?????*/
????public staticc lass EncryptInterceptorBuilder implements Interceptor.Builder{
????????// 要加密的字段索引s
????????String indices;
????????// 索引之間的分隔符
????????String idxSplitBy;
????????// 數據體字段之間的分隔符
????????String dataSplitBy;

????????// 構造一個攔截器實例
????????public Interceptor build() {

????????????return new EncryptInterceptor(indices,idxSplitBy,dataSplitBy);
????????}

????????// 獲取配置文件中的攔截器參數
????????public void configure(Context context) {
????????????// 要加密的字段索引s
????????????this.indices = context.getString(Constants.INDICES);
????????????// 索引之間的分隔符
????????????this.idxSplitBy = context.getString(Constants.IDX_SPLIT_BY);
????????????// 數據體字段之間的分隔符
????????????this.dataSplitBy = context.getString(Constants.DATA_SPLIT_BY);

????????}
????}

????public static class Constants {
????????public static final String INDICES = "indices";
????????public static final String IDX_SPLIT_BY = "idxSplitBy";
????????public static final String DATA_SPLIT_BY = "dataSplitBy";
????}
}

??????????????????????????????????????????????????????????????????????12.2.3.5?運行測試

1. 先將代碼打成jar包

2. 上傳到flume安裝節點上,并放入flume的lib目錄

3. 寫采集方案配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.s1.interceptors.i1.indices = 0:4

a1.sources.s1.interceptors.i1.idxSplitBy = :

a1.sources.s1.interceptors.i1.dataSplitBy = ,

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# 寫到kafka就用這一段sink配置

# a1.sinks = k1

# a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

# a1.sinks.k1.channel = c1

4. 準備數據日志文件,與你的處理邏輯相符,如下所示

13888776677,click,/baoming,張飛,湖北省

13889976655,click,/xuexi,關羽,山西省

…….

5.運行

bin/flume-ng agent -n a1 -c conf/ -f agentconf/spooldir-myi-m-log.conf -Dflume.root.logger=INFO,console

13?綜合案例

???????13.1?案例場景

A、B等日志服務機器實時生產日志,日志分為多種類型:

log1/access.log

log2/nginx.log

log3/web.log

現在要求:

?把日志服務器中的各類日志采集匯總到一個中轉agent上,然后分類寫入hdfs中。

但是在hdfs中要求的目錄為:

/source/logs/access/20160101/**

/source/logs/nginx/20160101/**

/source/logs/web/20160101/**

并要求可以按指定的索引,將對應字段內容加密!

???????13.2?實現思路

1. 每臺日志服務器上部署一個flume?agent?-?-?-> level1,每個agent配置3個source對應3類數據

2. leve1_1級的agent在采集數據時,添加一個header,指定數據的類別

3. level_1級的agent要配置兩個avro sink,各自對接一個下級的agent

4. level_1還要配置sink?processoràfail?over??sink?processor,控制兩個sink中只有一個avro sink在工作,如果失敗再切換到另一個avro sink

5.level_1還要配置字段加密攔截器

6. level_2 級配置兩個flume?agent,使用avro?source接收數據

7. level_2 級的hdfs?sink,目錄配置使用動態通配符,取到event中的類別header,以便于將不同類別數據寫入不同hdfs?目錄!

??????????????13.4?配置文件

level_1級配置文件

a1.sources = r1 r2 r3

## source 配置

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/chekp1/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/log1/access.log

a1.sources.r1.maxBatchCount = 1000

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r1.interceptors.i2.indices = 0:4

a1.sources.r1.interceptors.i2.idxSplitBy = :

a1.sources.r1.interceptors.i2.dataSplitBy = ,

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = logtype

a1.sources.r1.interceptors.i3.value = access

a1.sources.r2.type = TAILDIR

a1.sources.r2.channels = c1

a1.sources.r2.positionFile = /root/chekp2/taildir_position.json

a1.sources.r2.filegroups = f1

a1.sources.r2.filegroups.f1 = /root/weblog/log2/nginx.log

a1.sources.r2.maxBatchCount = 1000

a1.sources.r2.interceptors = i1 i2 i3

a1.sources.r2.interceptors.i1.type = timestamp

a1.sources.r2.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r2.interceptors.i2.indices = 0:4

a1.sources.r2.interceptors.i2.idxSplitBy = :

a1.sources.r2.interceptors.i2.dataSplitBy = ,

a1.sources.r2.interceptors.i3.type = static

a1.sources.r2.interceptors.i3.key = logtype

a1.sources.r2.interceptors.i3.value = nginx

a1.sources.r3.type = TAILDIR

a1.sources.r3.channels = c1

a1.sources.r3.positionFile = /root/chekp3/taildir_position.json

a1.sources.r3.filegroups = f1

a1.sources.r3.filegroups.f1 = /root/weblog/log3/weblog.log

a1.sources.r3.maxBatchCount = 1000

a1.sources.r3.interceptors = i1 i2 i3

a1.sources.r3.interceptors.i1.type = timestamp

a1.sources.r3.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r3.interceptors.i2.indices = 0:4

a1.sources.r3.interceptors.i2.idxSplitBy = :

a1.sources.r3.interceptors.i2.dataSplitBy = ,

a1.sources.r3.interceptors.i3.type = static

a1.sources.r3.interceptors.i3.key = logtype

a1.sources.r3.interceptors.i3.value = weblog

## channel 配置

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/channel_chkp

a1.channels.c1.dataDirs = /root/channel_data

## sink 配置

a1.sinks = k1 k2

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = c704

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname = c705

a1.sinks.k2.port = 4545

## sink processor - fail over 失敗配置

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

## 對兩個sink分配不同的優先級

a1.sinkgroups.g1.processor.priority.k1 = 200

a1.sinkgroups.g1.processor.priority.k2 = 100

## 主sink失敗后,停用懲罰時間

a1.sinkgroups.g1.processor.maxpenalty = 5000

level_2配置

a1.sources = s1

a1.channels = c1

a1.sinks = k1

## source 配置

a1.sources.s1.type = avro

a1.sources.s1.bind= 0.0.0.0

a1.sources.s1.port = 4545

a1.sources.s1.channels = c1

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

## channel 配置

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/lev2_channel_chkp

a1.channels.c1.dataDirs = /root/lev2_channel_data

## sink配置

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://c701:8020/doitedu/%{logtype}/%Y-%m-%d/%H/

a1.sinks.k1.hdfs.filePrefix = doitedu-

a1.sinks.k1.hdfs.fileSuffix = .log.gz

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

## 配置壓縮

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

## 數據格式

a1.sinks.k1.hdfs.serializer = TEXT

???????13.5?啟動測試

1. 先把自定義攔截器代碼jar包放入level_1級(C701/C702/C703)的所有flume的lib目錄中;

2. 將各臺機器上之前的一些checkpoint、緩存等目錄清除;

2. 啟動level_2級的兩個agent(C704、C705上);

3. 在level_1的所有機器上,創建日志數據目錄,并寫腳本模擬往3類日志中寫入日志:

4.在level_1的所有機器上啟動level_1級的flume?agent

5.到hdfs上觀察結果

6.嘗試kill掉2級的c704 的 agent,看是否能夠故障切換

14?面試加強

???????14.1?flume事務機制

二、Delivery 保證

認識 Flume 對事件投遞的可靠性保證是非常重要的,它往往是我們是否使用 Flume 來解決問題的決定因素之一。

消息投遞的可靠保證有三種:

  1. At-least-once
  2. At-most-once
  3. Exactly-once

基本上所有工具的使用用戶都希望工具框架能保證消息 Exactly-once ,這樣就不必在設計實現上考慮消息的丟失或者重復的處理場景。但是事實上很少有工具和框架能做到這一點,真正能做到這一點所付出的成本往往很大,或者帶來的額外影響反而讓你覺得不值得。假設 Flume 真的做到了 Exactly-once ,那勢必降低了穩定性和吞吐量,所以?Flume 選擇的策略是 At-least-once 。

當然這里的 At-least-once 需要加上引號,并不是說用上 Flume 的隨便哪個組件組成一個實例,運行過程中就能保存消息不會丟失。事實上 At-least-once 原則只是說的是 Source 、 Channel 和 Sink 三者之間上下投遞消息的保證。而當你選擇 MemoryChannel 時,實例如果異常掛了再重啟,在 channel 中的未被 sink 所消費的殘留數據也就丟失了,從而沒辦法保證整條鏈路的 At-least-once。

Flume 的 At-least-once 保證的實現基礎是建立了自身的 Transaction 機制。Flume 的 Transaction 有4個生命周期函數,分別是 start、 commit、rollback 和 close。

當 Source 往 Channel 批量投遞事件時首先調用 start 開啟事務,批量

put 完事件后通過 commit 來提交事務,如果 commit 異常則 rollback ,然后 close 事務,最后 Source 將剛才提交的一批消息事件向源服務 ack(比如 kafka 提交新的 offset )。Sink 消費 Channel 也是相同的模式,唯一的區別就是 Sink 需要在向目標源完成寫入之后才對事務進行 commit。兩個組件的相同做法都是只有向下游成功投遞了消息才會向上游 ack,從而保證了數據能 At-least-once 向下投遞。

???????14.2flume agent內部機制

組件:

1、ChannelSelector

ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel。其共有兩種類型,分別是 Replicating(復制)和 Multiplexing(多路復用)。 ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相應的原則,將不同的 Event 發往不同的 Channel。

2、SinkProcessor

(1) SinkProcessor 共 有 三 種 類 型 , 分 別 是 DefaultSinkProcessor 、

LoadBalancingSinkProcessor 和 FailoverSinkProcessor。

(2) DefaultSinkProcessor 對應的是單個的 Sink,

LoadBalancingSinkProcessor 和 FailoverSinkProcessor 對應的是 Sink Group。

(3) LoadBalancingSinkProcessor 可以實現負載均衡的功能,FailoverSinkProcessor 可以實現故障轉移的功能。

???????14.3?ganglia及flume監控

開啟內置監控功能

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

將監控數據發往ganglia進行展現

-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890

???????14.4?Flume調優

flume-ng agent包括source、channel、sink三個部分,這三部分都運行在JVM上,而JVM運行在linux操作系統之上。因此,對于flume的性能調優,就是對這三部分及影響因素調優。

1、source的配置

該項目中采用的是 taildir source,他的讀取速度能夠跟上命令行寫入日志的速度,故并未做特殊的處理。

2、channel的配置

可選的channel配置一般有兩種,一是memory channel,二是file channel。

建議在內存足夠的情況下,優先選擇memory channel。

嘗試過相同配置下使用file channel和memory channel,file channel明顯速度較慢,并且會生成log的文件,應該是用作緩存,當source已經接收但是還未寫入sink時的event都會存在這個文件中。這樣的好處是保證數據不會丟失,所以當對數據的丟失情況非常敏感且對實時性沒有太大要求的時候,還是使用file memory吧。。

一開始的memory channel配置用的是默認的,然后控制臺報出了如下警告:

The channel is full or unexpected failure. The source will try again after 1000 ms

這個是因為當前被采集的文件過大,可以通過增大keep-alive的值解決。深層的原因是文件采集的速度和sink的速度沒有匹配好。

所以memory channel有三個比較重要的參數需要配置:

#channel中最多緩存多少

a1.channels.c1.capacity = 5000

#channel一次最多吐給sink多少

a1.channels.c1.transactionCapacity = 2000

#event的活躍時間

a1.channels.c1.keep-alive = 10

3、sink的配置

可以通過壓縮來節省空間和網絡流量,但是會增加cpu的消耗。

batch:size越大性能越好,但是太大會影響時效性,一般batch size和源數據端的大小相同。

4、java內存的配置

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

主要涉及Xms和Xmx兩個參數,可以根據實際的服務器的內存大小進行設計。

5、OS內核參數的配置

如果單臺服務器啟動的flume agent過多的話,默認的內核參數設置偏小,需要調整。(待補充,暫時還未涉及)。

總結

以上是生活随笔為你收集整理的Flume知识点全面总结教程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

欧美野外疯狂做受xxxx高潮 | 久久zyz资源站无码中文动漫 | 青青青爽视频在线观看 | 国产女主播喷水视频在线观看 | 一本加勒比波多野结衣 | 午夜无码人妻av大片色欲 | 久在线观看福利视频 | 中文字幕无码免费久久99 | 亚洲精品一区二区三区在线 | 亚洲一区二区三区无码久久 | 国产精品无套呻吟在线 | 国内丰满熟女出轨videos | а√天堂www在线天堂小说 | 色综合天天综合狠狠爱 | 精品无码成人片一区二区98 | 一本无码人妻在中文字幕免费 | 久久99精品久久久久久 | 熟女少妇人妻中文字幕 | 国产后入清纯学生妹 | 亚洲精品久久久久久久久久久 | 狂野欧美激情性xxxx | 亚洲人成无码网www | 亚洲精品一区二区三区婷婷月 | 日本肉体xxxx裸交 | 无码精品人妻一区二区三区av | 国产免费久久精品国产传媒 | 无码精品人妻一区二区三区av | 无码av免费一区二区三区试看 | 日韩欧美群交p片內射中文 | 无码国产乱人伦偷精品视频 | 久久国产劲爆∧v内射 | 亚洲国产欧美在线成人 | 婷婷五月综合缴情在线视频 | 一本加勒比波多野结衣 | 天天躁日日躁狠狠躁免费麻豆 | 国产在线aaa片一区二区99 | 国产成人精品三级麻豆 | 中文无码成人免费视频在线观看 | 麻豆国产人妻欲求不满 | 人人澡人人透人人爽 | 欧美性猛交xxxx富婆 | 成人一在线视频日韩国产 | 久久视频在线观看精品 | 亚洲一区二区三区在线观看网站 | 中文字幕乱妇无码av在线 | 国产成人无码专区 | 成人精品视频一区二区三区尤物 | 乱人伦中文视频在线观看 | 成年女人永久免费看片 | 国产成人人人97超碰超爽8 | 女人被男人爽到呻吟的视频 | 精品国产乱码久久久久乱码 | 疯狂三人交性欧美 | 女人被男人爽到呻吟的视频 | 午夜丰满少妇性开放视频 | 在线精品国产一区二区三区 | 久久久中文字幕日本无吗 | 扒开双腿吃奶呻吟做受视频 | 国产精品无码永久免费888 | 中文无码精品a∨在线观看不卡 | 波多野结衣乳巨码无在线观看 | 奇米综合四色77777久久 东京无码熟妇人妻av在线网址 | 无码免费一区二区三区 | 久久久久免费精品国产 | 日韩亚洲欧美精品综合 | 无码毛片视频一区二区本码 | 欧美丰满少妇xxxx性 | 98国产精品综合一区二区三区 | 久久久久99精品成人片 | 国产av人人夜夜澡人人爽麻豆 | 国产精品va在线播放 | 国产精品资源一区二区 | 色一情一乱一伦一区二区三欧美 | 国产乱子伦视频在线播放 | 亚洲欧美日韩国产精品一区二区 | 久久久国产一区二区三区 | 青青久在线视频免费观看 | 精品无码成人片一区二区98 | 精品成人av一区二区三区 | 白嫩日本少妇做爰 | 日本又色又爽又黄的a片18禁 | 国产电影无码午夜在线播放 | 亚洲s码欧洲m码国产av | 亚洲精品一区二区三区在线 | 亚洲日韩一区二区 | 老熟妇乱子伦牲交视频 | 国内丰满熟女出轨videos | 亚洲中文字幕无码中字 | 人人爽人人爽人人片av亚洲 | 国产免费无码一区二区视频 | 久久精品国产一区二区三区肥胖 | 精品人人妻人人澡人人爽人人 | 在线视频网站www色 | 丝袜足控一区二区三区 | 久久久精品成人免费观看 | 全黄性性激高免费视频 | 18黄暴禁片在线观看 | 最近中文2019字幕第二页 | 精品少妇爆乳无码av无码专区 | 国产成人无码区免费内射一片色欲 | а√天堂www在线天堂小说 | 大地资源网第二页免费观看 | 国产 精品 自在自线 | 在线精品亚洲一区二区 | 人人妻人人澡人人爽欧美一区九九 | 亚洲中文字幕无码一久久区 | 欧美野外疯狂做受xxxx高潮 | 国产九九九九九九九a片 | 性色欲网站人妻丰满中文久久不卡 | 亚洲精品国偷拍自产在线观看蜜桃 | 麻豆精品国产精华精华液好用吗 | 国产超碰人人爽人人做人人添 | 无码av岛国片在线播放 | 国产又爽又黄又刺激的视频 | 免费中文字幕日韩欧美 | 午夜福利试看120秒体验区 | 99精品无人区乱码1区2区3区 | 无码帝国www无码专区色综合 | 亚洲一区二区三区国产精华液 | 领导边摸边吃奶边做爽在线观看 | 在教室伦流澡到高潮hnp视频 | a在线观看免费网站大全 | 国产在线aaa片一区二区99 | 国产性生交xxxxx无码 | а√天堂www在线天堂小说 | 亚洲码国产精品高潮在线 | 亚洲人成网站色7799 | 大胆欧美熟妇xx | 国产人妻精品一区二区三区不卡 | 熟妇人妻中文av无码 | 夫妻免费无码v看片 | 亚洲熟妇色xxxxx欧美老妇 | 欧洲美熟女乱又伦 | 美女极度色诱视频国产 | 特黄特色大片免费播放器图片 | 日韩视频 中文字幕 视频一区 | 99久久久无码国产精品免费 | 欧美freesex黑人又粗又大 | 国产成人精品久久亚洲高清不卡 | 99国产精品白浆在线观看免费 | 99久久人妻精品免费二区 | 久久综合狠狠综合久久综合88 | 中文字幕无码视频专区 | 亚洲精品久久久久avwww潮水 | 精品乱子伦一区二区三区 | 亚洲精品午夜无码电影网 | 日韩精品无码免费一区二区三区 | 思思久久99热只有频精品66 | 久久精品国产日本波多野结衣 | 成人免费视频在线观看 | 国产人妖乱国产精品人妖 | 久久这里只有精品视频9 | 中文毛片无遮挡高清免费 | 国产性生大片免费观看性 | 久久午夜无码鲁丝片午夜精品 | 亚洲欧美日韩综合久久久 | 荫蒂添的好舒服视频囗交 | 精品乱子伦一区二区三区 | 欧美黑人性暴力猛交喷水 | 中文字幕亚洲情99在线 | 亚洲色偷偷偷综合网 | 中文字幕无码热在线视频 | 色一情一乱一伦一区二区三欧美 | 国产激情无码一区二区app | 美女扒开屁股让男人桶 | 免费观看又污又黄的网站 | www一区二区www免费 | 亚洲国产精品久久人人爱 | 成人精品天堂一区二区三区 | 久久久久久av无码免费看大片 | 日本免费一区二区三区最新 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 欧洲欧美人成视频在线 | 日欧一片内射va在线影院 | 日欧一片内射va在线影院 | 久久99热只有频精品8 | 女人被男人爽到呻吟的视频 | 中文字幕日韩精品一区二区三区 | 老太婆性杂交欧美肥老太 | 亚洲午夜久久久影院 | 亚洲经典千人经典日产 | 99久久亚洲精品无码毛片 | 女人被爽到呻吟gif动态图视看 | 一本久道久久综合婷婷五月 | 色综合久久网 | 国内揄拍国内精品少妇国语 | 一本一道久久综合久久 | 成人女人看片免费视频放人 | 国产无遮挡又黄又爽免费视频 | 久久国产精品精品国产色婷婷 | 亚洲 欧美 激情 小说 另类 | 澳门永久av免费网站 | 小sao货水好多真紧h无码视频 | 国产精品久久久久9999小说 | 日日摸日日碰夜夜爽av | 国产情侣作爱视频免费观看 | 国产内射老熟女aaaa | 成人免费视频视频在线观看 免费 | 亚洲精品国偷拍自产在线麻豆 | 2020久久超碰国产精品最新 | 国产成人精品一区二区在线小狼 | 色一情一乱一伦 | 欧美成人家庭影院 | 亚洲毛片av日韩av无码 | 久久精品人人做人人综合试看 | av无码久久久久不卡免费网站 | 大色综合色综合网站 | 极品尤物被啪到呻吟喷水 | 国产精品理论片在线观看 | 男人和女人高潮免费网站 | а天堂中文在线官网 | 亚洲国产精品一区二区美利坚 | 国产极品美女高潮无套在线观看 | 成人欧美一区二区三区黑人 | 久久熟妇人妻午夜寂寞影院 | 久久久久久久人妻无码中文字幕爆 | 亚洲一区二区三区国产精华液 | 国产口爆吞精在线视频 | 国产97色在线 | 免 | 亚洲精品中文字幕 | 日韩欧美中文字幕在线三区 | 骚片av蜜桃精品一区 | 一个人看的www免费视频在线观看 | 亚洲日韩中文字幕在线播放 | 精品欧洲av无码一区二区三区 | 熟女少妇人妻中文字幕 | 国产97人人超碰caoprom | 性色av无码免费一区二区三区 | 国产真人无遮挡作爱免费视频 | 无码人妻久久一区二区三区不卡 | 国产特级毛片aaaaaa高潮流水 | 精品国产一区二区三区四区在线看 | 日日天干夜夜狠狠爱 | 精品乱码久久久久久久 | 秋霞特色aa大片 | 国内少妇偷人精品视频 | 中文久久乱码一区二区 | 国内精品一区二区三区不卡 | 波多野结衣 黑人 | 欧美兽交xxxx×视频 | 日本熟妇人妻xxxxx人hd | 最近免费中文字幕中文高清百度 | 成人一在线视频日韩国产 | 精品国产乱码久久久久乱码 | 樱花草在线社区www | 精品偷自拍另类在线观看 | 亚洲中文字幕无码中文字在线 | 亚洲精品一区三区三区在线观看 | 鲁鲁鲁爽爽爽在线视频观看 | 377p欧洲日本亚洲大胆 | 香港三级日本三级妇三级 | av人摸人人人澡人人超碰下载 | 日本丰满熟妇videos | 国内老熟妇对白xxxxhd | 99久久人妻精品免费二区 | 呦交小u女精品视频 | 成人无码精品一区二区三区 | 我要看www免费看插插视频 | 精品一区二区三区波多野结衣 | 欧美亚洲日韩国产人成在线播放 | 六月丁香婷婷色狠狠久久 | 久久久久亚洲精品中文字幕 | 日本欧美一区二区三区乱码 | 国产激情精品一区二区三区 | 中文字幕无码热在线视频 | 四虎国产精品免费久久 | 全黄性性激高免费视频 | 国产一区二区三区四区五区加勒比 | 午夜免费福利小电影 | 又大又硬又黄的免费视频 | 黄网在线观看免费网站 | 老熟女乱子伦 | 曰韩无码二三区中文字幕 | 在线观看欧美一区二区三区 | 日本丰满熟妇videos | 久久精品人妻少妇一区二区三区 | 午夜精品久久久久久久久 | 97久久超碰中文字幕 | 无码人妻精品一区二区三区不卡 | 激情内射日本一区二区三区 | 亚洲精品一区二区三区四区五区 | 精品无码一区二区三区的天堂 | 精品一二三区久久aaa片 | 国产精品嫩草久久久久 | 精品一区二区不卡无码av | 亚欧洲精品在线视频免费观看 | 免费网站看v片在线18禁无码 | 日本丰满护士爆乳xxxx | 国产在线精品一区二区高清不卡 | 亚洲色大成网站www国产 | 欧美精品国产综合久久 | 欧美三级不卡在线观看 | 日本xxxx色视频在线观看免费 | 乱人伦人妻中文字幕无码 | 狠狠亚洲超碰狼人久久 | 国产99久久精品一区二区 | 夜精品a片一区二区三区无码白浆 | 亚洲中文字幕乱码av波多ji | 99精品国产综合久久久久五月天 | 伦伦影院午夜理论片 | 亚洲中文字幕无码中文字在线 | 国产婷婷色一区二区三区在线 | 无码一区二区三区在线观看 | 色爱情人网站 | 成熟女人特级毛片www免费 | 成人免费无码大片a毛片 | 在线观看免费人成视频 | 窝窝午夜理论片影院 | 在线看片无码永久免费视频 | 欧美精品免费观看二区 | 真人与拘做受免费视频一 | 国产成人一区二区三区别 | 亚洲成色在线综合网站 | 久久精品国产一区二区三区肥胖 | 日本在线高清不卡免费播放 | 国产精品.xx视频.xxtv | 成人免费视频一区二区 | 九月婷婷人人澡人人添人人爽 | 精品 日韩 国产 欧美 视频 | 野狼第一精品社区 | 人人爽人人澡人人高潮 | 亚洲国产成人av在线观看 | 成人毛片一区二区 | 亚洲热妇无码av在线播放 | 精品偷自拍另类在线观看 | 精品国产青草久久久久福利 | 黑森林福利视频导航 | 欧洲欧美人成视频在线 | 蜜桃视频插满18在线观看 | 成人av无码一区二区三区 | 午夜性刺激在线视频免费 | 国产香蕉尹人综合在线观看 | 国产午夜福利100集发布 | 少妇无码av无码专区在线观看 | 欧美日韩一区二区免费视频 | 亚洲国产精品一区二区第一页 | 国产又粗又硬又大爽黄老大爷视 | 丰满少妇人妻久久久久久 | 成人一区二区免费视频 | 午夜福利试看120秒体验区 | 奇米影视7777久久精品人人爽 | 欧美放荡的少妇 | 妺妺窝人体色www在线小说 | 国产激情综合五月久久 | 福利一区二区三区视频在线观看 | 色情久久久av熟女人妻网站 | 国产内射老熟女aaaa | 久久成人a毛片免费观看网站 | 欧美freesex黑人又粗又大 | 成人无码影片精品久久久 | 国产高潮视频在线观看 | √8天堂资源地址中文在线 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 日韩人妻系列无码专区 | 强辱丰满人妻hd中文字幕 | 亚洲成av人在线观看网址 | 国产成人精品久久亚洲高清不卡 | 性啪啪chinese东北女人 | 精品一二三区久久aaa片 | 国产亚洲视频中文字幕97精品 | 全黄性性激高免费视频 | 亚洲欧美中文字幕5发布 | 精品一区二区不卡无码av | 无码帝国www无码专区色综合 | 99国产欧美久久久精品 | 中文字幕av伊人av无码av | 特黄特色大片免费播放器图片 | 人人妻人人澡人人爽欧美一区九九 | 波多野结衣一区二区三区av免费 | 国产特级毛片aaaaaa高潮流水 | 人妻少妇精品视频专区 | 久久久久久a亚洲欧洲av冫 | 欧美老熟妇乱xxxxx | 国产精品高潮呻吟av久久 | 国产成人亚洲综合无码 | 亚洲自偷自拍另类第1页 | av在线亚洲欧洲日产一区二区 | 网友自拍区视频精品 | 无码纯肉视频在线观看 | 日本又色又爽又黄的a片18禁 | 久久久久久国产精品无码下载 | 97夜夜澡人人爽人人喊中国片 | 又紧又大又爽精品一区二区 | 2019午夜福利不卡片在线 | av小次郎收藏 | 久久久久久久人妻无码中文字幕爆 | 国产99久久精品一区二区 | 日本熟妇大屁股人妻 | 国产精品久免费的黄网站 | 久久精品国产99精品亚洲 | 99久久精品午夜一区二区 | 2020久久超碰国产精品最新 | 久久精品中文字幕一区 | 亚洲一区二区三区在线观看网站 | 久久久国产一区二区三区 | 国产无遮挡又黄又爽又色 | 亚洲の无码国产の无码步美 | 国产人妻人伦精品 | 成人动漫在线观看 | 久久99精品久久久久久 | 人妻互换免费中文字幕 | 波多野结衣av在线观看 | av香港经典三级级 在线 | 久久综合狠狠综合久久综合88 | аⅴ资源天堂资源库在线 | 老熟女乱子伦 | 精品一二三区久久aaa片 | 国产区女主播在线观看 | 国产艳妇av在线观看果冻传媒 | 精品久久久无码人妻字幂 | 亚洲熟妇色xxxxx欧美老妇y | 亚洲第一无码av无码专区 | 欧美人与善在线com | 18黄暴禁片在线观看 | 牲交欧美兽交欧美 | 亚洲区欧美区综合区自拍区 | 女高中生第一次破苞av | 亚洲熟女一区二区三区 | 亚洲欧美日韩国产精品一区二区 | 国产又粗又硬又大爽黄老大爷视 | 天堂а√在线地址中文在线 | 久久99精品久久久久久动态图 | 国产乱子伦视频在线播放 | 色 综合 欧美 亚洲 国产 | 精品偷拍一区二区三区在线看 | 3d动漫精品啪啪一区二区中 | 麻豆成人精品国产免费 | 丰满岳乱妇在线观看中字无码 | 日韩精品乱码av一区二区 | 中文毛片无遮挡高清免费 | 亚洲国产精品毛片av不卡在线 | 网友自拍区视频精品 | 国产精品国产三级国产专播 | www国产亚洲精品久久网站 | 国产精品丝袜黑色高跟鞋 | 人妻少妇精品无码专区二区 | 伦伦影院午夜理论片 | 欧美日韩一区二区免费视频 | 性做久久久久久久久 | 又紧又大又爽精品一区二区 | 麻豆国产人妻欲求不满 | 欧美黑人乱大交 | 日欧一片内射va在线影院 | 欧美肥老太牲交大战 | 亚洲色无码一区二区三区 | 午夜精品久久久久久久久 | 乱码av麻豆丝袜熟女系列 | 欧美日韩一区二区三区自拍 | 永久免费精品精品永久-夜色 | 国产人妻精品一区二区三区 | 欧美精品国产综合久久 | 一个人免费观看的www视频 | 国产精品亚洲а∨无码播放麻豆 | 亚洲a无码综合a国产av中文 | 欧美日韩一区二区三区自拍 | 国产亚洲精品久久久久久 | 国产福利视频一区二区 | 装睡被陌生人摸出水好爽 | 亚洲色欲色欲欲www在线 | 中文字幕无码日韩欧毛 | 久久久精品456亚洲影院 | 少妇被粗大的猛进出69影院 | 中文字幕无码热在线视频 | 欧美自拍另类欧美综合图片区 | 国产午夜视频在线观看 | 国产黄在线观看免费观看不卡 | 强伦人妻一区二区三区视频18 | 亚洲热妇无码av在线播放 | 久久久久久a亚洲欧洲av冫 | 老熟女乱子伦 | 国产黑色丝袜在线播放 | 性欧美牲交在线视频 | 国产超碰人人爽人人做人人添 | 最近免费中文字幕中文高清百度 | 国产9 9在线 | 中文 | 亚洲无人区一区二区三区 | 中文字幕乱妇无码av在线 | 国产av剧情md精品麻豆 | 欧美激情内射喷水高潮 | 国精品人妻无码一区二区三区蜜柚 | 搡女人真爽免费视频大全 | 夜夜夜高潮夜夜爽夜夜爰爰 | 纯爱无遮挡h肉动漫在线播放 | 国产成人无码午夜视频在线观看 | 午夜肉伦伦影院 | 国产激情精品一区二区三区 | 久久久久久久久888 | 全黄性性激高免费视频 | 国产精品va在线观看无码 | 色诱久久久久综合网ywww | 久久精品国产日本波多野结衣 | 九一九色国产 | 亚洲成a人片在线观看无码3d | 人人澡人摸人人添 | 久久国产劲爆∧v内射 | 国产av一区二区三区最新精品 | 中文字幕无码热在线视频 | 日本丰满熟妇videos | 麻豆国产97在线 | 欧洲 | 亚洲一区二区三区香蕉 | 天天摸天天透天天添 | 精品一区二区不卡无码av | 国产乱人伦偷精品视频 | 日日鲁鲁鲁夜夜爽爽狠狠 | 99久久亚洲精品无码毛片 | 国产香蕉尹人综合在线观看 | 亚洲啪av永久无码精品放毛片 | 精品水蜜桃久久久久久久 | √天堂资源地址中文在线 | 男人和女人高潮免费网站 | 日韩av无码中文无码电影 | 俄罗斯老熟妇色xxxx | 久久精品国产一区二区三区肥胖 | 婷婷色婷婷开心五月四房播播 | 久久久久se色偷偷亚洲精品av | 麻豆av传媒蜜桃天美传媒 | 97色伦图片97综合影院 | 狠狠噜狠狠狠狠丁香五月 | 日本精品高清一区二区 | 亚欧洲精品在线视频免费观看 | 精品偷自拍另类在线观看 | 成人av无码一区二区三区 | 在线a亚洲视频播放在线观看 | 在线播放亚洲第一字幕 | 国产色在线 | 国产 | 99国产精品白浆在线观看免费 | 97人妻精品一区二区三区 | 麻豆国产人妻欲求不满谁演的 | 亚洲精品www久久久 | 波多野结衣一区二区三区av免费 | 男女超爽视频免费播放 | 欧美精品一区二区精品久久 | 亚洲精品一区二区三区在线观看 | 亚洲熟妇色xxxxx欧美老妇y | 中文字幕乱码亚洲无线三区 | 人妻尝试又大又粗久久 | 国产成人人人97超碰超爽8 | 青春草在线视频免费观看 | 国内少妇偷人精品视频 | 人人妻在人人 | 熟妇女人妻丰满少妇中文字幕 | 99久久精品无码一区二区毛片 | 久久熟妇人妻午夜寂寞影院 | 粉嫩少妇内射浓精videos | 玩弄人妻少妇500系列视频 | 中文字幕 亚洲精品 第1页 | 无遮无挡爽爽免费视频 | 亚洲精品鲁一鲁一区二区三区 | 亚洲中文字幕va福利 | 天下第一社区视频www日本 | 偷窥日本少妇撒尿chinese | 亚洲精品综合五月久久小说 | 亚洲一区二区三区香蕉 | 午夜精品一区二区三区在线观看 | 娇妻被黑人粗大高潮白浆 | 给我免费的视频在线观看 | 国产内射爽爽大片视频社区在线 | 中文精品久久久久人妻不卡 | 一区二区三区乱码在线 | 欧洲 | 午夜福利一区二区三区在线观看 | 欧美性生交xxxxx久久久 | 精品熟女少妇av免费观看 | 88国产精品欧美一区二区三区 | 丰满岳乱妇在线观看中字无码 | 强伦人妻一区二区三区视频18 | www国产亚洲精品久久久日本 | 无码国产乱人伦偷精品视频 | 欧洲熟妇精品视频 | 国产精品久久久久久久影院 | 欧美激情一区二区三区成人 | 99久久婷婷国产综合精品青草免费 | 久久亚洲中文字幕精品一区 | 亚洲大尺度无码无码专区 | 亚洲熟妇自偷自拍另类 | 精品亚洲成av人在线观看 | 亚洲精品久久久久avwww潮水 | 搡女人真爽免费视频大全 | 久久综合九色综合欧美狠狠 | 色综合久久中文娱乐网 | 国产精品沙发午睡系列 | 国产精品久久久久久亚洲影视内衣 | 久久综合九色综合欧美狠狠 | 极品尤物被啪到呻吟喷水 | 野狼第一精品社区 | 成年女人永久免费看片 | 亚洲 另类 在线 欧美 制服 | 99国产欧美久久久精品 | 国产激情一区二区三区 | 久久久精品成人免费观看 | 在线精品国产一区二区三区 | 人人妻人人澡人人爽人人精品 | 免费看男女做好爽好硬视频 | 狂野欧美性猛xxxx乱大交 | 国产欧美精品一区二区三区 | 欧美怡红院免费全部视频 | 蜜桃臀无码内射一区二区三区 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 精品无码成人片一区二区98 | 欧美人与物videos另类 | 少妇人妻av毛片在线看 | 久久99精品久久久久久动态图 | 九一九色国产 | 日韩精品无码一本二本三本色 | 天天拍夜夜添久久精品大 | 国产麻豆精品一区二区三区v视界 | 国内精品一区二区三区不卡 | 乱码午夜-极国产极内射 | 中文字幕无码视频专区 | 精品人妻人人做人人爽夜夜爽 | aⅴ在线视频男人的天堂 | 国产成人无码av片在线观看不卡 | 国内精品人妻无码久久久影院蜜桃 | 国产一精品一av一免费 | 日本一本二本三区免费 | 特大黑人娇小亚洲女 | 亚洲人成无码网www | 欧美乱妇无乱码大黄a片 | 国产黄在线观看免费观看不卡 | 人妻互换免费中文字幕 | 欧美人与牲动交xxxx | 午夜无码区在线观看 | 精品欧美一区二区三区久久久 | 亚洲中文无码av永久不收费 | 精品无码一区二区三区爱欲 | 精品亚洲成av人在线观看 | 无码乱肉视频免费大全合集 | 精品久久久久久人妻无码中文字幕 | 18精品久久久无码午夜福利 | 又大又硬又黄的免费视频 | 欧美黑人乱大交 | 久久综合九色综合97网 | 国产乱子伦视频在线播放 | 中文精品无码中文字幕无码专区 | 色欲av亚洲一区无码少妇 | 亚洲一区二区三区无码久久 | 久久99精品久久久久婷婷 | 国产三级精品三级男人的天堂 | 亚洲 欧美 激情 小说 另类 | 无码av岛国片在线播放 | 成熟妇人a片免费看网站 | 在线视频网站www色 | 国产精品国产自线拍免费软件 | 国产成人精品一区二区在线小狼 | 性色欲网站人妻丰满中文久久不卡 | 99国产欧美久久久精品 | 亚洲综合伊人久久大杳蕉 | 免费国产黄网站在线观看 | 久久久久久a亚洲欧洲av冫 | 久久99精品国产.久久久久 | 成人免费视频在线观看 | 久久国产精品二国产精品 | 国产乱人伦av在线无码 | 亚洲精品国偷拍自产在线麻豆 | 国产精品二区一区二区aⅴ污介绍 | 欧美国产日产一区二区 | 欧美日韩在线亚洲综合国产人 | 国产av无码专区亚洲awww | 99久久亚洲精品无码毛片 | 久久人人爽人人人人片 | 亚洲精品久久久久中文第一幕 | 天海翼激烈高潮到腰振不止 | 又紧又大又爽精品一区二区 | 中文无码精品a∨在线观看不卡 | 精品欧美一区二区三区久久久 | 亚洲一区二区三区在线观看网站 | 无套内谢老熟女 | 永久免费观看国产裸体美女 | 欧美性猛交内射兽交老熟妇 | 色综合视频一区二区三区 | 亚洲精品久久久久久久久久久 | 伊人久久大香线焦av综合影院 | 97se亚洲精品一区 | 18精品久久久无码午夜福利 | 欧美激情内射喷水高潮 | 98国产精品综合一区二区三区 | 日日摸夜夜摸狠狠摸婷婷 | 国产成人精品优优av | 国产精品久久久久久久9999 | 精品一区二区不卡无码av | 欧美三级不卡在线观看 | 丰满人妻精品国产99aⅴ | 在线精品亚洲一区二区 | 久久99精品久久久久久 | 成人aaa片一区国产精品 | 中文字幕无码免费久久99 | 国产一区二区不卡老阿姨 | 亚洲综合在线一区二区三区 | 玩弄中年熟妇正在播放 | 伊人久久大香线焦av综合影院 | 亚洲成av人综合在线观看 | 亚洲爆乳无码专区 | 老太婆性杂交欧美肥老太 | 狠狠色噜噜狠狠狠7777奇米 | 久久午夜无码鲁丝片秋霞 | ass日本丰满熟妇pics | 精品国偷自产在线视频 | 免费人成网站视频在线观看 | 国产亚洲视频中文字幕97精品 | 老熟女重囗味hdxx69 | 日日天日日夜日日摸 | 青青青爽视频在线观看 | 亚洲精品综合五月久久小说 | 牲交欧美兽交欧美 | 中文字幕人妻无码一区二区三区 | 欧美怡红院免费全部视频 | 色婷婷av一区二区三区之红樱桃 | 天堂亚洲免费视频 | 特大黑人娇小亚洲女 | 久久亚洲精品中文字幕无男同 | 久久亚洲精品成人无码 | 亚洲日韩av一区二区三区四区 | 亚洲成av人在线观看网址 | 欧美熟妇另类久久久久久不卡 | 亚洲国产精品久久久久久 | 久久久精品国产sm最大网站 | 亚洲国产综合无码一区 | 领导边摸边吃奶边做爽在线观看 | 性色av无码免费一区二区三区 | 久久综合网欧美色妞网 | 人人妻在人人 | 波多野结衣高清一区二区三区 | 天天做天天爱天天爽综合网 | 亚洲一区二区三区国产精华液 | 99久久婷婷国产综合精品青草免费 | 成人欧美一区二区三区黑人 | 日产国产精品亚洲系列 | 99久久人妻精品免费一区 | 人人妻人人澡人人爽欧美一区九九 | 国内精品久久毛片一区二区 | 未满小14洗澡无码视频网站 | 亚洲精品久久久久久一区二区 | 图片小说视频一区二区 | 欧美国产亚洲日韩在线二区 | 嫩b人妻精品一区二区三区 | 自拍偷自拍亚洲精品被多人伦好爽 | 亚洲中文字幕在线无码一区二区 | 无遮挡啪啪摇乳动态图 | 国产亚洲欧美在线专区 | 日欧一片内射va在线影院 | 在教室伦流澡到高潮hnp视频 | 日韩欧美群交p片內射中文 | 四虎国产精品一区二区 | 粗大的内捧猛烈进出视频 | 亚洲精品中文字幕久久久久 | 亚拍精品一区二区三区探花 | 日韩精品无码一区二区中文字幕 | 成人亚洲精品久久久久软件 | 欧美国产亚洲日韩在线二区 | 少妇性俱乐部纵欲狂欢电影 | 亚洲国产精品无码一区二区三区 | 夜先锋av资源网站 | www成人国产高清内射 | 国产无遮挡又黄又爽又色 | 在线成人www免费观看视频 | 黑人大群体交免费视频 | 色一情一乱一伦一视频免费看 | 日本欧美一区二区三区乱码 | 日韩人妻少妇一区二区三区 | 无码人妻黑人中文字幕 | 天堂а√在线地址中文在线 | 大色综合色综合网站 | 少妇邻居内射在线 | 鲁鲁鲁爽爽爽在线视频观看 | 午夜男女很黄的视频 | 思思久久99热只有频精品66 | 国产成人无码av在线影院 | 在线播放免费人成毛片乱码 | 色五月五月丁香亚洲综合网 | 国产精品久久久午夜夜伦鲁鲁 | 男女下面进入的视频免费午夜 | 亚洲精品中文字幕 | 乱人伦人妻中文字幕无码 | 国产国语老龄妇女a片 | 日日碰狠狠躁久久躁蜜桃 | 永久免费观看国产裸体美女 | 大乳丰满人妻中文字幕日本 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 色 综合 欧美 亚洲 国产 | 国产精品高潮呻吟av久久 | 自拍偷自拍亚洲精品被多人伦好爽 | 亚洲成a人片在线观看无码3d | av无码电影一区二区三区 | 亚洲 高清 成人 动漫 | 亚洲区小说区激情区图片区 | 久久久精品欧美一区二区免费 | 熟妇人妻无乱码中文字幕 | 人妻无码αv中文字幕久久琪琪布 | 国产又爽又猛又粗的视频a片 | 欧美zoozzooz性欧美 | 一个人免费观看的www视频 | 亚洲国产精品一区二区美利坚 | 国产尤物精品视频 | 激情人妻另类人妻伦 | 亚洲理论电影在线观看 | 国产精品第一区揄拍无码 | 天天躁夜夜躁狠狠是什么心态 | 精品成在人线av无码免费看 | 色一情一乱一伦一区二区三欧美 | 免费网站看v片在线18禁无码 | av人摸人人人澡人人超碰下载 | 国内揄拍国内精品少妇国语 | 成人无码精品一区二区三区 | 青青青手机频在线观看 | 欧美日韩人成综合在线播放 | 久久精品无码一区二区三区 | 国产精品久久久久无码av色戒 | 欧洲欧美人成视频在线 | 亚洲乱码中文字幕在线 | 色一情一乱一伦一视频免费看 | 日本乱偷人妻中文字幕 | 正在播放老肥熟妇露脸 | 3d动漫精品啪啪一区二区中 | 亚洲精品久久久久久一区二区 | 国产午夜福利亚洲第一 | av无码久久久久不卡免费网站 | 国产一区二区三区精品视频 | 国产精品久久久午夜夜伦鲁鲁 | 亚拍精品一区二区三区探花 | 全黄性性激高免费视频 | 精品无码一区二区三区爱欲 | 亚洲一区二区三区国产精华液 | 亚洲精品久久久久久久久久久 | 在线a亚洲视频播放在线观看 | 中文字幕乱妇无码av在线 | 亚洲国产欧美在线成人 | 日日橹狠狠爱欧美视频 | 人人妻人人澡人人爽欧美精品 | 精品无码av一区二区三区 | 亚洲精品成人福利网站 | 理论片87福利理论电影 | 欧美高清在线精品一区 | 欧美真人作爱免费视频 | 亚洲 高清 成人 动漫 | 无码任你躁久久久久久久 | 强开小婷嫩苞又嫩又紧视频 | 久久五月精品中文字幕 | 国产精品自产拍在线观看 | 国产成人一区二区三区在线观看 | 亚洲国产av精品一区二区蜜芽 | 无遮挡啪啪摇乳动态图 | 人人妻人人澡人人爽人人精品浪潮 | 永久免费观看美女裸体的网站 | 亚洲色欲久久久综合网东京热 | 国产精品第一国产精品 | 亚洲精品美女久久久久久久 | 亚洲精品成人福利网站 | yw尤物av无码国产在线观看 | 久久精品国产一区二区三区肥胖 | 老司机亚洲精品影院无码 | 日韩人妻少妇一区二区三区 | 久久久婷婷五月亚洲97号色 | 国内少妇偷人精品视频 | 日产精品99久久久久久 | 亚洲va欧美va天堂v国产综合 | 中文毛片无遮挡高清免费 | 日本又色又爽又黄的a片18禁 | 欧美老熟妇乱xxxxx | 国产成人精品无码播放 | 亚洲综合无码久久精品综合 | 欧美变态另类xxxx | 中国女人内谢69xxxx | 国产黄在线观看免费观看不卡 | 久久五月精品中文字幕 | 国产艳妇av在线观看果冻传媒 | 欧美日韩在线亚洲综合国产人 | 欧美放荡的少妇 | 亚洲码国产精品高潮在线 | 强辱丰满人妻hd中文字幕 | 亚洲日韩av一区二区三区四区 | 日韩亚洲欧美精品综合 | 奇米影视7777久久精品 | 欧美黑人性暴力猛交喷水 | 国产午夜亚洲精品不卡下载 | 午夜嘿嘿嘿影院 | 亚洲精品一区二区三区在线观看 | 中文无码伦av中文字幕 | 精品偷自拍另类在线观看 | 欧美zoozzooz性欧美 | 亚洲成av人影院在线观看 | 亚洲国精产品一二二线 | 国产成人亚洲综合无码 | 国产xxx69麻豆国语对白 | 无码纯肉视频在线观看 | 国产国语老龄妇女a片 | 亚洲成a人一区二区三区 | 国产精品久久久久久久影院 | 亚洲精品一区二区三区四区五区 | 老熟妇乱子伦牲交视频 | 好男人www社区 | 亚洲色www成人永久网址 | 在线观看国产午夜福利片 | 亚洲日韩一区二区三区 | 国产成人亚洲综合无码 | 天天拍夜夜添久久精品大 | 亚洲综合在线一区二区三区 | 欧美成人免费全部网站 | 狂野欧美性猛交免费视频 | 亚洲人亚洲人成电影网站色 | 久久久久亚洲精品男人的天堂 | 成人免费无码大片a毛片 | 中文字幕人妻无码一区二区三区 | 少妇无码一区二区二三区 | 性欧美牲交在线视频 | 两性色午夜视频免费播放 | 乱中年女人伦av三区 | 99久久人妻精品免费二区 | 97精品人妻一区二区三区香蕉 | 亚洲爆乳精品无码一区二区三区 | 色婷婷欧美在线播放内射 | 亚洲无人区午夜福利码高清完整版 | 内射欧美老妇wbb | 中文精品久久久久人妻不卡 | 国产亚洲欧美日韩亚洲中文色 | 男女猛烈xx00免费视频试看 | 久久五月精品中文字幕 | 欧美 丝袜 自拍 制服 另类 | 欧美性生交xxxxx久久久 | 中文字幕乱码中文乱码51精品 | 国产免费观看黄av片 | 中文字幕乱码亚洲无线三区 | 麻豆国产人妻欲求不满谁演的 | 亚洲一区二区三区偷拍女厕 | 鲁鲁鲁爽爽爽在线视频观看 | 欧美zoozzooz性欧美 | 人人妻人人藻人人爽欧美一区 | 成人欧美一区二区三区黑人免费 | 精品夜夜澡人妻无码av蜜桃 | 国内综合精品午夜久久资源 | 国产午夜视频在线观看 | 亚洲欧美日韩国产精品一区二区 | 亚洲国产精品毛片av不卡在线 | 国产精品无套呻吟在线 | 日本肉体xxxx裸交 | 国产免费无码一区二区视频 | 国产疯狂伦交大片 | 精品一区二区三区波多野结衣 | 麻豆md0077饥渴少妇 | 日本熟妇人妻xxxxx人hd | 久久精品99久久香蕉国产色戒 | 中文字幕+乱码+中文字幕一区 | 在线看片无码永久免费视频 | 日本欧美一区二区三区乱码 | 国产黄在线观看免费观看不卡 | 又大又黄又粗又爽的免费视频 | 久久 国产 尿 小便 嘘嘘 | 国产一区二区三区影院 | 久久久久亚洲精品男人的天堂 | 国产精品爱久久久久久久 | 国产女主播喷水视频在线观看 | 在线视频网站www色 | 久久综合狠狠综合久久综合88 | 1000部夫妻午夜免费 | 成 人 免费观看网站 | 国产人妻精品一区二区三区 | 少妇邻居内射在线 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 4hu四虎永久在线观看 | 无遮挡啪啪摇乳动态图 | 日本大乳高潮视频在线观看 | 性生交大片免费看女人按摩摩 | 成人试看120秒体验区 | 人妻天天爽夜夜爽一区二区 | 大胆欧美熟妇xx | 亚洲精品中文字幕乱码 | 久久国产精品_国产精品 | 亚洲男人av香蕉爽爽爽爽 | 日本精品高清一区二区 | 沈阳熟女露脸对白视频 | 日日躁夜夜躁狠狠躁 | 亚洲国产精品美女久久久久 | 免费中文字幕日韩欧美 | 亚洲综合色区中文字幕 | 国产在线精品一区二区高清不卡 | 中文字幕乱妇无码av在线 | 俺去俺来也www色官网 | 久久综合九色综合欧美狠狠 | 在线精品国产一区二区三区 | 乱人伦中文视频在线观看 | 久久精品国产大片免费观看 | 正在播放老肥熟妇露脸 | 三级4级全黄60分钟 | 中文无码精品a∨在线观看不卡 | 免费观看激色视频网站 | 无码国模国产在线观看 | 精品乱码久久久久久久 | 国产黑色丝袜在线播放 | 国产性生交xxxxx无码 | 欧美成人午夜精品久久久 | 久久精品中文字幕一区 | 久久综合网欧美色妞网 | 色五月五月丁香亚洲综合网 | 国产精品无码成人午夜电影 | 又大又紧又粉嫩18p少妇 | 欧美乱妇无乱码大黄a片 | 国产在线一区二区三区四区五区 | www国产亚洲精品久久久日本 | 国产精品久久久久久亚洲影视内衣 | 国产精品成人av在线观看 | 亚洲人成无码网www | 55夜色66夜色国产精品视频 | 少妇性l交大片欧洲热妇乱xxx | 精品aⅴ一区二区三区 | 久久精品国产99久久6动漫 | 亚洲另类伦春色综合小说 | 欧美激情综合亚洲一二区 | aⅴ亚洲 日韩 色 图网站 播放 | 一个人看的www免费视频在线观看 | 国产三级精品三级男人的天堂 | 无码精品国产va在线观看dvd | 亚洲中文字幕无码中文字在线 | 中文字幕人妻丝袜二区 | 国产精品久久久久久亚洲毛片 | 亚洲精品一区三区三区在线观看 | 午夜福利一区二区三区在线观看 | 亚洲区小说区激情区图片区 | 玩弄中年熟妇正在播放 | 夜夜高潮次次欢爽av女 | 综合网日日天干夜夜久久 | а√天堂www在线天堂小说 | 在线观看国产午夜福利片 | 在线播放免费人成毛片乱码 | 水蜜桃亚洲一二三四在线 | 久久久www成人免费毛片 | 国产精品对白交换视频 | 久久无码中文字幕免费影院蜜桃 | 荫蒂被男人添的好舒服爽免费视频 | 欧美人与善在线com | 中文字幕乱码亚洲无线三区 | 亚洲色欲久久久综合网东京热 | 无遮挡国产高潮视频免费观看 | 欧美人与物videos另类 | 国产亚洲欧美在线专区 | 国产激情无码一区二区 | 国产精品毛多多水多 | 亚洲精品欧美二区三区中文字幕 | 国产亚洲精品久久久久久大师 | 中文字幕av伊人av无码av | 亚洲 a v无 码免 费 成 人 a v | 久久久久久久久888 | 色欲久久久天天天综合网精品 | 色一情一乱一伦 | 成人免费视频在线观看 | 成人欧美一区二区三区黑人 | 国产乱人偷精品人妻a片 | 日韩人妻无码中文字幕视频 | 无码人妻av免费一区二区三区 | 亚洲理论电影在线观看 | 综合激情五月综合激情五月激情1 | 综合激情五月综合激情五月激情1 | 人妻夜夜爽天天爽三区 | 日本一本二本三区免费 | 天海翼激烈高潮到腰振不止 | 漂亮人妻洗澡被公强 日日躁 | 精品aⅴ一区二区三区 | 天堂а√在线地址中文在线 | 99国产精品白浆在线观看免费 | 亚洲中文字幕av在天堂 | 国产两女互慰高潮视频在线观看 | 成人影院yy111111在线观看 | 又色又爽又黄的美女裸体网站 | 最新国产乱人伦偷精品免费网站 | 2019nv天堂香蕉在线观看 | 国产人妻精品一区二区三区不卡 | 国产精品久久久av久久久 | 国产精品久久久一区二区三区 | 亚洲狠狠色丁香婷婷综合 | 人妻人人添人妻人人爱 | 日日麻批免费40分钟无码 | 黑人大群体交免费视频 | 国产黑色丝袜在线播放 | 漂亮人妻洗澡被公强 日日躁 | 国产av无码专区亚洲awww | 在线视频网站www色 | 欧美国产日韩亚洲中文 | 日韩成人一区二区三区在线观看 | 亚洲中文字幕乱码av波多ji | 精品无码av一区二区三区 | 熟女少妇在线视频播放 | 丰满肥臀大屁股熟妇激情视频 | 377p欧洲日本亚洲大胆 | 国产综合久久久久鬼色 | 亚洲一区二区三区在线观看网站 | 日本成熟视频免费视频 | 成人无码影片精品久久久 | 九九在线中文字幕无码 | 波多野结衣乳巨码无在线观看 | 国产莉萝无码av在线播放 | 日韩av激情在线观看 | 亚洲国产精品久久久天堂 | 日本肉体xxxx裸交 | 男女作爱免费网站 | 亚洲日韩av片在线观看 | 国产手机在线αⅴ片无码观看 | 国产精品对白交换视频 | 俄罗斯老熟妇色xxxx | 国产亚洲精品久久久闺蜜 | 青草视频在线播放 | 欧美老妇与禽交 | 欧美日韩在线亚洲综合国产人 | 色窝窝无码一区二区三区色欲 | 中文字幕无码av波多野吉衣 | 欧美猛少妇色xxxxx | 亚洲精品国产精品乱码视色 | 性欧美熟妇videofreesex | 色欲av亚洲一区无码少妇 | 国产午夜精品一区二区三区嫩草 | 亚洲午夜久久久影院 | 美女黄网站人色视频免费国产 | 久久午夜无码鲁丝片午夜精品 | 国产精品久久久久7777 | 麻豆果冻传媒2021精品传媒一区下载 | 国产亚洲精品久久久闺蜜 | 波多野结衣乳巨码无在线观看 | 领导边摸边吃奶边做爽在线观看 | 牲欲强的熟妇农村老妇女 | а天堂中文在线官网 | 国内精品人妻无码久久久影院蜜桃 | 国产精品亚洲а∨无码播放麻豆 | 奇米影视7777久久精品 | 大色综合色综合网站 | 亚洲男人av香蕉爽爽爽爽 | 国产乱子伦视频在线播放 | 激情内射亚州一区二区三区爱妻 | 亚洲国产精华液网站w | 福利一区二区三区视频在线观看 | 亚洲色大成网站www | 日本熟妇人妻xxxxx人hd | 欧美阿v高清资源不卡在线播放 | 国产精品高潮呻吟av久久 | 免费播放一区二区三区 | 老子影院午夜精品无码 | 啦啦啦www在线观看免费视频 | 亚洲日韩一区二区 | www国产亚洲精品久久网站 | 成人无码精品1区2区3区免费看 | 国产精品igao视频网 | 成人精品视频一区二区 | 日韩人妻无码中文字幕视频 | 亚洲精品无码人妻无码 | 夜夜躁日日躁狠狠久久av | av无码久久久久不卡免费网站 | 亚洲一区二区三区国产精华液 | 领导边摸边吃奶边做爽在线观看 | 国产亚洲精品精品国产亚洲综合 | 丰满少妇弄高潮了www | 网友自拍区视频精品 | 久久国产自偷自偷免费一区调 | 亚洲中文字幕乱码av波多ji | 少妇性荡欲午夜性开放视频剧场 | 亚洲精品一区二区三区在线 | 性欧美videos高清精品 | 久久国产精品萌白酱免费 | 黑人玩弄人妻中文在线 | 日本成熟视频免费视频 | 国产热a欧美热a在线视频 | 永久免费观看国产裸体美女 | 日韩精品乱码av一区二区 | 中文无码伦av中文字幕 | 少妇厨房愉情理9仑片视频 | 国产精品视频免费播放 | 国产精品久久国产三级国 | 国产成人综合美国十次 | 欧美老熟妇乱xxxxx | 亚洲人成网站免费播放 | а√天堂www在线天堂小说 | 免费观看的无遮挡av | 亚无码乱人伦一区二区 | аⅴ资源天堂资源库在线 | 日韩人妻无码中文字幕视频 | 天天拍夜夜添久久精品 | 国产在线一区二区三区四区五区 | 亚洲国产精品无码久久久久高潮 | 四虎永久在线精品免费网址 | 欧美国产亚洲日韩在线二区 | 熟妇人妻激情偷爽文 | 亚洲日韩中文字幕在线播放 | 天天躁夜夜躁狠狠是什么心态 | 国产激情无码一区二区app | 无码av中文字幕免费放 | 国产成人精品一区二区在线小狼 | 亚洲色偷偷男人的天堂 | 国产免费观看黄av片 | 亚洲欧美中文字幕5发布 | 亚洲午夜无码久久 | 麻豆国产人妻欲求不满 | 娇妻被黑人粗大高潮白浆 | 男女性色大片免费网站 | 亚洲精品一区二区三区四区五区 | 人人爽人人爽人人片av亚洲 | 最新国产乱人伦偷精品免费网站 | a片免费视频在线观看 | 亚洲欧美精品aaaaaa片 | 亚洲の无码国产の无码影院 | 久久久精品人妻久久影视 | 玩弄人妻少妇500系列视频 | 亚洲人成无码网www | 成人性做爰aaa片免费看 | 成熟妇人a片免费看网站 | 少女韩国电视剧在线观看完整 | 扒开双腿疯狂进出爽爽爽视频 | 国产成人人人97超碰超爽8 | 久久久久亚洲精品中文字幕 | 牲交欧美兽交欧美 | 欧美高清在线精品一区 | 中文字幕无线码免费人妻 | 日韩av激情在线观看 | 亚洲色在线无码国产精品不卡 | 狂野欧美性猛xxxx乱大交 | 国产精品亚洲专区无码不卡 | 午夜福利一区二区三区在线观看 | 性做久久久久久久免费看 | 一本久久伊人热热精品中文字幕 | 国产精品久久久久久亚洲毛片 | 色老头在线一区二区三区 | 少妇性l交大片欧洲热妇乱xxx | 亚洲精品无码人妻无码 | 成人毛片一区二区 | 国产人妻精品一区二区三区 | 国产无av码在线观看 | 精品欧洲av无码一区二区三区 | 青青青手机频在线观看 | 综合人妻久久一区二区精品 | 丝袜足控一区二区三区 | 亚洲熟妇色xxxxx亚洲 | 纯爱无遮挡h肉动漫在线播放 | 亚洲色大成网站www | 永久免费观看美女裸体的网站 | 又紧又大又爽精品一区二区 | 在线看片无码永久免费视频 | 午夜福利不卡在线视频 | 乌克兰少妇性做爰 | 欧美日本精品一区二区三区 | 免费播放一区二区三区 | 人妻少妇精品无码专区动漫 | 欧美三级不卡在线观看 | 天下第一社区视频www日本 | 蜜桃视频插满18在线观看 | 亚洲精品一区三区三区在线观看 | 天堂久久天堂av色综合 | 亚洲中文字幕乱码av波多ji | 国产另类ts人妖一区二区 | 强辱丰满人妻hd中文字幕 | 丰满人妻被黑人猛烈进入 | 久久午夜无码鲁丝片 | aa片在线观看视频在线播放 | 亚洲一区二区三区在线观看网站 | 99视频精品全部免费免费观看 | 美女黄网站人色视频免费国产 | 中文字幕亚洲情99在线 | 内射爽无广熟女亚洲 | 在线a亚洲视频播放在线观看 | 国产后入清纯学生妹 | 欧美变态另类xxxx | 亚洲色欲色欲天天天www | 中文字幕av日韩精品一区二区 | 国产办公室秘书无码精品99 | 人人澡人摸人人添 | 日本va欧美va欧美va精品 | 精品无人区无码乱码毛片国产 | 无码吃奶揉捏奶头高潮视频 | 美女张开腿让人桶 | 亚洲一区二区三区在线观看网站 | 欧美亚洲国产一区二区三区 | 国内精品久久久久久中文字幕 | 国产日产欧产精品精品app | 99riav国产精品视频 | 动漫av网站免费观看 | a片免费视频在线观看 | 亚洲色www成人永久网址 | 天天躁夜夜躁狠狠是什么心态 | 无码av岛国片在线播放 | 婷婷丁香六月激情综合啪 | 亚洲精品久久久久avwww潮水 | 一二三四在线观看免费视频 | 国产激情无码一区二区 | 亚洲综合伊人久久大杳蕉 | 暴力强奷在线播放无码 | 国产精品永久免费视频 | 精品人人妻人人澡人人爽人人 | 亚洲中文字幕在线观看 | 狠狠躁日日躁夜夜躁2020 | 极品尤物被啪到呻吟喷水 | 亚洲人成无码网www | 亚洲成色在线综合网站 | 东京无码熟妇人妻av在线网址 | 国产色在线 | 国产 | 婷婷丁香六月激情综合啪 | 久久精品人人做人人综合试看 | 国产片av国语在线观看 | 免费国产成人高清在线观看网站 | 久久久久成人精品免费播放动漫 | 成人av无码一区二区三区 | 一本加勒比波多野结衣 | 国产美女极度色诱视频www | 日日摸天天摸爽爽狠狠97 | 国产成人无码av片在线观看不卡 | 国产精品高潮呻吟av久久4虎 | 亚洲精品一区二区三区在线观看 | 国产片av国语在线观看 | 蜜桃视频插满18在线观看 | 日韩视频 中文字幕 视频一区 | 人妻少妇精品视频专区 | 国产在线一区二区三区四区五区 | 伊人久久大香线蕉亚洲 | 午夜福利试看120秒体验区 | 国产麻豆精品精东影业av网站 | 午夜丰满少妇性开放视频 | 久久久久久久久蜜桃 | 国产日产欧产精品精品app | 亚洲精品综合一区二区三区在线 | 欧美 日韩 人妻 高清 中文 | 又大又硬又爽免费视频 | 大肉大捧一进一出视频出来呀 | 国产精品理论片在线观看 | 亚洲 高清 成人 动漫 | 国内精品一区二区三区不卡 | 亚无码乱人伦一区二区 | 精品无码国产自产拍在线观看蜜 | 麻豆果冻传媒2021精品传媒一区下载 | av人摸人人人澡人人超碰下载 | 成人精品一区二区三区中文字幕 | 国产 浪潮av性色四虎 | 国产精品va在线播放 | 亚洲无人区午夜福利码高清完整版 | 欧美国产日韩久久mv | 97无码免费人妻超级碰碰夜夜 | a在线亚洲男人的天堂 | 久久 国产 尿 小便 嘘嘘 | 成人精品天堂一区二区三区 | 青青青爽视频在线观看 | 中文字幕色婷婷在线视频 | 在线播放免费人成毛片乱码 | 丰满少妇高潮惨叫视频 | 国产av一区二区精品久久凹凸 | 免费看少妇作爱视频 | 亚洲另类伦春色综合小说 | 夜夜高潮次次欢爽av女 | 偷窥日本少妇撒尿chinese | 欧洲精品码一区二区三区免费看 | 国产 浪潮av性色四虎 | 丝袜人妻一区二区三区 | 国产suv精品一区二区五 | 久久久久久国产精品无码下载 | 夜先锋av资源网站 | 精品少妇爆乳无码av无码专区 | 四虎永久在线精品免费网址 | 亚洲中文字幕在线无码一区二区 | 丝袜 中出 制服 人妻 美腿 | 无码av岛国片在线播放 | 免费观看又污又黄的网站 | 亚洲午夜福利在线观看 | 国产成人精品视频ⅴa片软件竹菊 | 国精产品一品二品国精品69xx | 97无码免费人妻超级碰碰夜夜 | 亚洲中文无码av永久不收费 | 欧美成人高清在线播放 | 天堂在线观看www | 色综合天天综合狠狠爱 | 小sao货水好多真紧h无码视频 | 蜜桃视频韩日免费播放 | 无码国模国产在线观看 | 成人试看120秒体验区 | 午夜精品久久久久久久久 | 人人妻人人藻人人爽欧美一区 | 18无码粉嫩小泬无套在线观看 | 人人爽人人爽人人片av亚洲 | 网友自拍区视频精品 | 免费国产黄网站在线观看 | 国产精品久久久 | 98国产精品综合一区二区三区 | 一本久久a久久精品vr综合 | 国产女主播喷水视频在线观看 | 国产精品无码一区二区桃花视频 | 国产精品a成v人在线播放 | 亚洲啪av永久无码精品放毛片 | 久久久精品成人免费观看 | 最新版天堂资源中文官网 | 天下第一社区视频www日本 | 2020久久超碰国产精品最新 | 九九综合va免费看 | 国产精品欧美成人 | 久久综合狠狠综合久久综合88 | 东京无码熟妇人妻av在线网址 | 色婷婷综合激情综在线播放 | 粉嫩少妇内射浓精videos | 亚洲精品综合一区二区三区在线 | 欧美肥老太牲交大战 | 中文字幕无码视频专区 | 玩弄少妇高潮ⅹxxxyw | 两性色午夜免费视频 | 国产乱人偷精品人妻a片 | av无码久久久久不卡免费网站 | 东京无码熟妇人妻av在线网址 | 精品午夜福利在线观看 | 少妇性荡欲午夜性开放视频剧场 | 久久精品视频在线看15 | 国产高清av在线播放 | 亚洲日韩av片在线观看 | 日韩精品无码一本二本三本色 | 免费人成在线视频无码 | 亚洲精品成a人在线观看 | 国产凸凹视频一区二区 | 午夜福利电影 | 一本加勒比波多野结衣 | 国产内射老熟女aaaa | 亚洲一区二区三区无码久久 | 乱人伦人妻中文字幕无码久久网 | 日韩精品a片一区二区三区妖精 | 福利一区二区三区视频在线观看 | 欧美xxxxx精品 | 久久久精品欧美一区二区免费 | 午夜精品一区二区三区在线观看 | 99riav国产精品视频 | 成人免费视频在线观看 | 日韩欧美群交p片內射中文 | 亚洲中文字幕久久无码 | 午夜理论片yy44880影院 | 国产艳妇av在线观看果冻传媒 | 精品偷拍一区二区三区在线看 | 成人无码视频在线观看网站 | 男女下面进入的视频免费午夜 | 国内精品九九久久久精品 | 欧美35页视频在线观看 | 欧美变态另类xxxx | 亚无码乱人伦一区二区 | 67194成是人免费无码 | 国产精品久久久久久亚洲毛片 | 人妻无码久久精品人妻 | 露脸叫床粗话东北少妇 | 国产激情精品一区二区三区 | 亚洲精品综合五月久久小说 | 欧美freesex黑人又粗又大 | 麻豆国产人妻欲求不满 | 少妇无套内谢久久久久 | 亚洲 a v无 码免 费 成 人 a v | 欧美亚洲日韩国产人成在线播放 | 国产人妻人伦精品1国产丝袜 | 人人爽人人澡人人高潮 | 亚洲最大成人网站 | 亚洲精品国产品国语在线观看 | 捆绑白丝粉色jk震动捧喷白浆 | 国产一区二区三区影院 | 精品无码一区二区三区爱欲 | 沈阳熟女露脸对白视频 | 无码国模国产在线观看 | 又紧又大又爽精品一区二区 | 国产精品国产自线拍免费软件 | 六十路熟妇乱子伦 | 无码福利日韩神码福利片 | 精品无人国产偷自产在线 | 国产绳艺sm调教室论坛 | 熟女俱乐部五十路六十路av | 性生交大片免费看女人按摩摩 | aa片在线观看视频在线播放 | 亚洲精品综合一区二区三区在线 | 国语精品一区二区三区 | 午夜男女很黄的视频 | 国产亚洲精品久久久久久久 | 久久无码专区国产精品s | 久久精品中文字幕大胸 | 国产精品高潮呻吟av久久 | 在线播放亚洲第一字幕 | 国产97在线 | 亚洲 | 国产精品永久免费视频 | 国产精品无套呻吟在线 | 亚洲一区二区三区 | 精品国产福利一区二区 | 精品国产aⅴ无码一区二区 | 麻豆国产人妻欲求不满谁演的 | 国产在线无码精品电影网 | 精品国产一区二区三区四区 | 精品久久久久香蕉网 | 无遮挡国产高潮视频免费观看 | 国产亚洲精品久久久闺蜜 | 男女猛烈xx00免费视频试看 | 欧美丰满老熟妇xxxxx性 | 国产精品对白交换视频 | 老子影院午夜伦不卡 | 一本大道久久东京热无码av | 国产精品亚洲а∨无码播放麻豆 | аⅴ资源天堂资源库在线 | 亚拍精品一区二区三区探花 | 高潮毛片无遮挡高清免费 | 曰本女人与公拘交酡免费视频 | 少妇激情av一区二区 | 中文无码精品a∨在线观看不卡 | 丰满人妻被黑人猛烈进入 | 一本大道伊人av久久综合 | 国产亚洲日韩欧美另类第八页 | 领导边摸边吃奶边做爽在线观看 | 秋霞成人午夜鲁丝一区二区三区 | 亚洲成av人影院在线观看 | 天堂无码人妻精品一区二区三区 | 国产免费观看黄av片 | 国产精品二区一区二区aⅴ污介绍 | 亚洲区欧美区综合区自拍区 | 亚洲色成人中文字幕网站 | 久久精品人人做人人综合试看 | 综合激情五月综合激情五月激情1 | 亚洲国产精品成人久久蜜臀 | 最近中文2019字幕第二页 | 国产特级毛片aaaaaaa高清 | 亚洲欧美日韩成人高清在线一区 | 国产人妻人伦精品1国产丝袜 | 内射后入在线观看一区 | 国产农村妇女高潮大叫 | 一区二区三区高清视频一 | 三级4级全黄60分钟 | 色综合久久久无码中文字幕 | 久久亚洲日韩精品一区二区三区 | 正在播放老肥熟妇露脸 | 色婷婷欧美在线播放内射 | 内射爽无广熟女亚洲 | 国产一精品一av一免费 | 色综合久久中文娱乐网 | 国产亚洲tv在线观看 | 中文字幕人成乱码熟女app | 国产特级毛片aaaaaa高潮流水 | 天堂一区人妻无码 | 又大又黄又粗又爽的免费视频 | 亚洲综合在线一区二区三区 | 丰满诱人的人妻3 | 无码人妻精品一区二区三区不卡 | 亚洲国产精品无码久久久久高潮 | 亚洲成a人片在线观看日本 | 免费人成网站视频在线观看 | 疯狂三人交性欧美 | 国产成人久久精品流白浆 | 18无码粉嫩小泬无套在线观看 | 成人亚洲精品久久久久软件 | 色综合久久久无码网中文 | 欧美日本免费一区二区三区 | 狠狠色色综合网站 | 99精品视频在线观看免费 | 一本久久伊人热热精品中文字幕 | 成人试看120秒体验区 | 7777奇米四色成人眼影 | 中文字幕无线码 | 欧美老熟妇乱xxxxx | 国产性生大片免费观看性 | 国产成人无码av在线影院 | 性欧美videos高清精品 | 欧美高清在线精品一区 | 国产欧美熟妇另类久久久 | 老熟妇乱子伦牲交视频 | 麻豆成人精品国产免费 | 日韩精品无码一区二区中文字幕 | 一本久久a久久精品亚洲 | 久久国产劲爆∧v内射 | 帮老师解开蕾丝奶罩吸乳网站 | 乱人伦人妻中文字幕无码久久网 | 中文字幕乱妇无码av在线 | 国产午夜无码精品免费看 | 久久亚洲中文字幕无码 | 牛和人交xxxx欧美 | 精品人妻人人做人人爽夜夜爽 | 亚洲欧美日韩国产精品一区二区 | 少妇人妻大乳在线视频 | 99国产精品白浆在线观看免费 | 欧美日韩久久久精品a片 | 99精品视频在线观看免费 | 99视频精品全部免费免费观看 | 日本丰满护士爆乳xxxx | 亚洲精品欧美二区三区中文字幕 | 骚片av蜜桃精品一区 | 一本无码人妻在中文字幕免费 | 日韩人妻无码一区二区三区久久99 | 天堂亚洲2017在线观看 | 国产精品久久福利网站 | 国语精品一区二区三区 | 麻豆av传媒蜜桃天美传媒 | 老子影院午夜伦不卡 | 欧美喷潮久久久xxxxx | 偷窥日本少妇撒尿chinese | 97人妻精品一区二区三区 | 国产成人无码一二三区视频 | 久久综合网欧美色妞网 | 丰满护士巨好爽好大乳 | 国产免费久久精品国产传媒 | 中文字幕无码人妻少妇免费 | 日本欧美一区二区三区乱码 | 天天摸天天透天天添 | 精品乱子伦一区二区三区 | 在线观看国产一区二区三区 | 黑森林福利视频导航 | 亚洲男女内射在线播放 | 亚洲日本在线电影 | 日本又色又爽又黄的a片18禁 | 国语精品一区二区三区 | 图片小说视频一区二区 | 国产成人无码午夜视频在线观看 | 亚洲中文字幕在线无码一区二区 | 久久久久国色av免费观看性色 | 国产suv精品一区二区五 | 亚洲国产高清在线观看视频 | 日本xxxx色视频在线观看免费 | 国精品人妻无码一区二区三区蜜柚 | 久久99精品国产麻豆 | 理论片87福利理论电影 | 婷婷色婷婷开心五月四房播播 | 久久久精品人妻久久影视 | 图片小说视频一区二区 | 亚洲中文字幕av在天堂 | 国产在线精品一区二区三区直播 | 国产精品亚洲lv粉色 | 熟女俱乐部五十路六十路av | 亚洲一区二区三区含羞草 | 人人澡人人透人人爽 | 日本熟妇乱子伦xxxx | 亚洲熟悉妇女xxx妇女av | 牛和人交xxxx欧美 | 无码av岛国片在线播放 | 国产真实夫妇视频 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 亚洲乱码日产精品bd | 亚洲综合无码一区二区三区 | 日本大香伊一区二区三区 | 中文字幕乱码人妻二区三区 | 中文字幕乱码人妻二区三区 | 国产精品视频免费播放 | 无码任你躁久久久久久久 | 亚洲综合无码一区二区三区 | 亚洲综合无码一区二区三区 | 精品夜夜澡人妻无码av蜜桃 | 亚洲狠狠色丁香婷婷综合 | 国产成人亚洲综合无码 | 国产亲子乱弄免费视频 | 少妇被粗大的猛进出69影院 | 少女韩国电视剧在线观看完整 | 亚洲一区二区三区无码久久 | 成人女人看片免费视频放人 | 久久婷婷五月综合色国产香蕉 | 亚洲欧美综合区丁香五月小说 | 亚洲精品中文字幕 | 大肉大捧一进一出视频出来呀 | 亚洲国产精品一区二区第一页 | 日本大香伊一区二区三区 | 亚洲色欲色欲天天天www | 狂野欧美性猛交免费视频 | 97人妻精品一区二区三区 | 荫蒂被男人添的好舒服爽免费视频 | 国产成人综合在线女婷五月99播放 | 国产绳艺sm调教室论坛 | 精品国产一区av天美传媒 | 日本熟妇大屁股人妻 | 色婷婷av一区二区三区之红樱桃 | 久久精品99久久香蕉国产色戒 | 九一九色国产 | 国内精品人妻无码久久久影院 | 久久久久成人片免费观看蜜芽 | 丰满少妇熟乱xxxxx视频 | 动漫av一区二区在线观看 | 激情内射日本一区二区三区 | 亚洲精品鲁一鲁一区二区三区 | 亚洲熟妇色xxxxx欧美老妇 | 婷婷综合久久中文字幕蜜桃三电影 | 日本一区二区更新不卡 | 欧美丰满熟妇xxxx | 欧美性色19p | 人妻互换免费中文字幕 | 精品无码成人片一区二区98 | 色综合久久中文娱乐网 | 无套内谢的新婚少妇国语播放 | 国产猛烈高潮尖叫视频免费 | 久久伊人色av天堂九九小黄鸭 | 欧美精品无码一区二区三区 | 国产精品无码一区二区三区不卡 | 国产在线aaa片一区二区99 | 人妻天天爽夜夜爽一区二区 | 国产精品久久久av久久久 | 丁香啪啪综合成人亚洲 | 人妻少妇精品无码专区二区 | 亚洲第一无码av无码专区 | 亚洲色大成网站www | 国产精品久久久久久久9999 | 亚洲色偷偷偷综合网 |