3atv精品不卡视频,97人人超碰国产精品最新,中文字幕av一区二区三区人妻少妇,久久久精品波多野结衣,日韩一区二区三区精品

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm概念学习系列之storm-starter项目(完整版)(博主推荐)

發布時間:2025/5/22 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm概念学习系列之storm-starter项目(完整版)(博主推荐) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這是書籍《從零開始學Storm》趙必廈 2014年出版的配套代碼!

  storm-starter項目包含使用storm的各種各樣的例子。項目托管在GitHub上面,其網址為:?http://github.com/nathanmarz/storm-starter

?

?

?

?

?

?

?

  或者

  ?

?

?

?

 storm-starter項目的包結構:

?

?

?

?  storm-starter項目的拓撲結構:

?

?

?

  新建maven項目的方式

  以“新建Maven項目的方式”導入storm-starter項目的步驟如下:

1、新建一個Maven項目,項目名稱可以隨意,如storm-starter。

?

?

?

?

?

?

?

?

?

?

2、把storm-starter項目根目錄的src\jvm目錄中的全部文件復制到Maven項目的src/main/java目錄下。

?

?

?

?

?

?

?

?

?

  storm-starter-master\src\jvm\storm\starter下的BasicDRPCTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;/*** This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a* "!" to any string you send the DRPC function.* <p/>* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of* Storm.*/ public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[]{ "hello", "goodbye" }) {System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}} }

?

?

?

?

?

?

  storm-starter-master\src\jvm\storm\starter下的的ExclamationTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map;/*** This is a basic example of a Storm topology.*/ public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}@Overridepublic void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 10);builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}} }

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的ManualDRPC.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.ReturnResults; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.out.println(drpc.execute("exclamation", "aaa"));System.out.println(drpc.execute("exclamation", "bbb"));} }

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的PrintSampleStream.java

/* // to use this example, uncomment the twitter4j dependency information in the project.clj, // uncomment storm.starter.spout.TwitterSampleSpout, and uncomment this classpackage storm.starter;import storm.starter.spout.TwitterSampleSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import storm.starter.bolt.PrinterBolt;public class PrintSampleStream { public static void main(String[] args) {String username = args[0];String pwd = args[1];TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new TwitterSampleSpout(username, pwd));builder.setBolt("print", new PrinterBolt()).shuffleGrouping("spout");Config conf = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.shutdown();} } */

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的ReachTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.util.*;/*** This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can* compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.* <p/>* Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people* who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the* unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower* records.* <p/>* This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes* minutes on a single machine into one that takes just a couple seconds.* <p/>* For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.* <p/>* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.*/ public class ReachTopology {public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));}};public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));put("tim", Arrays.asList("alex"));put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));put("adam", Arrays.asList("david", "carissa"));put("mike", Arrays.asList("john", "bob"));put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));}};public static class GetTweeters extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {Object id = tuple.getValue(0);String url = tuple.getString(1);List<String> tweeters = TWEETERS_DB.get(url);if (tweeters != null) {for (String tweeter : tweeters) {collector.emit(new Values(id, tweeter));}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "tweeter"));}}public static class GetFollowers extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {Object id = tuple.getValue(0);String tweeter = tuple.getString(1);List<String> followers = FOLLOWERS_DB.get(tweeter);if (followers != null) {for (String follower : followers) {collector.emit(new Values(id, follower));}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "follower"));}}public static class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}}public static class CountAggregator extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_count += tuple.getInteger(1);}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "reach"));}}public static LinearDRPCTopologyBuilder construct() {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");builder.addBolt(new GetTweeters(), 4);builder.addBolt(new GetFollowers(), 12).shuffleGrouping();builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));return builder;}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = construct();Config conf = new Config();if (args == null || args.length == 0) {conf.setMaxTaskParallelism(3);LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };for (String url : urlsToTry) {System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));}cluster.shutdown();drpc.shutdown();}else {conf.setNumWorkers(6);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}} }

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的RollingTopWords.java

package storm.starter;import backtype.storm.Config; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import storm.starter.bolt.IntermediateRankingsBolt; import storm.starter.bolt.RollingCountBolt; import storm.starter.bolt.TotalRankingsBolt; import storm.starter.util.StormRunner;/*** This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.* The top N computation is done in a completely scalable way, and a similar approach could be used to compute things* like trending topics or trending images on Twitter.*/ public class RollingTopWords {private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;private static final int TOP_N = 5;private final TopologyBuilder builder;private final String topologyName;private final Config topologyConfig;private final int runtimeInSeconds;public RollingTopWords() throws InterruptedException {builder = new TopologyBuilder();topologyName = "slidingWindowCounts";topologyConfig = createTopologyConfiguration();runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;wireTopology();}private static Config createTopologyConfiguration() {Config conf = new Config();conf.setDebug(true);return conf;}private void wireTopology() throws InterruptedException {String spoutId = "wordGenerator";String counterId = "counter";String intermediateRankerId = "intermediateRanker";String totalRankerId = "finalRanker";builder.setSpout(spoutId, new TestWordSpout(), 5);builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);}public void run() throws InterruptedException {StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);}public static void main(String[] args) throws Exception {new RollingTopWords().run();} }

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的SingleJoinExample.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.testing.FeederSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import storm.starter.bolt.SingleJoinBolt;public class SingleJoinExample {public static void main(String[] args) {FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("gender", genderSpout);builder.setSpout("age", ageSpout);builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")).fieldsGrouping("age", new Fields("id"));Config conf = new Config();conf.setDebug(true);LocalCluster cluster = new LocalCluster();cluster.submitTopology("join-example", conf, builder.createTopology());for (int i = 0; i < 10; i++) {String gender;if (i % 2 == 0) {gender = "male";}else {gender = "female";}genderSpout.feed(new Values(i, gender));}for (int i = 9; i >= 0; i--) {ageSpout.feed(new Values(i, i + 20));}Utils.sleep(2000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的TransactionalGlobalCount.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;/*** This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a* database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This* class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies*/ public class TransactionalGlobalCount {public static final int PARTITION_TAKE_PER_BATCH = 3;public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{put(0, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("chicken"));add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));}});put(1, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));add(new Values("banana"));}});put(2, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));}});}};public static class Value {int count = 0;BigInteger txid;}public static Map<String, Value> DATABASE = new HashMap<String, Value>();public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";public static class BatchCount extends BaseBatchBolt {Object _id;BatchOutputCollector _collector;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_count++;}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "count"));}}public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {TransactionAttempt _attempt;BatchOutputCollector _collector;int _sum = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {_sum += tuple.getInteger(1);}@Overridepublic void finishBatch() {Value val = DATABASE.get(GLOBAL_COUNT_KEY);Value newval;if (val == null || !val.txid.equals(_attempt.getTransactionId())) {newval = new Value();newval.txid = _attempt.getTransactionId();if (val == null) {newval.count = _sum;}else {newval.count = _sum + val.count;}DATABASE.put(GLOBAL_COUNT_KEY, newval);}else {newval = val;}_collector.emit(new Values(_attempt, newval.count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "sum"));}}public static void main(String[] args) throws Exception {MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");LocalCluster cluster = new LocalCluster();Config config = new Config();config.setDebug(true);config.setMaxSpoutPending(3);cluster.submitTopology("global-count-topology", config, builder.buildTopology());Thread.sleep(3000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的TransactionalWords.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;/*** This class defines a more involved transactional topology then TransactionalGlobalCount. This topology processes a* stream of words and produces two outputs:* <p/>* 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in* the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.* <p/>* A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move* between buckets as their counts accumulate.*/ public class TransactionalWords {public static class CountValue {Integer prev_count = null;int count = 0;BigInteger txid = null;}public static class BucketValue {int count = 0;BigInteger txid;}public static final int BUCKET_SIZE = 10;public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();public static final int PARTITION_TAKE_PER_BATCH = 3;public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{put(0, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("chicken"));add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));}});put(1, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));add(new Values("banana"));}});put(2, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));}});}};public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {Map<String, Integer> _counts = new HashMap<String, Integer>();BatchOutputCollector _collector;TransactionAttempt _id;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {String key = tuple.getString(1);Integer curr = _counts.get(key);if (curr == null)curr = 0;_counts.put(key, curr + 1);}@Overridepublic void finishBatch() {for (String key : _counts.keySet()) {CountValue val = COUNT_DATABASE.get(key);CountValue newVal;if (val == null || !val.txid.equals(_id)) {newVal = new CountValue();newVal.txid = _id.getTransactionId();if (val != null) {newVal.prev_count = val.count;newVal.count = val.count;}newVal.count = newVal.count + _counts.get(key);COUNT_DATABASE.put(key, newVal);}else {newVal = val;}_collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "key", "count", "prev-count"));}}public static class Bucketize extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);int curr = tuple.getInteger(2);Integer prev = tuple.getInteger(3);int currBucket = curr / BUCKET_SIZE;Integer prevBucket = null;if (prev != null) {prevBucket = prev / BUCKET_SIZE;}if (prevBucket == null) {collector.emit(new Values(attempt, currBucket, 1));}else if (currBucket != prevBucket) {collector.emit(new Values(attempt, currBucket, 1));collector.emit(new Values(attempt, prevBucket, -1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("attempt", "bucket", "delta"));}}public static class BucketCountUpdater extends BaseTransactionalBolt {Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();BatchOutputCollector _collector;TransactionAttempt _attempt;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {Integer bucket = tuple.getInteger(1);Integer delta = tuple.getInteger(2);Integer curr = _accum.get(bucket);if (curr == null)curr = 0;_accum.put(bucket, curr + delta);}@Overridepublic void finishBatch() {for (Integer bucket : _accum.keySet()) {BucketValue currVal = BUCKET_DATABASE.get(bucket);BucketValue newVal;if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {newVal = new BucketValue();newVal.txid = _attempt.getTransactionId();newVal.count = _accum.get(bucket);if (currVal != null)newVal.count += currVal.count;BUCKET_DATABASE.put(bucket, newVal);}else {newVal = currVal;}_collector.emit(new Values(_attempt, bucket, newVal.count));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "bucket", "count"));}}public static void main(String[] args) throws Exception {MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));LocalCluster cluster = new LocalCluster();Config config = new Config();config.setDebug(true);config.setMaxSpoutPending(3);cluster.submitTopology("top-n-topology", config, builder.buildTopology());Thread.sleep(3000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的WordCountTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.starter.spout.RandomSentenceSpout;import java.util.HashMap; import java.util.Map;/*** This topology demonstrates Storm's stream groupings and multilang capabilities.*/ public class WordCountTopology {public static class SplitSentence extends ShellBolt implements IRichBolt {public SplitSentence() {super("python", "splitsentence.py");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 5);builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}} }

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\spout的RandomSentenceSpout.java

package storm.starter.spout;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map; import java.util.Random;public class RandomSentenceSpout extends BaseRichSpout {SpoutOutputCollector _collector;Random _rand;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = new Random();}@Overridepublic void nextTuple() {Utils.sleep(100);String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };String sentence = sentences[_rand.nextInt(sentences.length)];_collector.emit(new Values(sentence));}@Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\spout的TwitterSampleSpout.java

/*package storm.starter.spout;import backtype.storm.Config; import twitter4j.conf.ConfigurationBuilder; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener;public class TwitterSampleSpout extends BaseRichSpout {SpoutOutputCollector _collector;LinkedBlockingQueue<Status> queue = null;TwitterStream _twitterStream;String _username;String _pwd;public TwitterSampleSpout(String username, String pwd) {_username = username;_pwd = pwd;}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {queue = new LinkedBlockingQueue<Status>(1000);_collector = collector;StatusListener listener = new StatusListener() {@Overridepublic void onStatus(Status status) {queue.offer(status);}@Overridepublic void onDeletionNotice(StatusDeletionNotice sdn) {}@Overridepublic void onTrackLimitationNotice(int i) {}@Overridepublic void onScrubGeo(long l, long l1) {}@Overridepublic void onException(Exception e) {}};TwitterStreamFactory fact = new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());_twitterStream = fact.getInstance();_twitterStream.addListener(listener);_twitterStream.sample();}@Overridepublic void nextTuple() {Status ret = queue.poll();if(ret==null) {Utils.sleep(50);} else {_collector.emit(new Values(ret));}}@Overridepublic void close() {_twitterStream.shutdown();}@Overridepublic Map<String, Object> getComponentConfiguration() {Config ret = new Config();ret.setMaxTaskParallelism(1);return ret;} @Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("tweet"));}} */

?

?

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的AbstractRankerBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.log4j.Logger; import storm.starter.tools.Rankings; import storm.starter.util.TupleHelpers;import java.util.HashMap; import java.util.Map;/*** This abstract bolt provides the basic behavior of bolts that rank objects according to their count.* <p/>* It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow* actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those* tuples are retrieved and counted.*/ public abstract class AbstractRankerBolt extends BaseBasicBolt {private static final long serialVersionUID = 4931640198501530202L;private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;private static final int DEFAULT_COUNT = 10;private final int emitFrequencyInSeconds;private final int count;private final Rankings rankings;public AbstractRankerBolt() {this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public AbstractRankerBolt(int topN) {this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {if (topN < 1) {throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");}if (emitFrequencyInSeconds < 1) {throw new IllegalArgumentException("The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");}count = topN;this.emitFrequencyInSeconds = emitFrequencyInSeconds;rankings = new Rankings(count);}protected Rankings getRankings() {return rankings;}/*** This method functions as a template method (design pattern).*/@Overridepublic final void execute(Tuple tuple, BasicOutputCollector collector) {if (TupleHelpers.isTickTuple(tuple)) {getLogger().debug("Received tick tuple, triggering emit of current rankings");emitRankings(collector);}else {updateRankingsWithTuple(tuple);}}abstract void updateRankingsWithTuple(Tuple tuple);private void emitRankings(BasicOutputCollector collector) {collector.emit(new Values(rankings.copy()));getLogger().debug("Rankings: " + rankings);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("rankings"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);return conf;}abstract Logger getLogger(); }

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的IntermediateRankingsBolt.java

package storm.starter.bolt;import backtype.storm.tuple.Tuple; import org.apache.log4j.Logger; import storm.starter.tools.Rankable; import storm.starter.tools.RankableObjectWithFields;/*** This bolt ranks incoming objects by their count.* <p/>* It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,* additionalField2, ..., additionalFieldN).*/ public final class IntermediateRankingsBolt extends AbstractRankerBolt {private static final long serialVersionUID = -1369800530256637409L;private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);public IntermediateRankingsBolt() {super();}public IntermediateRankingsBolt(int topN) {super(topN);}public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {super(topN, emitFrequencyInSeconds);}@Overridevoid updateRankingsWithTuple(Tuple tuple) {Rankable rankable = RankableObjectWithFields.from(tuple);super.getRankings().updateWith(rankable);}@OverrideLogger getLogger() {return LOG;} }

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的PrinterBolt.java

package storm.starter.bolt;import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple;public class PrinterBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {System.out.println(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer ofd) {}}

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的RollingCountBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.log4j.Logger; import storm.starter.tools.NthLastModifiedTimeTracker; import storm.starter.tools.SlidingWindowCounter; import storm.starter.util.TupleHelpers;import java.util.HashMap; import java.util.Map; import java.util.Map.Entry;/*** This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.* <p/>* The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output* data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the* bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five* minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every* minute.* <p/>* The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the* actual duration of the sliding window. The latter is included in case the expected sliding window length (as* configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual* window length is tracked and calculated for the window, and not individually for each object within a window.* <p/>* Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window* length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window* counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning* during the first ~ five minutes of startup time if the window length is set to five minutes).*/ public class RollingCountBolt extends BaseRichBolt {private static final long serialVersionUID = 5537727428628598519L;private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);private static final int NUM_WINDOW_CHUNKS = 5;private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;private static final String WINDOW_LENGTH_WARNING_TEMPLATE ="Actual window length is %d seconds when it should be %d seconds"+ " (you can safely ignore this warning during the startup phase)";private final SlidingWindowCounter<Object> counter;private final int windowLengthInSeconds;private final int emitFrequencyInSeconds;private OutputCollector collector;private NthLastModifiedTimeTracker lastModifiedTracker;public RollingCountBolt() {this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {this.windowLengthInSeconds = windowLengthInSeconds;this.emitFrequencyInSeconds = emitFrequencyInSeconds;counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));}private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {return windowLengthInSeconds / windowUpdateFrequencyInSeconds;}@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));}@Overridepublic void execute(Tuple tuple) {if (TupleHelpers.isTickTuple(tuple)) {LOG.debug("Received tick tuple, triggering emit of current window counts");emitCurrentWindowCounts();}else {countObjAndAck(tuple);}}private void emitCurrentWindowCounts() {Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();lastModifiedTracker.markAsModified();if (actualWindowLengthInSeconds != windowLengthInSeconds) {LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));}emit(counts, actualWindowLengthInSeconds);}private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {for (Entry<Object, Long> entry : counts.entrySet()) {Object obj = entry.getKey();Long count = entry.getValue();collector.emit(new Values(obj, count, actualWindowLengthInSeconds));}}private void countObjAndAck(Tuple tuple) {Object obj = tuple.getValue(0);counter.incrementCount(obj);collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);return conf;} }

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的SingleJoinBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.generated.GlobalStreamId; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.utils.TimeCacheMap;import java.util.*;public class SingleJoinBolt extends BaseRichBolt {OutputCollector _collector;Fields _idFields;Fields _outFields;int _numSources;TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;Map<String, GlobalStreamId> _fieldLocations;public SingleJoinBolt(Fields outFields) {_outFields = outFields;}@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_fieldLocations = new HashMap<String, GlobalStreamId>();_collector = collector;int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());_numSources = context.getThisSources().size();Set<String> idFields = null;for (GlobalStreamId source : context.getThisSources().keySet()) {Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());Set<String> setFields = new HashSet<String>(fields.toList());if (idFields == null)idFields = setFields;elseidFields.retainAll(setFields);for (String outfield : _outFields) {for (String sourcefield : fields) {if (outfield.equals(sourcefield)) {_fieldLocations.put(outfield, source);}}}}_idFields = new Fields(new ArrayList<String>(idFields));if (_fieldLocations.size() != _outFields.size()) {throw new RuntimeException("Cannot find all outfields among sources");}}@Overridepublic void execute(Tuple tuple) {List<Object> id = tuple.select(_idFields);GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());if (!_pending.containsKey(id)) {_pending.put(id, new HashMap<GlobalStreamId, Tuple>());}Map<GlobalStreamId, Tuple> parts = _pending.get(id);if (parts.containsKey(streamId))throw new RuntimeException("Received same side of single join twice");parts.put(streamId, tuple);if (parts.size() == _numSources) {_pending.remove(id);List<Object> joinResult = new ArrayList<Object>();for (String outField : _outFields) {GlobalStreamId loc = _fieldLocations.get(outField);joinResult.add(parts.get(loc).getValueByField(outField));}_collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);for (Tuple part : parts.values()) {_collector.ack(part);}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(_outFields);}private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {@Overridepublic void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {for (Tuple tuple : tuples.values()) {_collector.fail(tuple);}}} }

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的TotalRankingsBolt.java

?

package storm.starter.bolt;import backtype.storm.tuple.Tuple; import org.apache.log4j.Logger; import storm.starter.tools.Rankings;/*** This bolt merges incoming {@link Rankings}.* <p/>* It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,* consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.*/ public final class TotalRankingsBolt extends AbstractRankerBolt {private static final long serialVersionUID = -8447525895532302198L;private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);public TotalRankingsBolt() {super();}public TotalRankingsBolt(int topN) {super(topN);}public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {super(topN, emitFrequencyInSeconds);}@Overridevoid updateRankingsWithTuple(Tuple tuple) {Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);super.getRankings().updateWith(rankingsToBeMerged);super.getRankings().pruneZeroCounts();}@OverrideLogger getLogger() {return LOG;}}

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的NthLastModifiedTimeTracker.java

package storm.starter.tools;import backtype.storm.utils.Time; import org.apache.commons.collections.buffer.CircularFifoBuffer;/*** This class tracks the time-since-last-modify of a "thing" in a rolling fashion.* <p/>* For example, create a 5-slot tracker to track the five most recent time-since-last-modify.* <p/>* You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just* been modified.*/ public class NthLastModifiedTimeTracker {private static final int MILLIS_IN_SEC = 1000;private final CircularFifoBuffer lastModifiedTimesMillis;public NthLastModifiedTimeTracker(int numTimesToTrack) {if (numTimesToTrack < 1) {throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");}lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);initLastModifiedTimesMillis();}private void initLastModifiedTimesMillis() {long nowCached = now();for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {lastModifiedTimesMillis.add(Long.valueOf(nowCached));}}private long now() {return Time.currentTimeMillis();}public int secondsSinceOldestModification() {long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);}public void markAsModified() {updateLastModifiedTime();}private void updateLastModifiedTime() {lastModifiedTimesMillis.add(now());}}

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的Rankable.java

package storm.starter.tools;public interface Rankable extends Comparable<Rankable> {Object getObject();long getCount();/*** Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.** @return a defensive copy*/Rankable copy(); }

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的RankableObjectWithFields.java

package storm.starter.tools;import backtype.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;import java.io.Serializable; import java.util.List;/*** This class wraps an objects and its associated count, including any additional data fields.* <p/>* This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.*/ public class RankableObjectWithFields implements Rankable, Serializable {private static final long serialVersionUID = -9102878650001058090L;private static final String toStringSeparator = "|";private final Object obj;private final long count;private final ImmutableList<Object> fields;public RankableObjectWithFields(Object obj, long count, Object... otherFields) {if (obj == null) {throw new IllegalArgumentException("The object must not be null");}if (count < 0) {throw new IllegalArgumentException("The count must be >= 0");}this.obj = obj;this.count = count;fields = ImmutableList.copyOf(otherFields);}/*** Construct a new instance based on the provided {@link Tuple}.* <p/>* This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of* occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be* extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.** @param tuple** @return new instance based on the provided tuple*/public static RankableObjectWithFields from(Tuple tuple) {List<Object> otherFields = Lists.newArrayList(tuple.getValues());Object obj = otherFields.remove(0);Long count = (Long) otherFields.remove(0);return new RankableObjectWithFields(obj, count, otherFields.toArray());}public Object getObject() {return obj;}public long getCount() {return count;}/*** @return an immutable list of any additional data fields of the object (may be empty but will never be null)*/public List<Object> getFields() {return fields;}@Overridepublic int compareTo(Rankable other) {long delta = this.getCount() - other.getCount();if (delta > 0) {return 1;}else if (delta < 0) {return -1;}else {return 0;}}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (!(o instanceof RankableObjectWithFields)) {return false;}RankableObjectWithFields other = (RankableObjectWithFields) o;return obj.equals(other.obj) && count == other.count;}@Overridepublic int hashCode() {int result = 17;int countHash = (int) (count ^ (count >>> 32));result = 31 * result + countHash;result = 31 * result + obj.hashCode();return result;}public String toString() {StringBuffer buf = new StringBuffer();buf.append("[");buf.append(obj);buf.append(toStringSeparator);buf.append(count);for (Object field : fields) {buf.append(toStringSeparator);buf.append(field);}buf.append("]");return buf.toString();}/*** Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however,* do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.** @return*/@Overridepublic Rankable copy() {List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);}}

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的Rankings.java

package storm.starter.tools;import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;import java.io.Serializable; import java.util.Collections; import java.util.List;public class Rankings implements Serializable {private static final long serialVersionUID = -1549827195410578903L;private static final int DEFAULT_COUNT = 10;private final int maxSize;private final List<Rankable> rankedItems = Lists.newArrayList();public Rankings() {this(DEFAULT_COUNT);}public Rankings(int topN) {if (topN < 1) {throw new IllegalArgumentException("topN must be >= 1");}maxSize = topN;}/*** Copy constructor.* @param other*/public Rankings(Rankings other) {this(other.maxSize());updateWith(other);}/*** @return the maximum possible number (size) of ranked objects this instance can hold*/public int maxSize() {return maxSize;}/*** @return the number (size) of ranked objects this instance is currently holding*/public int size() {return rankedItems.size();}/*** The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the* enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the* contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within* a Rankable will be defensively copied, too.** @return a somewhat defensive copy of ranked items*/public List<Rankable> getRankings() {List<Rankable> copy = Lists.newLinkedList();for (Rankable r: rankedItems) {copy.add(r.copy());}return ImmutableList.copyOf(copy);}public void updateWith(Rankings other) {for (Rankable r : other.getRankings()) {updateWith(r);}}public void updateWith(Rankable r) {synchronized(rankedItems) {addOrReplace(r);rerank();shrinkRankingsIfNeeded();}}private void addOrReplace(Rankable r) {Integer rank = findRankOf(r);if (rank != null) {rankedItems.set(rank, r);}else {rankedItems.add(r);}}private Integer findRankOf(Rankable r) {Object tag = r.getObject();for (int rank = 0; rank < rankedItems.size(); rank++) {Object cur = rankedItems.get(rank).getObject();if (cur.equals(tag)) {return rank;}}return null;}private void rerank() {Collections.sort(rankedItems);Collections.reverse(rankedItems);}private void shrinkRankingsIfNeeded() {if (rankedItems.size() > maxSize) {rankedItems.remove(maxSize);}}/*** Removes ranking entries that have a count of zero.*/public void pruneZeroCounts() {int i = 0;while (i < rankedItems.size()) {if (rankedItems.get(i).getCount() == 0) {rankedItems.remove(i);}else {i++;}}}public String toString() {return rankedItems.toString();}/*** Creates a (defensive) copy of itself.*/public Rankings copy() {return new Rankings(this);} }

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的SlidingWindowCounter.java

package storm.starter.tools;import java.io.Serializable; import java.util.Map;/*** This class counts objects in a sliding window fashion.* <p/>* It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment* counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access* to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the* sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads* will go to. Also, by itself this class will not advance the head slot.* <p/>* A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>* iterations, this sliding window counter will always return object counts that are equal or greater than in the* previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,* this is the desired behavior.* <p/>* To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:* <p/>* <pre>* {@code* Sliding window counts of an object X over time** Minute (timeline):* 1 2 3 4 5 6 7 8** Observed counts per minute:* 1 1 1 1 0 0 0 0** Counts returned by counter:* 1 2 3 4 4 3 2 1* }* </pre>* <p/>* As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the* counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load* effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your* analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously* increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing* counts.* <p/>* On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,* "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times* in the past five minutes", implying that it can only account for the last two of those five minutes because the* counter was not running before that time.** @param <T> The type of those objects we want to count.*/ public final class SlidingWindowCounter<T> implements Serializable {private static final long serialVersionUID = -2645063988768785810L;private SlotBasedCounter<T> objCounter;private int headSlot;private int tailSlot;private int windowLengthInSlots;public SlidingWindowCounter(int windowLengthInSlots) {if (windowLengthInSlots < 2) {throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");}this.windowLengthInSlots = windowLengthInSlots;this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);this.headSlot = 0;this.tailSlot = slotAfter(headSlot);}public void incrementCount(T obj) {objCounter.incrementCount(obj, headSlot);}/*** Return the current (total) counts of all tracked objects, then advance the window.* <p/>* Whenever this method is called, we consider the counts of the current sliding window to be available to and* successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent* objects within the next "chunk" of the sliding window.** @return The current (total) counts of all tracked objects.*/public Map<T, Long> getCountsThenAdvanceWindow() {Map<T, Long> counts = objCounter.getCounts();objCounter.wipeZeros();objCounter.wipeSlot(tailSlot);advanceHead();return counts;}private void advanceHead() {headSlot = tailSlot;tailSlot = slotAfter(tailSlot);}private int slotAfter(int slot) {return (slot + 1) % windowLengthInSlots;}}

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的SlotBasedCounter.java

package storm.starter.tools;import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set;/*** This class provides per-slot counts of the occurrences of objects.* <p/>* It can be used, for instance, as a building block for implementing sliding window counting of objects.** @param <T> The type of those objects we want to count.*/ public final class SlotBasedCounter<T> implements Serializable {private static final long serialVersionUID = 4858185737378394432L;private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();private final int numSlots;public SlotBasedCounter(int numSlots) {if (numSlots <= 0) {throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");}this.numSlots = numSlots;}public void incrementCount(T obj, int slot) {long[] counts = objToCounts.get(obj);if (counts == null) {counts = new long[this.numSlots];objToCounts.put(obj, counts);}counts[slot]++;}public long getCount(T obj, int slot) {long[] counts = objToCounts.get(obj);if (counts == null) {return 0;}else {return counts[slot];}}public Map<T, Long> getCounts() {Map<T, Long> result = new HashMap<T, Long>();for (T obj : objToCounts.keySet()) {result.put(obj, computeTotalCount(obj));}return result;}private long computeTotalCount(T obj) {long[] curr = objToCounts.get(obj);long total = 0;for (long l : curr) {total += l;}return total;}/*** Reset the slot count of any tracked objects to zero for the given slot.** @param slot*/public void wipeSlot(int slot) {for (T obj : objToCounts.keySet()) {resetSlotCountToZero(obj, slot);}}private void resetSlotCountToZero(T obj, int slot) {long[] counts = objToCounts.get(obj);counts[slot] = 0;}private boolean shouldBeRemovedFromCounter(T obj) {return computeTotalCount(obj) == 0;}/*** Remove any object from the counter whose total count is zero (to free up memory).*/public void wipeZeros() {Set<T> objToBeRemoved = new HashSet<T>();for (T obj : objToCounts.keySet()) {if (shouldBeRemovedFromCounter(obj)) {objToBeRemoved.add(obj);}}for (T obj : objToBeRemoved) {objToCounts.remove(obj);}}}

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\trident的TridentReach.java

package storm.starter.trident;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.generated.StormTopology; import backtype.storm.task.IMetricsContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.CombinerAggregator; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.state.ReadOnlyState; import storm.trident.state.State; import storm.trident.state.StateFactory; import storm.trident.state.map.ReadOnlyMapState; import storm.trident.tuple.TridentTuple;import java.util.*;public class TridentReach {public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));}};public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));put("tim", Arrays.asList("alex"));put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));put("adam", Arrays.asList("david", "carissa"));put("mike", Arrays.asList("john", "bob"));put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));}};public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {public static class Factory implements StateFactory {Map _map;public Factory(Map map) {_map = map;}@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return new StaticSingleKeyMapState(_map);}}Map _map;public StaticSingleKeyMapState(Map map) {_map = map;}@Overridepublic List<Object> multiGet(List<List<Object>> keys) {List<Object> ret = new ArrayList();for (List<Object> key : keys) {Object singleKey = key.get(0);ret.add(_map.get(singleKey));}return ret;}}public static class One implements CombinerAggregator<Integer> {@Overridepublic Integer init(TridentTuple tuple) {return 1;}@Overridepublic Integer combine(Integer val1, Integer val2) {return 1;}@Overridepublic Integer zero() {return 1;}}public static class ExpandList extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {List l = (List) tuple.getValue(0);if (l != null) {for (Object o : l) {collector.emit(new Values(o));}}}}public static StormTopology buildTopology(LocalDRPC drpc) {TridentTopology topology = new TridentTopology();TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields("one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));return topology.build();}public static void main(String[] args) throws Exception {LocalDRPC drpc = new LocalDRPC();Config conf = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("reach", conf, buildTopology(drpc));Thread.sleep(2000);System.out.println("REACH: " + drpc.execute("reach", "aaa"));System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));cluster.shutdown();drpc.shutdown();} }

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\trident的TridentWordCount.java

package storm.starter.trident;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple;public class TridentWordCount {public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}}public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);TridentTopology topology = new TridentTopology();TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16);topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();}public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, buildTopology(null));}} }

?

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\util的StormRunner.java

package storm.starter.util;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology;public final class StormRunner {private static final int MILLIS_IN_SEC = 1000;private StormRunner() {}public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)throws InterruptedException {LocalCluster cluster = new LocalCluster();cluster.submitTopology(topologyName, conf, topology);Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);cluster.killTopology(topologyName);cluster.shutdown();} }

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\util的TupleHelpers.java

package storm.starter.util;import backtype.storm.Constants; import backtype.storm.tuple.Tuple;public final class TupleHelpers {private TupleHelpers() {}public static boolean isTickTuple(Tuple tuple) {return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);}}

藏經閣技術資料分享群二維碼

轉載于:https://www.cnblogs.com/wangsongbai/p/9122725.html

總結

以上是生活随笔為你收集整理的Storm概念学习系列之storm-starter项目(完整版)(博主推荐)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

日本一区二区三区免费高清 | 99麻豆久久久国产精品免费 | 性欧美大战久久久久久久 | 牛和人交xxxx欧美 | 精品国偷自产在线 | 超碰97人人射妻 | 青青青手机频在线观看 | 国产精品无码一区二区桃花视频 | 精品无码国产一区二区三区av | 18禁黄网站男男禁片免费观看 | 无人区乱码一区二区三区 | 免费人成在线观看网站 | 丁香花在线影院观看在线播放 | 成人无码视频在线观看网站 | 久久亚洲国产成人精品性色 | 久激情内射婷内射蜜桃人妖 | 国产明星裸体无码xxxx视频 | 99re在线播放 | 国产欧美精品一区二区三区 | 久久精品中文闷骚内射 | 中文字幕av日韩精品一区二区 | 久精品国产欧美亚洲色aⅴ大片 | 极品嫩模高潮叫床 | 亚洲 另类 在线 欧美 制服 | 国产精品人人爽人人做我的可爱 | 中文字幕色婷婷在线视频 | 正在播放老肥熟妇露脸 | 免费看少妇作爱视频 | 久青草影院在线观看国产 | 在线亚洲高清揄拍自拍一品区 | 国产精品va在线观看无码 | 亚洲欧美综合区丁香五月小说 | 内射老妇bbwx0c0ck | 色婷婷欧美在线播放内射 | 丝袜足控一区二区三区 | 学生妹亚洲一区二区 | 无码精品国产va在线观看dvd | 野狼第一精品社区 | 久久国产36精品色熟妇 | 日本爽爽爽爽爽爽在线观看免 | 国产成人无码a区在线观看视频app | 影音先锋中文字幕无码 | 中文字幕无码日韩欧毛 | 国产精品久久久av久久久 | 日韩精品无码一区二区中文字幕 | 精品无码一区二区三区的天堂 | 俄罗斯老熟妇色xxxx | 亚洲成a人片在线观看无码 | 亚洲狠狠色丁香婷婷综合 | 亚洲成a人片在线观看日本 | 97久久国产亚洲精品超碰热 | 亚洲精品中文字幕 | 亚洲精品国偷拍自产在线麻豆 | 国产精品毛片一区二区 | 97精品国产97久久久久久免费 | 久久精品国产精品国产精品污 | 熟妇人妻激情偷爽文 | 人妻尝试又大又粗久久 | 国产精品多人p群无码 | 久久久中文字幕日本无吗 | 国产无遮挡又黄又爽又色 | 亚洲精品国产品国语在线观看 | 亚洲综合另类小说色区 | 中文字幕乱码人妻无码久久 | 久精品国产欧美亚洲色aⅴ大片 | 无码国产乱人伦偷精品视频 | 久久久久亚洲精品中文字幕 | 精品无码国产一区二区三区av | 国内揄拍国内精品少妇国语 | 内射后入在线观看一区 | 国内精品九九久久久精品 | 中文无码成人免费视频在线观看 | 暴力强奷在线播放无码 | 国产麻豆精品精东影业av网站 | 亚洲综合在线一区二区三区 | 在线播放亚洲第一字幕 | 精品人人妻人人澡人人爽人人 | 亚洲 a v无 码免 费 成 人 a v | 永久免费观看美女裸体的网站 | 国产成人亚洲综合无码 | 国产无套粉嫩白浆在线 | 丰满少妇人妻久久久久久 | 国产在线无码精品电影网 | 国产精品对白交换视频 | 国产精品高潮呻吟av久久 | 欧美激情内射喷水高潮 | 午夜福利电影 | 亚洲精品久久久久中文第一幕 | 九一九色国产 | 强伦人妻一区二区三区视频18 | 99久久人妻精品免费一区 | 国产精品久久久久影院嫩草 | 在线观看国产一区二区三区 | 久久久国产一区二区三区 | 欧美日本精品一区二区三区 | 激情国产av做激情国产爱 | 美女极度色诱视频国产 | 性欧美疯狂xxxxbbbb | 一本大道久久东京热无码av | 国产又粗又硬又大爽黄老大爷视 | 六月丁香婷婷色狠狠久久 | 亚洲成色www久久网站 | 日韩精品无码免费一区二区三区 | 亚洲一区二区三区无码久久 | 日韩精品久久久肉伦网站 | 精品乱码久久久久久久 | 妺妺窝人体色www在线小说 | 少妇性l交大片 | 久久久中文字幕日本无吗 | 国产一区二区三区影院 | аⅴ资源天堂资源库在线 | 亚洲国产av美女网站 | 少妇无套内谢久久久久 | 亚洲aⅴ无码成人网站国产app | 午夜精品一区二区三区在线观看 | 内射白嫩少妇超碰 | 我要看www免费看插插视频 | 欧美精品在线观看 | 性色欲网站人妻丰满中文久久不卡 | 国产成人一区二区三区别 | 中文字幕+乱码+中文字幕一区 | 鲁鲁鲁爽爽爽在线视频观看 | 国产真人无遮挡作爱免费视频 | 曰韩无码二三区中文字幕 | 亚洲中文字幕在线无码一区二区 | 国产成人精品久久亚洲高清不卡 | 日日摸天天摸爽爽狠狠97 | 亚洲精品国产第一综合99久久 | 激情爆乳一区二区三区 | 少妇无套内谢久久久久 | 中文字幕av日韩精品一区二区 | 2020久久超碰国产精品最新 | 欧美 丝袜 自拍 制服 另类 | 色婷婷欧美在线播放内射 | 国产熟女一区二区三区四区五区 | 欧美丰满少妇xxxx性 | 亚洲欧洲中文日韩av乱码 | 好屌草这里只有精品 | 亚洲 激情 小说 另类 欧美 | 7777奇米四色成人眼影 | 欧美激情综合亚洲一二区 | 狂野欧美性猛交免费视频 | 亚洲人成网站免费播放 | 欧洲vodafone精品性 | 亚洲人成网站在线播放942 | 久久久婷婷五月亚洲97号色 | 全黄性性激高免费视频 | аⅴ资源天堂资源库在线 | 久久国语露脸国产精品电影 | 欧美日韩综合一区二区三区 | 亚洲国产高清在线观看视频 | 日日天干夜夜狠狠爱 | 日本精品人妻无码77777 天堂一区人妻无码 | 俄罗斯老熟妇色xxxx | 东京无码熟妇人妻av在线网址 | 国产成人精品优优av | 亚洲国产精品久久久久久 | 国产精品久久国产三级国 | 国产精品久久久久久久9999 | 免费播放一区二区三区 | 国产精品亚洲а∨无码播放麻豆 | 日韩视频 中文字幕 视频一区 | 久久久精品欧美一区二区免费 | 大屁股大乳丰满人妻 | 国内丰满熟女出轨videos | 国产香蕉尹人综合在线观看 | 国产精品久久久久影院嫩草 | 亲嘴扒胸摸屁股激烈网站 | 久久久中文久久久无码 | 国产精品人妻一区二区三区四 | 免费看男女做好爽好硬视频 | 日产精品99久久久久久 | 伊人久久大香线蕉av一区二区 | 永久免费观看美女裸体的网站 | 野外少妇愉情中文字幕 | 中文字幕av日韩精品一区二区 | 日本又色又爽又黄的a片18禁 | 奇米影视888欧美在线观看 | 国产内射爽爽大片视频社区在线 | 丰满人妻翻云覆雨呻吟视频 | 日本xxxx色视频在线观看免费 | 女人被爽到呻吟gif动态图视看 | 日本一区二区三区免费高清 | 亚洲中文字幕成人无码 | 国产内射老熟女aaaa | 无码人妻丰满熟妇区五十路百度 | 久久精品国产99久久6动漫 | 国产成人无码av片在线观看不卡 | 日日麻批免费40分钟无码 | 最近免费中文字幕中文高清百度 | 高潮毛片无遮挡高清免费视频 | 国内精品九九久久久精品 | 初尝人妻少妇中文字幕 | 无码精品人妻一区二区三区av | 强伦人妻一区二区三区视频18 | 一本久道久久综合婷婷五月 | 麻豆果冻传媒2021精品传媒一区下载 | 国产9 9在线 | 中文 | 中文字幕中文有码在线 | 亚洲日韩一区二区三区 | 国产另类ts人妖一区二区 | 无码毛片视频一区二区本码 | 十八禁真人啪啪免费网站 | 人妻体内射精一区二区三四 | 狂野欧美性猛xxxx乱大交 | 国产成人精品久久亚洲高清不卡 | 国产熟妇另类久久久久 | 国产成人无码av一区二区 | 偷窥村妇洗澡毛毛多 | 无码人妻av免费一区二区三区 | www一区二区www免费 | 国产精品.xx视频.xxtv | 正在播放老肥熟妇露脸 | 欧美刺激性大交 | 国产成人人人97超碰超爽8 | 国产成人无码av一区二区 | 久久久久久av无码免费看大片 | 国产无av码在线观看 | 国产办公室秘书无码精品99 | 狠狠色欧美亚洲狠狠色www | 亚洲国产精品美女久久久久 | 国产精品怡红院永久免费 | 在线欧美精品一区二区三区 | 成人免费视频视频在线观看 免费 | 无套内谢的新婚少妇国语播放 | 欧美 日韩 人妻 高清 中文 | 熟妇人妻无码xxx视频 | 国产麻豆精品一区二区三区v视界 | 日日碰狠狠丁香久燥 | 99久久亚洲精品无码毛片 | 亚洲热妇无码av在线播放 | 国产精品久久久久久久影院 | 野狼第一精品社区 | 无码成人精品区在线观看 | 亚洲色成人中文字幕网站 | 撕开奶罩揉吮奶头视频 | 无码任你躁久久久久久久 | 东京热一精品无码av | 免费乱码人妻系列无码专区 | 精品无人区无码乱码毛片国产 | 97久久超碰中文字幕 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 一本大道伊人av久久综合 | 奇米影视888欧美在线观看 | 夜夜躁日日躁狠狠久久av | aⅴ在线视频男人的天堂 | 亚洲综合在线一区二区三区 | 久久久久久国产精品无码下载 | 四虎4hu永久免费 | 日韩成人一区二区三区在线观看 | 亚洲第一网站男人都懂 | 久久伊人色av天堂九九小黄鸭 | 国内丰满熟女出轨videos | 国产疯狂伦交大片 | 亚洲爆乳大丰满无码专区 | 高潮毛片无遮挡高清免费视频 | аⅴ资源天堂资源库在线 | 人妻插b视频一区二区三区 | 青青久在线视频免费观看 | 国产乱码精品一品二品 | 特级做a爰片毛片免费69 | 无人区乱码一区二区三区 | 亚洲国产成人a精品不卡在线 | 日本精品人妻无码免费大全 | 在线 国产 欧美 亚洲 天堂 | 精品久久8x国产免费观看 | 亚洲欧美国产精品久久 | 日韩av无码一区二区三区不卡 | 伊人久久大香线蕉av一区二区 | 国产女主播喷水视频在线观看 | 国产97色在线 | 免 | 亚洲综合在线一区二区三区 | 国产av久久久久精东av | 小鲜肉自慰网站xnxx | 99久久久无码国产精品免费 | 99久久99久久免费精品蜜桃 | 扒开双腿吃奶呻吟做受视频 | 国产午夜无码视频在线观看 | 18禁止看的免费污网站 | 日本护士毛茸茸高潮 | 国产无遮挡又黄又爽又色 | 亚洲欧美精品伊人久久 | 国产黄在线观看免费观看不卡 | 亚拍精品一区二区三区探花 | 国产av一区二区精品久久凹凸 | 中文精品无码中文字幕无码专区 | 丰满少妇人妻久久久久久 | 欧洲熟妇色 欧美 | 国产av剧情md精品麻豆 | 国产精品第一区揄拍无码 | 国产亚洲精品久久久久久 | 日本xxxx色视频在线观看免费 | 兔费看少妇性l交大片免费 | 国产精品亚洲综合色区韩国 | 国产欧美精品一区二区三区 | 亚洲精品午夜国产va久久成人 | 亚洲色欲久久久综合网东京热 | 欧洲精品码一区二区三区免费看 | 国产又粗又硬又大爽黄老大爷视 | 日韩亚洲欧美中文高清在线 | 强开小婷嫩苞又嫩又紧视频 | 亚洲精品一区二区三区婷婷月 | 天堂亚洲2017在线观看 | 日日天日日夜日日摸 | 性生交片免费无码看人 | 国产精品无码久久av | 国产成人无码专区 | 玩弄中年熟妇正在播放 | 亚洲自偷精品视频自拍 | 日本一区二区三区免费高清 | 久久久久久九九精品久 | 天堂а√在线地址中文在线 | 国产香蕉尹人视频在线 | 人人澡人人透人人爽 | 青青青爽视频在线观看 | 无码一区二区三区在线 | 好男人www社区 | 97人妻精品一区二区三区 | 蜜桃视频韩日免费播放 | 日本丰满护士爆乳xxxx | 国产人妻精品午夜福利免费 | 无码毛片视频一区二区本码 | 天天躁日日躁狠狠躁免费麻豆 | 精品人妻中文字幕有码在线 | 玩弄少妇高潮ⅹxxxyw | 久久久久成人片免费观看蜜芽 | 波多野结衣av一区二区全免费观看 | 麻豆蜜桃av蜜臀av色欲av | 在线 国产 欧美 亚洲 天堂 | 日本熟妇人妻xxxxx人hd | 亚洲啪av永久无码精品放毛片 | 精品国产福利一区二区 | 色一情一乱一伦一区二区三欧美 | 天天摸天天碰天天添 | 又湿又紧又大又爽a视频国产 | 国产精品理论片在线观看 | 亚洲国产综合无码一区 | 熟妇激情内射com | 7777奇米四色成人眼影 | 国产在线一区二区三区四区五区 | 漂亮人妻洗澡被公强 日日躁 | 亚洲国产一区二区三区在线观看 | 亚洲男人av天堂午夜在 | 精品一二三区久久aaa片 | 国产色视频一区二区三区 | 久久久久亚洲精品中文字幕 | 日日摸夜夜摸狠狠摸婷婷 | 国内精品人妻无码久久久影院蜜桃 | 综合人妻久久一区二区精品 | 熟妇人妻中文av无码 | 无码精品人妻一区二区三区av | 国产三级精品三级男人的天堂 | 大地资源中文第3页 | 99精品视频在线观看免费 | 国产午夜手机精彩视频 | 欧美精品在线观看 | 日本一区二区三区免费播放 | 国产人妻精品一区二区三区不卡 | 欧洲vodafone精品性 | 老司机亚洲精品影院 | 无套内谢的新婚少妇国语播放 | 婷婷五月综合缴情在线视频 | 特黄特色大片免费播放器图片 | 国产明星裸体无码xxxx视频 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 性欧美大战久久久久久久 | 九九热爱视频精品 | 欧美性生交活xxxxxdddd | 国产乱人偷精品人妻a片 | 少妇激情av一区二区 | 日韩人妻系列无码专区 | 男女猛烈xx00免费视频试看 | 欧美丰满熟妇xxxx性ppx人交 | 婷婷丁香五月天综合东京热 | 久久久久久国产精品无码下载 | 波多野结衣一区二区三区av免费 | 天堂无码人妻精品一区二区三区 | 亚洲成色在线综合网站 | 性色av无码免费一区二区三区 | 国产成人精品优优av | 国产电影无码午夜在线播放 | 在线欧美精品一区二区三区 | 丝袜 中出 制服 人妻 美腿 | 国产尤物精品视频 | 精品一区二区三区波多野结衣 | 国内揄拍国内精品少妇国语 | 欧美丰满老熟妇xxxxx性 | a片免费视频在线观看 | 亚洲春色在线视频 | 99er热精品视频 | 丝袜美腿亚洲一区二区 | 久久久久免费精品国产 | 精品久久久无码中文字幕 | 无码人妻丰满熟妇区五十路百度 | aⅴ在线视频男人的天堂 | 国产av一区二区精品久久凹凸 | 国产午夜亚洲精品不卡下载 | 在线播放免费人成毛片乱码 | 嫩b人妻精品一区二区三区 | 亚洲日韩av一区二区三区中文 | 亚欧洲精品在线视频免费观看 | 无码人妻丰满熟妇区五十路百度 | 国产精品自产拍在线观看 | 少妇人妻大乳在线视频 | 国内老熟妇对白xxxxhd | 亚洲一区二区三区国产精华液 | 国产精品久久久久久亚洲影视内衣 | 久久99精品国产麻豆蜜芽 | 色综合久久88色综合天天 | 99久久人妻精品免费二区 | 国内少妇偷人精品视频 | 欧美日韩综合一区二区三区 | 国产成人无码a区在线观看视频app | 成人试看120秒体验区 | 国产一区二区三区四区五区加勒比 | 亚洲一区二区观看播放 | 色婷婷综合中文久久一本 | 国产色视频一区二区三区 | 在线天堂新版最新版在线8 | 国内精品九九久久久精品 | 日韩少妇内射免费播放 | 亚洲中文字幕成人无码 | 少妇厨房愉情理9仑片视频 | 曰韩少妇内射免费播放 | 精品国精品国产自在久国产87 | 在线看片无码永久免费视频 | 国产精品久久久久无码av色戒 | 亚洲成av人在线观看网址 | 中文字幕av无码一区二区三区电影 | 国产xxx69麻豆国语对白 | 国产又爽又猛又粗的视频a片 | 免费观看的无遮挡av | av在线亚洲欧洲日产一区二区 | 自拍偷自拍亚洲精品10p | 永久免费精品精品永久-夜色 | 天天躁日日躁狠狠躁免费麻豆 | 国产高清不卡无码视频 | 久久www免费人成人片 | 午夜熟女插插xx免费视频 | 国产成人亚洲综合无码 | 曰本女人与公拘交酡免费视频 | 欧美日韩视频无码一区二区三 | 日韩欧美群交p片內射中文 | 在线观看欧美一区二区三区 | 未满小14洗澡无码视频网站 | 成人精品视频一区二区三区尤物 | 亚洲娇小与黑人巨大交 | 日韩欧美群交p片內射中文 | 18禁黄网站男男禁片免费观看 | 久久久久se色偷偷亚洲精品av | 欧美肥老太牲交大战 | 亚洲精品一区二区三区四区五区 | 亚洲gv猛男gv无码男同 | 中文毛片无遮挡高清免费 | 老头边吃奶边弄进去呻吟 | 99riav国产精品视频 | 精品国产av色一区二区深夜久久 | 日韩欧美成人免费观看 | 午夜福利一区二区三区在线观看 | 无码人妻黑人中文字幕 | 男女作爱免费网站 | 国产人妖乱国产精品人妖 | 伊人久久婷婷五月综合97色 | 亚洲精品国产精品乱码不卡 | 精品成在人线av无码免费看 | 少妇太爽了在线观看 | 高清无码午夜福利视频 | 大屁股大乳丰满人妻 | 亚洲va中文字幕无码久久不卡 | 久久综合香蕉国产蜜臀av | 人妻少妇精品无码专区动漫 | 亚洲欧美国产精品专区久久 | 小sao货水好多真紧h无码视频 | 亚洲精品中文字幕乱码 | 成人免费视频视频在线观看 免费 | 天天做天天爱天天爽综合网 | 婷婷色婷婷开心五月四房播播 | 无码中文字幕色专区 | 国内综合精品午夜久久资源 | 激情五月综合色婷婷一区二区 | 亚洲国产精品久久人人爱 | 99re在线播放 | 精品久久久中文字幕人妻 | 国产成人综合色在线观看网站 | 男女性色大片免费网站 | 国产极品视觉盛宴 | 波多野42部无码喷潮在线 | 久久久精品456亚洲影院 | 无套内谢老熟女 | 色婷婷欧美在线播放内射 | 亚洲国产午夜精品理论片 | 成在人线av无码免费 | 亚洲精品成人福利网站 | 精品国产乱码久久久久乱码 | 色婷婷久久一区二区三区麻豆 | 性做久久久久久久免费看 | 18禁止看的免费污网站 | 亚洲精品国产a久久久久久 | 国内综合精品午夜久久资源 | 曰韩无码二三区中文字幕 | 最新版天堂资源中文官网 | 亚洲综合另类小说色区 | 在线亚洲高清揄拍自拍一品区 | 日韩人妻无码一区二区三区久久99 | 国产网红无码精品视频 | 午夜精品久久久内射近拍高清 | 精品乱码久久久久久久 | 欧洲精品码一区二区三区免费看 | 三级4级全黄60分钟 | 日韩人妻系列无码专区 | 久久久中文字幕日本无吗 | 最新国产乱人伦偷精品免费网站 | 色欲人妻aaaaaaa无码 | 国内精品久久久久久中文字幕 | 国内精品久久毛片一区二区 | 久久无码中文字幕免费影院蜜桃 | 国产乡下妇女做爰 | 人人爽人人澡人人人妻 | 日日橹狠狠爱欧美视频 | 人妻aⅴ无码一区二区三区 | 一本精品99久久精品77 | 婷婷色婷婷开心五月四房播播 | 欧美日本精品一区二区三区 | 妺妺窝人体色www婷婷 | 久久99精品久久久久久 | 国产午夜手机精彩视频 | 一区二区传媒有限公司 | 国产深夜福利视频在线 | 欧美国产日韩亚洲中文 | 亚洲日本va中文字幕 | 欧美国产日韩亚洲中文 | 成人影院yy111111在线观看 | 一个人看的视频www在线 | 亚洲欧美日韩国产精品一区二区 | 精品乱码久久久久久久 | 国产偷国产偷精品高清尤物 | 亚洲成av人片天堂网无码】 | 动漫av一区二区在线观看 | 久久精品人人做人人综合试看 | 国产成人一区二区三区在线观看 | 国产肉丝袜在线观看 | 国内精品人妻无码久久久影院蜜桃 | 大色综合色综合网站 | 国产一区二区三区四区五区加勒比 | 婷婷五月综合激情中文字幕 | 97久久精品无码一区二区 | 人妻中文无码久热丝袜 | 亚洲 欧美 激情 小说 另类 | 亚洲人成网站色7799 | www国产亚洲精品久久网站 | 国产高清av在线播放 | 日本成熟视频免费视频 | 2020最新国产自产精品 | 日产精品99久久久久久 | 荫蒂被男人添的好舒服爽免费视频 | 天堂а√在线中文在线 | 波多野结衣一区二区三区av免费 | 亚洲 a v无 码免 费 成 人 a v | 妺妺窝人体色www在线小说 | 2019nv天堂香蕉在线观看 | 欧美性猛交内射兽交老熟妇 | 久久亚洲精品成人无码 | 国产精品手机免费 | 亚洲男人av天堂午夜在 | 伊人久久大香线蕉午夜 | 午夜理论片yy44880影院 | 久久亚洲日韩精品一区二区三区 | 亚洲性无码av中文字幕 | 亚洲色偷偷男人的天堂 | 国产舌乚八伦偷品w中 | 中文字幕乱码人妻二区三区 | 亚无码乱人伦一区二区 | 国产午夜手机精彩视频 | 撕开奶罩揉吮奶头视频 | 国产日产欧产精品精品app | 国产亚洲精品久久久久久 | 丰满人妻一区二区三区免费视频 | 欧美激情内射喷水高潮 | 日韩视频 中文字幕 视频一区 | 国产精品毛片一区二区 | 精品一二三区久久aaa片 | 在线看片无码永久免费视频 | 97精品国产97久久久久久免费 | 人人爽人人爽人人片av亚洲 | 亚洲日韩av一区二区三区四区 | 欧美黑人性暴力猛交喷水 | 亚洲国产成人a精品不卡在线 | 九月婷婷人人澡人人添人人爽 | 欧美刺激性大交 | 熟妇女人妻丰满少妇中文字幕 | 精品国产一区二区三区四区在线看 | 无码毛片视频一区二区本码 | 波多野结衣av一区二区全免费观看 | 老熟妇仑乱视频一区二区 | 亚洲国产精品久久人人爱 | 国产麻豆精品精东影业av网站 | 日韩精品无码一本二本三本色 | 强伦人妻一区二区三区视频18 | 久久天天躁夜夜躁狠狠 | 漂亮人妻洗澡被公强 日日躁 | 极品尤物被啪到呻吟喷水 | 国产精品久久福利网站 | 免费看男女做好爽好硬视频 | 大屁股大乳丰满人妻 | 久久国产精品偷任你爽任你 | 亚洲综合色区中文字幕 | 国产一区二区三区日韩精品 | 蜜桃臀无码内射一区二区三区 | 牲欲强的熟妇农村老妇女 | 九九久久精品国产免费看小说 | 欧美人与物videos另类 | 中文字幕乱码人妻二区三区 | 欧美一区二区三区 | 亚洲欧美国产精品久久 | 国产精品久久久久影院嫩草 | 少妇被粗大的猛进出69影院 | 四虎4hu永久免费 | 色老头在线一区二区三区 | 老子影院午夜伦不卡 | 亚洲va中文字幕无码久久不卡 | 性色av无码免费一区二区三区 | 亚洲中文字幕无码中字 | 亚洲综合久久一区二区 | 久久精品女人天堂av免费观看 | 亚洲色www成人永久网址 | 精品久久久久香蕉网 | 丰满妇女强制高潮18xxxx | 一个人看的www免费视频在线观看 | 亚洲色在线无码国产精品不卡 | 亚洲の无码国产の无码步美 | 国产sm调教视频在线观看 | 国产亲子乱弄免费视频 | 高清国产亚洲精品自在久久 | 暴力强奷在线播放无码 | 欧美色就是色 | 国产综合色产在线精品 | 免费观看又污又黄的网站 | 给我免费的视频在线观看 | 又黄又爽又色的视频 | 精品一区二区不卡无码av | 天天拍夜夜添久久精品大 | 国产美女精品一区二区三区 | 青青青手机频在线观看 | 黄网在线观看免费网站 | 日韩在线不卡免费视频一区 | 天堂а√在线地址中文在线 | 无码国内精品人妻少妇 | 亚洲 欧美 激情 小说 另类 | 亚洲の无码国产の无码影院 | 国产又粗又硬又大爽黄老大爷视 | 风流少妇按摩来高潮 | 欧美兽交xxxx×视频 | 又湿又紧又大又爽a视频国产 | 草草网站影院白丝内射 | 亚洲一区av无码专区在线观看 | 亚洲日韩av片在线观看 | 狠狠色丁香久久婷婷综合五月 | 国产精品久久国产精品99 | 极品嫩模高潮叫床 | 国产免费久久久久久无码 | 免费无码一区二区三区蜜桃大 | 欧美日韩一区二区综合 | 成人免费视频视频在线观看 免费 | 九九在线中文字幕无码 | 欧美刺激性大交 | 成熟人妻av无码专区 | 精品国产aⅴ无码一区二区 | 亚洲精品成人av在线 | 亚洲色欲色欲欲www在线 | 亚洲精品久久久久久一区二区 | 久久zyz资源站无码中文动漫 | 久久午夜无码鲁丝片午夜精品 | 人人妻人人澡人人爽人人精品浪潮 | 秋霞特色aa大片 | 中文字幕无码热在线视频 | 国产农村妇女高潮大叫 | 18禁黄网站男男禁片免费观看 | 亚洲精品无码国产 | 少妇人妻大乳在线视频 | 免费视频欧美无人区码 | 国产人成高清在线视频99最全资源 | 夜夜夜高潮夜夜爽夜夜爰爰 | 国产综合在线观看 | 99久久精品午夜一区二区 | 成熟女人特级毛片www免费 | 亚洲精品一区二区三区婷婷月 | 鲁一鲁av2019在线 | 日本熟妇人妻xxxxx人hd | 丰满少妇人妻久久久久久 | 久久无码人妻影院 | 人妻人人添人妻人人爱 | 成人免费视频在线观看 | 午夜无码人妻av大片色欲 | 色婷婷综合激情综在线播放 | 亚洲中文无码av永久不收费 | 亚洲成熟女人毛毛耸耸多 | a片免费视频在线观看 | 成人无码精品1区2区3区免费看 | 日韩欧美中文字幕公布 | 国产精品99爱免费视频 | 国产熟妇高潮叫床视频播放 | 秋霞成人午夜鲁丝一区二区三区 | 东京热无码av男人的天堂 | 国产在热线精品视频 | 日韩人妻无码中文字幕视频 | 亚洲人成影院在线无码按摩店 | 久久国内精品自在自线 | 久久久亚洲欧洲日产国码αv | 欧美 亚洲 国产 另类 | 永久免费精品精品永久-夜色 | 久久久www成人免费毛片 | 天天av天天av天天透 | 欧美刺激性大交 | 麻豆成人精品国产免费 | 高潮毛片无遮挡高清免费 | 九九久久精品国产免费看小说 | 欧美真人作爱免费视频 | 熟女俱乐部五十路六十路av | 久久国产精品偷任你爽任你 | 日日碰狠狠丁香久燥 | 日韩成人一区二区三区在线观看 | 欧美人与物videos另类 | 青草青草久热国产精品 | 又紧又大又爽精品一区二区 | 正在播放老肥熟妇露脸 | 国产成人精品无码播放 | 97无码免费人妻超级碰碰夜夜 | 国产一精品一av一免费 | 2019午夜福利不卡片在线 | 欧美日韩一区二区综合 | 亚洲熟妇色xxxxx欧美老妇y | 小泽玛莉亚一区二区视频在线 | 97色伦图片97综合影院 | 在线播放免费人成毛片乱码 | 久久精品国产日本波多野结衣 | 久久久久亚洲精品男人的天堂 | 免费视频欧美无人区码 | 中文字幕+乱码+中文字幕一区 | 国产精品怡红院永久免费 | 国产成人午夜福利在线播放 | 亚洲成av人在线观看网址 | 国语自产偷拍精品视频偷 | 国产超碰人人爽人人做人人添 | 帮老师解开蕾丝奶罩吸乳网站 | 亚洲欧美国产精品专区久久 | а天堂中文在线官网 | 国产成人精品视频ⅴa片软件竹菊 | 丰满人妻翻云覆雨呻吟视频 | 国产精品无码mv在线观看 | 樱花草在线社区www | 亚洲精品一区二区三区大桥未久 | 亚洲国产精品久久久久久 | 俺去俺来也www色官网 | 精品无人区无码乱码毛片国产 | 女人高潮内射99精品 | 自拍偷自拍亚洲精品被多人伦好爽 | 亚洲 激情 小说 另类 欧美 | 青草青草久热国产精品 | 夜夜高潮次次欢爽av女 | 久久精品国产大片免费观看 | 国产精品欧美成人 | 午夜福利不卡在线视频 | 国产肉丝袜在线观看 | 少妇性l交大片欧洲热妇乱xxx | 一本色道婷婷久久欧美 | 日韩人妻无码一区二区三区久久99 | 午夜熟女插插xx免费视频 | 成年美女黄网站色大免费全看 | 大屁股大乳丰满人妻 | 夜精品a片一区二区三区无码白浆 | 性做久久久久久久久 | 国产高潮视频在线观看 | 综合激情五月综合激情五月激情1 | 亚洲精品一区二区三区四区五区 | 久久精品中文字幕一区 | 国内精品久久毛片一区二区 | 中文字幕乱码人妻二区三区 | 国内精品人妻无码久久久影院 | 一本久久a久久精品亚洲 | 好屌草这里只有精品 | 国产亚洲精品久久久ai换 | 国产偷抇久久精品a片69 | 免费无码的av片在线观看 | 美女扒开屁股让男人桶 | 亚洲人成网站色7799 | 成人无码视频免费播放 | 日韩精品乱码av一区二区 | 女人和拘做爰正片视频 | 日韩精品无码一区二区中文字幕 | 51国偷自产一区二区三区 | 国产精品美女久久久久av爽李琼 | 少女韩国电视剧在线观看完整 | 国产精品无码一区二区三区不卡 | 国产网红无码精品视频 | 伊人久久大香线焦av综合影院 | 亚洲精品国产精品乱码不卡 | 蜜臀av无码人妻精品 | 三上悠亚人妻中文字幕在线 | 一个人看的www免费视频在线观看 | 国产乱人伦偷精品视频 | 老头边吃奶边弄进去呻吟 | 狂野欧美性猛交免费视频 | 女人被爽到呻吟gif动态图视看 | 学生妹亚洲一区二区 | 无码人妻丰满熟妇区五十路百度 | 亚洲成av人综合在线观看 | 日日噜噜噜噜夜夜爽亚洲精品 | 麻花豆传媒剧国产免费mv在线 | 精品国产av色一区二区深夜久久 | 激情内射日本一区二区三区 | 1000部啪啪未满十八勿入下载 | 131美女爱做视频 | 人人澡人人透人人爽 | 亚洲无人区午夜福利码高清完整版 | 永久免费精品精品永久-夜色 | 国产一区二区三区精品视频 | 老熟妇乱子伦牲交视频 | 亚洲精品成a人在线观看 | 久久天天躁夜夜躁狠狠 | 精品无码国产一区二区三区av | 成人免费视频视频在线观看 免费 | 国产精品无码久久av | 无码毛片视频一区二区本码 | 人人妻人人澡人人爽人人精品浪潮 | 国产精品国产三级国产专播 | 俄罗斯老熟妇色xxxx | 丁香花在线影院观看在线播放 | 精品人妻人人做人人爽夜夜爽 | 一本久久伊人热热精品中文字幕 | 免费看少妇作爱视频 | 学生妹亚洲一区二区 | 麻豆精产国品 | 日本xxxx色视频在线观看免费 | 激情国产av做激情国产爱 | 亚洲熟悉妇女xxx妇女av | 丰满肥臀大屁股熟妇激情视频 | 无码任你躁久久久久久久 | 18禁黄网站男男禁片免费观看 | 小鲜肉自慰网站xnxx | 妺妺窝人体色www在线小说 | 水蜜桃色314在线观看 | 久久97精品久久久久久久不卡 | 扒开双腿疯狂进出爽爽爽视频 | 欧美日韩在线亚洲综合国产人 | 国产偷国产偷精品高清尤物 | 中文字幕av日韩精品一区二区 | 国产成人综合在线女婷五月99播放 | 99视频精品全部免费免费观看 | 亚洲欧美精品aaaaaa片 | 久久综合九色综合欧美狠狠 | 日日鲁鲁鲁夜夜爽爽狠狠 | 久久国产劲爆∧v内射 | 无套内射视频囯产 | 亚欧洲精品在线视频免费观看 | 国产精品美女久久久 | 国产小呦泬泬99精品 | 麻豆精产国品 | 精品国精品国产自在久国产87 | 日本熟妇乱子伦xxxx | 动漫av一区二区在线观看 | 18黄暴禁片在线观看 | 熟妇女人妻丰满少妇中文字幕 | 人妻插b视频一区二区三区 | 国产香蕉尹人视频在线 | 小泽玛莉亚一区二区视频在线 | 国产无遮挡吃胸膜奶免费看 | 亚洲精品久久久久久一区二区 | 无码福利日韩神码福利片 | 亚洲日韩av一区二区三区四区 | 亚洲男女内射在线播放 | 少妇被粗大的猛进出69影院 | 亚洲日韩一区二区 | 精品久久久久久人妻无码中文字幕 | 国产成人综合在线女婷五月99播放 | 精品偷拍一区二区三区在线看 | 麻豆av传媒蜜桃天美传媒 | 天天综合网天天综合色 | 久久久久国色av免费观看性色 | 日本高清一区免费中文视频 | 大肉大捧一进一出好爽视频 | 亚洲成av人片天堂网无码】 | 国产精品国产三级国产专播 | 狂野欧美性猛xxxx乱大交 | 精品无码一区二区三区爱欲 | 狠狠躁日日躁夜夜躁2020 | 黑人巨大精品欧美一区二区 | 亚洲精品久久久久久久久久久 | 国产午夜精品一区二区三区嫩草 | 国产成人无码av一区二区 | 国内精品一区二区三区不卡 | 国产手机在线αⅴ片无码观看 | 天天躁夜夜躁狠狠是什么心态 | 国产香蕉尹人综合在线观看 | 国模大胆一区二区三区 | 国产手机在线αⅴ片无码观看 | 国产又粗又硬又大爽黄老大爷视 | 三级4级全黄60分钟 | 久久国产精品_国产精品 | 成人无码精品1区2区3区免费看 | 国产av一区二区精品久久凹凸 | 日本护士毛茸茸高潮 | 欧美日本免费一区二区三区 | 国产av人人夜夜澡人人爽麻豆 | 亚洲国产一区二区三区在线观看 | 麻豆国产人妻欲求不满 | 精品国产一区二区三区四区在线看 | 精品国精品国产自在久国产87 | 荫蒂被男人添的好舒服爽免费视频 | 老司机亚洲精品影院无码 | 欧美人与物videos另类 | 国产免费久久久久久无码 | 色综合久久网 | 日本免费一区二区三区最新 | av人摸人人人澡人人超碰下载 | 麻豆国产人妻欲求不满谁演的 | 成 人 免费观看网站 | 亚洲色欲色欲天天天www | 免费看男女做好爽好硬视频 | 少女韩国电视剧在线观看完整 | 国产精品-区区久久久狼 | aa片在线观看视频在线播放 | 久久久精品欧美一区二区免费 | 欧美兽交xxxx×视频 | 亚洲码国产精品高潮在线 | 六月丁香婷婷色狠狠久久 | 狠狠综合久久久久综合网 | 玩弄中年熟妇正在播放 | 女人高潮内射99精品 | 亚洲一区二区三区偷拍女厕 | 啦啦啦www在线观看免费视频 | 九九综合va免费看 | 日本乱人伦片中文三区 | 97夜夜澡人人爽人人喊中国片 | 麻花豆传媒剧国产免费mv在线 | 日韩人妻系列无码专区 | 一本久道久久综合狠狠爱 | 荫蒂添的好舒服视频囗交 | 国产人妻久久精品二区三区老狼 | 欧美性色19p | 97久久精品无码一区二区 | 99久久无码一区人妻 | 色婷婷久久一区二区三区麻豆 | 欧美freesex黑人又粗又大 | 亚洲 高清 成人 动漫 | 性欧美大战久久久久久久 | 国产激情艳情在线看视频 | 扒开双腿吃奶呻吟做受视频 | 久久精品中文字幕一区 | 国产精品久久久一区二区三区 | 1000部夫妻午夜免费 | 又色又爽又黄的美女裸体网站 | 黑人粗大猛烈进出高潮视频 | 高清不卡一区二区三区 | 精品无人国产偷自产在线 | 国产麻豆精品一区二区三区v视界 | 久久久久国色av免费观看性色 | 久久久久人妻一区精品色欧美 | 国产精品办公室沙发 | 亚洲成a人片在线观看无码3d | 激情爆乳一区二区三区 | 国产亚洲欧美日韩亚洲中文色 | 啦啦啦www在线观看免费视频 | 无码中文字幕色专区 | 亚洲综合无码久久精品综合 | 欧美熟妇另类久久久久久多毛 | 未满成年国产在线观看 | 国产精品久久久久久久9999 | 人妻少妇精品无码专区二区 | 亚洲性无码av中文字幕 | 精品国产精品久久一区免费式 | 蜜桃臀无码内射一区二区三区 | 中文字幕av无码一区二区三区电影 | 久久亚洲中文字幕无码 | 亚洲а∨天堂久久精品2021 | 久久婷婷五月综合色国产香蕉 | 免费人成在线视频无码 | 欧美激情内射喷水高潮 | 大色综合色综合网站 | 图片小说视频一区二区 | 撕开奶罩揉吮奶头视频 | 久久综合网欧美色妞网 | 亚洲毛片av日韩av无码 | 国产综合色产在线精品 | 亚洲aⅴ无码成人网站国产app | 激情人妻另类人妻伦 | 色综合久久网 | 六月丁香婷婷色狠狠久久 | 国产无套内射久久久国产 | аⅴ资源天堂资源库在线 | 国产精品国产三级国产专播 | 丰满人妻被黑人猛烈进入 | 国产亚洲视频中文字幕97精品 | 蜜桃视频插满18在线观看 | 久久久久免费看成人影片 | 黑人巨大精品欧美一区二区 | 自拍偷自拍亚洲精品10p | 亚洲中文字幕成人无码 | 久久伊人色av天堂九九小黄鸭 | 98国产精品综合一区二区三区 | 疯狂三人交性欧美 | 国产人妻精品午夜福利免费 | 国产热a欧美热a在线视频 | 中文字幕日产无线码一区 | 国产人妻大战黑人第1集 | 国产成人av免费观看 | 亚洲热妇无码av在线播放 | 中文字幕乱码人妻无码久久 | 中文字幕人妻无码一夲道 | 色综合久久久久综合一本到桃花网 | 麻豆国产人妻欲求不满谁演的 | 超碰97人人射妻 | 国产精品无码一区二区三区不卡 | 亚洲精品国产精品乱码不卡 | 成人精品视频一区二区三区尤物 | 久久精品丝袜高跟鞋 | 黑人玩弄人妻中文在线 | 人人妻人人澡人人爽人人精品 | 久久精品人人做人人综合 | 亚洲精品一区国产 | 性欧美大战久久久久久久 | 亚洲欧美精品伊人久久 | 中文字幕乱妇无码av在线 | 久久久久国色av免费观看性色 | 国产亚洲欧美日韩亚洲中文色 | 国产suv精品一区二区五 | 中文字幕日产无线码一区 | 精品久久久久久人妻无码中文字幕 | 欧美 日韩 亚洲 在线 | 在线 国产 欧美 亚洲 天堂 | 思思久久99热只有频精品66 | 国产精品第一区揄拍无码 | 人妻人人添人妻人人爱 | 欧美丰满老熟妇xxxxx性 | 狠狠噜狠狠狠狠丁香五月 | 综合网日日天干夜夜久久 | 日日躁夜夜躁狠狠躁 | 国产精品爱久久久久久久 | 领导边摸边吃奶边做爽在线观看 | 老熟妇仑乱视频一区二区 | 波多野结衣高清一区二区三区 | 欧美人与动性行为视频 | 99久久久国产精品无码免费 | 国产人妻人伦精品 | 成人精品视频一区二区三区尤物 | 国产精品沙发午睡系列 | 国产电影无码午夜在线播放 | 国产午夜无码视频在线观看 | 国产无遮挡又黄又爽又色 | 国产人妖乱国产精品人妖 | 无码人中文字幕 | 帮老师解开蕾丝奶罩吸乳网站 | 精品乱码久久久久久久 | 国产又爽又猛又粗的视频a片 | 午夜熟女插插xx免费视频 | 国产亚洲精品久久久久久大师 | 久久久久免费精品国产 | 300部国产真实乱 | 丝袜 中出 制服 人妻 美腿 | 久久亚洲精品中文字幕无男同 | 日日摸天天摸爽爽狠狠97 | 5858s亚洲色大成网站www | 午夜不卡av免费 一本久久a久久精品vr综合 | 丰满少妇弄高潮了www | 亚洲中文字幕在线无码一区二区 | 乌克兰少妇xxxx做受 | 三上悠亚人妻中文字幕在线 | 欧美日韩一区二区三区自拍 | 国产乡下妇女做爰 | 精品久久久无码人妻字幂 | 久久久精品欧美一区二区免费 | 无码毛片视频一区二区本码 | 国产一精品一av一免费 | 亚洲日本一区二区三区在线 | 中文字幕精品av一区二区五区 | 天天摸天天碰天天添 | 少妇激情av一区二区 | 99在线 | 亚洲 | 性色欲情网站iwww九文堂 | 久久久精品欧美一区二区免费 | 久久国产精品_国产精品 | 欧美成人家庭影院 | 成人一区二区免费视频 | 国内少妇偷人精品视频免费 | 亚洲精品一区二区三区四区五区 | 三上悠亚人妻中文字幕在线 | 国产xxx69麻豆国语对白 | 国精产品一区二区三区 | 中文字幕无码av激情不卡 | 成熟人妻av无码专区 | 激情亚洲一区国产精品 | 女人高潮内射99精品 | 欧美激情综合亚洲一二区 | 国产一区二区三区四区五区加勒比 | 夜夜躁日日躁狠狠久久av | av香港经典三级级 在线 | 国产精品a成v人在线播放 | 久久综合香蕉国产蜜臀av | 全球成人中文在线 | 无码国产乱人伦偷精品视频 | 性欧美熟妇videofreesex | 色婷婷av一区二区三区之红樱桃 | 在线观看免费人成视频 | 内射白嫩少妇超碰 | 少女韩国电视剧在线观看完整 | 国产农村乱对白刺激视频 | 日韩人妻系列无码专区 | 国产乱人伦偷精品视频 | 国产亚洲美女精品久久久2020 | 亚洲色欲久久久综合网东京热 | 国产精品久久久久久亚洲毛片 | 久久99热只有频精品8 | 免费无码午夜福利片69 | 人妻少妇精品无码专区动漫 | 国产精品久久久午夜夜伦鲁鲁 | 亚洲の无码国产の无码影院 | 一本加勒比波多野结衣 | 日本爽爽爽爽爽爽在线观看免 | 亚洲精品中文字幕乱码 | 高清国产亚洲精品自在久久 | 国产又爽又黄又刺激的视频 | 欧洲vodafone精品性 | 亚洲va欧美va天堂v国产综合 | 粗大的内捧猛烈进出视频 | 亚洲精品久久久久久久久久久 | 久久久久成人片免费观看蜜芽 | 亚洲综合伊人久久大杳蕉 | 久久精品成人欧美大片 | 国产婷婷色一区二区三区在线 | 欧美兽交xxxx×视频 | 无码人妻出轨黑人中文字幕 | 精品无人区无码乱码毛片国产 | 少妇高潮喷潮久久久影院 | 97夜夜澡人人爽人人喊中国片 | 亚洲中文字幕在线观看 | 欧美性猛交xxxx富婆 | 色综合视频一区二区三区 | 全黄性性激高免费视频 | 人妻体内射精一区二区三四 | 天天爽夜夜爽夜夜爽 | 四虎永久在线精品免费网址 | 久久久久免费看成人影片 | 欧美成人高清在线播放 | 久久精品一区二区三区四区 | 风流少妇按摩来高潮 | 国产精品18久久久久久麻辣 | 国产亚洲精品久久久久久大师 | 丰满护士巨好爽好大乳 | 亚洲大尺度无码无码专区 | 亚洲精品综合五月久久小说 | 熟妇人妻无码xxx视频 | 丰满人妻一区二区三区免费视频 | 国产精品亚洲五月天高清 | 成人无码视频在线观看网站 | 熟妇人妻无乱码中文字幕 | aa片在线观看视频在线播放 | 男人扒开女人内裤强吻桶进去 | 正在播放老肥熟妇露脸 | 中文字幕人成乱码熟女app | 国产午夜视频在线观看 | 色综合久久中文娱乐网 | 欧美人与禽猛交狂配 | 国产精品.xx视频.xxtv | 成人欧美一区二区三区 | 天天燥日日燥 | 久久99精品久久久久婷婷 | 欧美日本免费一区二区三区 | 性做久久久久久久免费看 | 久久zyz资源站无码中文动漫 | 欧美自拍另类欧美综合图片区 | 天天躁日日躁狠狠躁免费麻豆 | 亚洲日韩av一区二区三区中文 | √8天堂资源地址中文在线 | 日韩视频 中文字幕 视频一区 | 成人精品视频一区二区 | 一本精品99久久精品77 | 亚洲一区二区三区偷拍女厕 | 婷婷六月久久综合丁香 | 18黄暴禁片在线观看 | 亚洲爆乳精品无码一区二区三区 | 国产av无码专区亚洲a∨毛片 | 国产成人无码专区 | 日韩无套无码精品 | 午夜免费福利小电影 | 东京热男人av天堂 | 撕开奶罩揉吮奶头视频 | 成人免费视频视频在线观看 免费 | 国产午夜视频在线观看 | 东京一本一道一二三区 | 国产精品爱久久久久久久 | 国产在线无码精品电影网 | 欧美日本精品一区二区三区 | 国产精品永久免费视频 | 日韩亚洲欧美精品综合 | 丰满人妻翻云覆雨呻吟视频 | 扒开双腿疯狂进出爽爽爽视频 | 亚洲天堂2017无码中文 | 六十路熟妇乱子伦 | 又大又紧又粉嫩18p少妇 | 欧美日韩一区二区三区自拍 | 日韩 欧美 动漫 国产 制服 | 永久黄网站色视频免费直播 | 精品一区二区不卡无码av | a片在线免费观看 | 丰满人妻精品国产99aⅴ | 无码毛片视频一区二区本码 | 麻豆蜜桃av蜜臀av色欲av | 18无码粉嫩小泬无套在线观看 | 人妻熟女一区 | 亚洲日韩av一区二区三区四区 | 欧美激情内射喷水高潮 | 久久午夜夜伦鲁鲁片无码免费 | 亚洲色无码一区二区三区 | av无码不卡在线观看免费 | 久久精品国产一区二区三区 | 欧美日本免费一区二区三区 | 国产精品嫩草久久久久 | 任你躁国产自任一区二区三区 | 国产极品美女高潮无套在线观看 | 国产精品99久久精品爆乳 | 任你躁国产自任一区二区三区 | 成 人影片 免费观看 | 国产一区二区三区日韩精品 | 熟女少妇在线视频播放 | 亚洲国产精品美女久久久久 | 国产精品成人av在线观看 | 欧美成人午夜精品久久久 | 午夜成人1000部免费视频 | 成人无码精品一区二区三区 | 久久精品中文闷骚内射 | 狠狠躁日日躁夜夜躁2020 | 少妇无码av无码专区在线观看 | 久久久久久a亚洲欧洲av冫 | 国产熟妇另类久久久久 | 久久久久久国产精品无码下载 | 亚洲人成无码网www | 久久 国产 尿 小便 嘘嘘 | 熟妇激情内射com | 国产人妻人伦精品1国产丝袜 | 国内精品一区二区三区不卡 | 国产办公室秘书无码精品99 | 国产精品内射视频免费 | 婷婷色婷婷开心五月四房播播 | 欧美老人巨大xxxx做受 | www国产亚洲精品久久网站 | 成熟人妻av无码专区 | 国产成人一区二区三区在线观看 | 国产麻豆精品一区二区三区v视界 | 国产特级毛片aaaaaaa高清 | 国产精品久久久久无码av色戒 | 波多野结衣高清一区二区三区 | 男女猛烈xx00免费视频试看 | 动漫av一区二区在线观看 | 又大又黄又粗又爽的免费视频 | 国产人妖乱国产精品人妖 | 久久亚洲精品中文字幕无男同 | 中文字幕人妻无码一区二区三区 | 真人与拘做受免费视频 | 亚洲国产高清在线观看视频 | 丰满少妇高潮惨叫视频 | 中文字幕乱码人妻无码久久 | 久久久久免费看成人影片 | 性色欲网站人妻丰满中文久久不卡 | 中文字幕+乱码+中文字幕一区 | 国产精品美女久久久久av爽李琼 | 一个人免费观看的www视频 | 内射后入在线观看一区 | 亚洲日韩一区二区 | 亚洲狠狠色丁香婷婷综合 | 无码av岛国片在线播放 | 少女韩国电视剧在线观看完整 | 黑森林福利视频导航 | 小泽玛莉亚一区二区视频在线 | 蜜臀av无码人妻精品 | 人妻无码αv中文字幕久久琪琪布 | 国产精品久久久久7777 | 国产热a欧美热a在线视频 | 国产亚洲精品久久久久久国模美 | 久久久久成人精品免费播放动漫 | 精品国产一区二区三区av 性色 | 在线精品国产一区二区三区 | 亚洲一区二区三区偷拍女厕 | 伊在人天堂亚洲香蕉精品区 | 日韩精品无码一区二区中文字幕 | 日日摸日日碰夜夜爽av | 亚洲日本一区二区三区在线 | 亚洲а∨天堂久久精品2021 | 人妻无码αv中文字幕久久琪琪布 | 任你躁国产自任一区二区三区 | 亚洲日韩精品欧美一区二区 | 波多野结衣高清一区二区三区 | 久久久久久av无码免费看大片 | 久久久精品国产sm最大网站 | 日日躁夜夜躁狠狠躁 | 久久久久成人精品免费播放动漫 | 老子影院午夜伦不卡 | 狠狠噜狠狠狠狠丁香五月 | 99国产精品白浆在线观看免费 | 亚洲日韩av一区二区三区中文 | 亚洲精品国偷拍自产在线麻豆 | 国内精品人妻无码久久久影院蜜桃 | 国产精品办公室沙发 | 奇米影视888欧美在线观看 | 乌克兰少妇性做爰 | 日本乱偷人妻中文字幕 | 国产成人综合色在线观看网站 | 久久综合九色综合欧美狠狠 | 国产成人精品必看 | ass日本丰满熟妇pics | 国产人成高清在线视频99最全资源 | 久久亚洲中文字幕无码 | 亚洲精品无码人妻无码 | 国产真实伦对白全集 | 久久99精品国产麻豆蜜芽 | 3d动漫精品啪啪一区二区中 | 98国产精品综合一区二区三区 | 2020最新国产自产精品 | 亚洲gv猛男gv无码男同 | 国产av一区二区精品久久凹凸 | 国产97色在线 | 免 | 2020久久超碰国产精品最新 | 十八禁真人啪啪免费网站 | 中文字幕 人妻熟女 | 亚洲一区二区三区含羞草 | 无码av最新清无码专区吞精 | 欧美丰满少妇xxxx性 | 亚洲精品一区二区三区婷婷月 | 精品久久久久香蕉网 | 亚洲日韩av一区二区三区中文 | 蜜桃视频插满18在线观看 | 国产九九九九九九九a片 | 丝袜美腿亚洲一区二区 | 久久精品国产99精品亚洲 | 日本护士毛茸茸高潮 | 图片区 小说区 区 亚洲五月 | 天天燥日日燥 | 国产精品美女久久久网av | 亚洲一区二区三区偷拍女厕 | 亚洲精品一区二区三区四区五区 | 国产麻豆精品一区二区三区v视界 | 在线欧美精品一区二区三区 | 久久精品视频在线看15 | 国产人妻久久精品二区三区老狼 | 久久人妻内射无码一区三区 | 永久免费观看国产裸体美女 | 国色天香社区在线视频 | 久久精品99久久香蕉国产色戒 | 中文字幕av伊人av无码av | 精品欧美一区二区三区久久久 | 99视频精品全部免费免费观看 | 日韩精品久久久肉伦网站 | 青春草在线视频免费观看 | 亚洲自偷精品视频自拍 | 亚洲理论电影在线观看 | 图片小说视频一区二区 | 亚洲中文字幕无码一久久区 | 成年美女黄网站色大免费视频 | 俺去俺来也在线www色官网 | 国产精品无码一区二区三区不卡 | 欧美日韩在线亚洲综合国产人 | 青青久在线视频免费观看 | 日本成熟视频免费视频 | 一个人看的www免费视频在线观看 | 免费无码午夜福利片69 | 国产情侣作爱视频免费观看 | 精品厕所偷拍各类美女tp嘘嘘 | 色婷婷综合激情综在线播放 | 亚洲国产成人a精品不卡在线 | 国产熟女一区二区三区四区五区 | 国产精品人人妻人人爽 | 久久99精品国产麻豆 | 丰满人妻被黑人猛烈进入 | 人妻少妇被猛烈进入中文字幕 | 少妇人妻大乳在线视频 | 亚洲区欧美区综合区自拍区 | 成熟妇人a片免费看网站 | 男女爱爱好爽视频免费看 | 日本熟妇人妻xxxxx人hd | 伊人久久大香线蕉午夜 | 暴力强奷在线播放无码 | 日韩欧美中文字幕在线三区 | 日本精品人妻无码免费大全 | 国产精品va在线观看无码 | 欧美日本精品一区二区三区 | 国产精品久久久一区二区三区 | 日本大乳高潮视频在线观看 | 中文字幕日韩精品一区二区三区 | 无套内谢的新婚少妇国语播放 | 99久久亚洲精品无码毛片 | 色偷偷av老熟女 久久精品人妻少妇一区二区三区 | 黑人巨大精品欧美一区二区 | 亚洲精品成a人在线观看 | 在线а√天堂中文官网 | 又色又爽又黄的美女裸体网站 | 欧美亚洲国产一区二区三区 | 久久久久久av无码免费看大片 | 久青草影院在线观看国产 | 国产成人综合在线女婷五月99播放 | 日本大乳高潮视频在线观看 | 在线精品国产一区二区三区 | 久久精品国产日本波多野结衣 | 在线精品国产一区二区三区 | 高清不卡一区二区三区 | 国产另类ts人妖一区二区 | 国内少妇偷人精品视频免费 | 日韩人妻无码中文字幕视频 | 亚洲自偷自偷在线制服 | 日韩精品a片一区二区三区妖精 | 久久国产精品萌白酱免费 | 人人妻人人澡人人爽欧美一区 | 一本一道久久综合久久 | 亚洲 日韩 欧美 成人 在线观看 | 巨爆乳无码视频在线观看 | 国产无av码在线观看 | 欧美国产日产一区二区 | 色 综合 欧美 亚洲 国产 | 国产成人综合美国十次 | 天堂亚洲免费视频 | 思思久久99热只有频精品66 | 特黄特色大片免费播放器图片 | 亚洲一区二区三区四区 | 丰腴饱满的极品熟妇 | 国产麻豆精品精东影业av网站 | 俺去俺来也在线www色官网 | 精品国产aⅴ无码一区二区 | 国产一区二区三区四区五区加勒比 | 天堂久久天堂av色综合 | 国产免费久久精品国产传媒 | 成年美女黄网站色大免费全看 | 久久综合激激的五月天 | 免费播放一区二区三区 | 亚洲热妇无码av在线播放 | 嫩b人妻精品一区二区三区 | 国产精品无码久久av | 日韩av无码中文无码电影 | 成 人 网 站国产免费观看 | 特大黑人娇小亚洲女 | 日本熟妇大屁股人妻 | 狂野欧美激情性xxxx | 午夜理论片yy44880影院 | 日本欧美一区二区三区乱码 | 3d动漫精品啪啪一区二区中 | 色情久久久av熟女人妻网站 | 日日躁夜夜躁狠狠躁 | 亚洲小说图区综合在线 | 亚洲啪av永久无码精品放毛片 | 老子影院午夜精品无码 | 亚洲成在人网站无码天堂 | 久久综合狠狠综合久久综合88 | 国产精品久久久久久久9999 | 国产两女互慰高潮视频在线观看 | 中文字幕无码免费久久9一区9 | 中文亚洲成a人片在线观看 | 中国大陆精品视频xxxx | 98国产精品综合一区二区三区 | 亚洲国产精品久久久久久 | 亚洲国产精品无码一区二区三区 | 欧美大屁股xxxxhd黑色 | 欧美日韩在线亚洲综合国产人 | 男女作爱免费网站 | 亚洲熟妇色xxxxx欧美老妇 | 日韩人妻无码中文字幕视频 | 高潮毛片无遮挡高清免费视频 | 久久综合香蕉国产蜜臀av | 亚洲人成影院在线无码按摩店 | 人人澡人摸人人添 | 少妇无套内谢久久久久 | 娇妻被黑人粗大高潮白浆 | 亚洲成熟女人毛毛耸耸多 | 无码国产乱人伦偷精品视频 | 伊人久久大香线蕉av一区二区 | 中文字幕人妻无码一区二区三区 | 欧美人与禽猛交狂配 | 成人精品视频一区二区 | 久久精品女人的天堂av | 在线观看国产一区二区三区 | 亚洲人成人无码网www国产 | 99精品久久毛片a片 | 狠狠噜狠狠狠狠丁香五月 | 在线播放免费人成毛片乱码 | 国产乱子伦视频在线播放 | 久久久国产精品无码免费专区 | 99久久精品无码一区二区毛片 | 国产综合色产在线精品 | 女人被男人爽到呻吟的视频 | 激情内射亚州一区二区三区爱妻 | 一本久久伊人热热精品中文字幕 | 亚洲爆乳精品无码一区二区三区 | 久久久久久久久888 | 亚洲人成影院在线观看 | 青青久在线视频免费观看 | 无遮无挡爽爽免费视频 | 99久久人妻精品免费一区 | 免费人成在线观看网站 | 88国产精品欧美一区二区三区 | 国产熟女一区二区三区四区五区 | 亚洲国产欧美日韩精品一区二区三区 | 亚洲熟妇自偷自拍另类 | 2019午夜福利不卡片在线 | 男女性色大片免费网站 | 在线播放亚洲第一字幕 | 日本爽爽爽爽爽爽在线观看免 | 波多野42部无码喷潮在线 | 欧美人与善在线com | 久久亚洲a片com人成 | 一本久久伊人热热精品中文字幕 | 国产内射老熟女aaaa | 成人一在线视频日韩国产 | 男人和女人高潮免费网站 | 亚洲区欧美区综合区自拍区 | 欧美肥老太牲交大战 | 亚洲人成网站在线播放942 | 在线成人www免费观看视频 | 久久综合色之久久综合 | 夜先锋av资源网站 | 国内老熟妇对白xxxxhd | 国产成人无码区免费内射一片色欲 | 人人妻人人澡人人爽人人精品 | 漂亮人妻洗澡被公强 日日躁 | 国产熟女一区二区三区四区五区 | 亚洲狠狠婷婷综合久久 | 天堂在线观看www | 国产又粗又硬又大爽黄老大爷视 | 一本色道久久综合狠狠躁 | 97se亚洲精品一区 | 国内揄拍国内精品少妇国语 | 亚洲国产成人av在线观看 | 男人和女人高潮免费网站 | 国产绳艺sm调教室论坛 | 色一情一乱一伦一区二区三欧美 | 亚洲精品久久久久中文第一幕 | 亚洲国产一区二区三区在线观看 | 国产无遮挡又黄又爽免费视频 | 强开小婷嫩苞又嫩又紧视频 | 97色伦图片97综合影院 | 国内精品久久毛片一区二区 | 天堂а√在线地址中文在线 | 国内精品久久久久久中文字幕 | 国产精品高潮呻吟av久久4虎 | 青青久在线视频免费观看 | 欧美zoozzooz性欧美 | 特级做a爰片毛片免费69 | 中文久久乱码一区二区 | 欧美三级a做爰在线观看 | 少妇人妻av毛片在线看 | 亚洲中文字幕成人无码 | 亚洲色偷偷偷综合网 | 亚洲色www成人永久网址 | 波多野结衣aⅴ在线 | 九月婷婷人人澡人人添人人爽 | 又紧又大又爽精品一区二区 | 久久国产自偷自偷免费一区调 | 欧洲精品码一区二区三区免费看 | 天天躁日日躁狠狠躁免费麻豆 | 日本大乳高潮视频在线观看 | 呦交小u女精品视频 | 免费人成在线观看网站 | 成人精品视频一区二区三区尤物 | 国产成人无码av片在线观看不卡 | 久久久久久a亚洲欧洲av冫 | 欧美35页视频在线观看 | 色综合久久久无码网中文 | 亚洲精品一区二区三区在线观看 | 人妻天天爽夜夜爽一区二区 | 国产色在线 | 国产 | 国产精品对白交换视频 | 99精品久久毛片a片 | 亚洲国产精品成人久久蜜臀 | 人妻少妇精品无码专区动漫 | 国语自产偷拍精品视频偷 | 久久亚洲中文字幕精品一区 | 成年美女黄网站色大免费全看 | 国产亚洲精品久久久久久国模美 | 亚洲精品一区国产 | 国产av一区二区三区最新精品 | 久久久久成人精品免费播放动漫 | 亚洲一区av无码专区在线观看 | 精品亚洲韩国一区二区三区 | 亚洲第一网站男人都懂 | 国产午夜福利亚洲第一 | 国产精品视频免费播放 | 蜜桃臀无码内射一区二区三区 | 人人妻人人澡人人爽人人精品浪潮 | 成人亚洲精品久久久久 | 国产精品办公室沙发 | 久久久久久久久蜜桃 | 亚洲欧美日韩国产精品一区二区 | 欧美日韩一区二区综合 | 人妻互换免费中文字幕 | 亚洲综合另类小说色区 | 人妻中文无码久热丝袜 | 性做久久久久久久久 | 亚洲精品国产a久久久久久 | 狂野欧美性猛xxxx乱大交 | 一本色道久久综合狠狠躁 | 精品欧洲av无码一区二区三区 | 日本精品久久久久中文字幕 | 无码任你躁久久久久久久 | 奇米影视7777久久精品人人爽 | 免费观看的无遮挡av | 久久久久99精品成人片 | 亚洲精品国偷拍自产在线观看蜜桃 | 老熟妇乱子伦牲交视频 | 天天拍夜夜添久久精品大 | 丰满诱人的人妻3 | 欧美人与动性行为视频 | 老熟妇仑乱视频一区二区 | 少妇愉情理伦片bd | 国产在线精品一区二区三区直播 | 未满小14洗澡无码视频网站 | 欧美成人高清在线播放 | 中文字幕av日韩精品一区二区 | 国产高清不卡无码视频 | 精品国产精品久久一区免费式 | 久久精品国产一区二区三区肥胖 | 领导边摸边吃奶边做爽在线观看 | 午夜福利不卡在线视频 | 国产黄在线观看免费观看不卡 | 国产精品99久久精品爆乳 | 亚洲成a人片在线观看无码 | 啦啦啦www在线观看免费视频 | 国产99久久精品一区二区 | 亚洲国产欧美日韩精品一区二区三区 | 久久久亚洲欧洲日产国码αv | 99国产精品白浆在线观看免费 | 偷窥日本少妇撒尿chinese | 国产在热线精品视频 | 亚洲理论电影在线观看 | 99久久人妻精品免费一区 | 天天摸天天碰天天添 | 色噜噜亚洲男人的天堂 | 国产精品久久精品三级 | 国产日产欧产精品精品app | 国产成人无码午夜视频在线观看 | 中文字幕日韩精品一区二区三区 | 精品厕所偷拍各类美女tp嘘嘘 | 狠狠cao日日穞夜夜穞av | 国产香蕉尹人视频在线 | 免费国产成人高清在线观看网站 | 精品国产av色一区二区深夜久久 | 狠狠色欧美亚洲狠狠色www | 成人精品视频一区二区 | 亚洲日韩中文字幕在线播放 | 日本肉体xxxx裸交 | 好男人社区资源 | 亚洲国产精品久久久久久 | 亚洲理论电影在线观看 | 成人动漫在线观看 | 亚洲成熟女人毛毛耸耸多 | 18无码粉嫩小泬无套在线观看 | 国内精品久久毛片一区二区 | 国产片av国语在线观看 | 婷婷丁香六月激情综合啪 | 少妇无码吹潮 | 丰满少妇弄高潮了www | 中文字幕人妻无码一夲道 | 国产在线精品一区二区高清不卡 | 欧美丰满老熟妇xxxxx性 | 国产无遮挡吃胸膜奶免费看 | 乌克兰少妇性做爰 | 国产极品美女高潮无套在线观看 | 国产一区二区三区影院 | 2020最新国产自产精品 | 欧美老妇交乱视频在线观看 | 国产激情综合五月久久 | 国产极品美女高潮无套在线观看 | 在线精品亚洲一区二区 | 夜精品a片一区二区三区无码白浆 | 中文无码成人免费视频在线观看 | 老太婆性杂交欧美肥老太 | 97夜夜澡人人爽人人喊中国片 | 久久综合久久自在自线精品自 | 久精品国产欧美亚洲色aⅴ大片 | 无码人妻丰满熟妇区毛片18 | 久久国语露脸国产精品电影 | 精品国产av色一区二区深夜久久 | 欧美第一黄网免费网站 |