redis 2m数据读取_Flink读写Redis(二)读取redis数据
自定義flink的RedisSource,實現從redis中讀取數據,借鑒了flink-connector-redis_2.11的實現思路,對redis讀取操作進行封裝,其中flink-connector-redis_2.11的使用和介紹可參考文末鏈接。項目中需要引入flink-connector-redis_2.11的maven依賴。
抽象redis數據
由於redis有不同的數據類型,所以先定義MyRedisRecord類,封裝redis數據類型和數據對象
package?com.jike.flink.examples.redis;import?org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import?java.io.Serializable;
public?class?MyRedisRecord?implements?Serializable?{
????private?Object?data;
????private?RedisDataType?redisDataType;
????public?MyRedisRecord(Object?data,?RedisDataType?redisDataType)?{
????????this.data?=?data;
????????this.redisDataType?=?redisDataType;
????}
????public?Object?getData()?{
????????return?data;
????}
????public?void?setData(Object?data)?{
????????this.data?=?data;
????}
????public?RedisDataType?getRedisDataType()?{
????????return?redisDataType;
????}
????public?void?setRedisDataType(RedisDataType?redisDataType)?{
????????this.redisDataType?=?redisDataType;
????}
}
定義Redis數據讀取類
將redis的操作封裝起來,首先定義接口類,定義支持的讀取操作。例子中只實現了哈希表的讀取操作(hget,讀取整個哈希表),可以按需增加更多的操作
package?com.jike.flink.examples.redis;import?java.io.Serializable;
import?java.util.Map;
/**
 * Redis read operations supported by this example, plus resource cleanup.
 *
 * <p>Extends {@link Serializable} so that it can be held as a field of a (serializable) Flink
 * function.
 */
public interface MyRedisCommandsContainer extends Serializable {

    /**
     * Reads the Redis hash stored at {@code key}.
     *
     * <p>Despite the name, this returns every field of the hash (HGETALL-like semantics), not a
     * single field.
     *
     * @param key the Redis key of the hash
     * @return a map of field name to value
     */
    Map<String, String> hget(String key);

    /** Releases the connections or pools held by this container. */
    void close();
}
定義一個MyRedisCommandsContainer接口實現類,實現對redis的讀取操作,主要是調用了Jedis的API,支持以哨兵模式和直連兩種方式連接redis
package?com.jike.flink.examples.redis;import?org.apache.flink.util.Preconditions;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPool;
import?redis.clients.jedis.JedisSentinelPool;
import?java.util.HashMap;
import?java.util.Map;
import?java.util.Set;
public?class?MyRedisContainer?implements?MyRedisCommandsContainer,Cloneable{
????private?static?final?long?serialVersionUID?=?1L;
????private?static?final?Logger?LOG?=?LoggerFactory.getLogger(MyRedisContainer.class);
????private?final?JedisPool?jedisPool;
????private?final?JedisSentinelPool?jedisSentinelPool;
????public?MyRedisContainer(JedisPool?jedisPool)?{
????????Preconditions.checkNotNull(jedisPool,?"Jedis?Pool?can?not?be?null");
????????this.jedisPool?=?jedisPool;
????????this.jedisSentinelPool?=?null;
????}
????public?MyRedisContainer(JedisSentinelPool?sentinelPool)?{
????????Preconditions.checkNotNull(sentinelPool,?"Jedis?Sentinel?Pool?can?not?be?null");
????????this.jedisPool?=?null;
????????this.jedisSentinelPool?=?sentinelPool;
????}
????@Override
????public?Map?hget(String?key)?{
????????Jedis?jedis?=?null;
????????try?{
????????????jedis?=?this.getInstance();
????????????Map?map?=?new?HashMap();
????????????Set?fieldSet?=?jedis.hkeys(key);for(String?s?:?fieldSet){
????????????????map.put(s,jedis.hget(key,s));
????????????}return??map;
????????}?catch?(Exception?e)?{if?(LOG.isErrorEnabled())?{
????????????????LOG.error("Cannot?get?Redis?message?with?command?HGET?to?key?{}?error?message?{}",?new?Object[]{key,?e.getMessage()});
????????????}
????????????throw?e;
????????}?finally?{
????????????this.releaseInstance(jedis);
????????}
????}
????private?Jedis?getInstance()?{return?this.jedisSentinelPool?!=?null???this.jedisSentinelPool.getResource()?:?this.jedisPool.getResource();
????}
????private?void?releaseInstance(Jedis?jedis)?{if?(jedis?!=?null)?{
????????????try?{
????????????????jedis.close();
????????????}?catch?(Exception?var3)?{
????????????????LOG.error("Failed?to?close?(return)?instance?to?pool",?var3);
????????????}
????????}
????}
????public?void?close()?{if?(this.jedisPool?!=?null)?{
????????????this.jedisPool.close();
????????}if?(this.jedisSentinelPool?!=?null)?{
????????????this.jedisSentinelPool.close();
????????}
????}
}
定義MyRedisCommandsContainer對象的創建類
該類用來根據不同的配置生成不同的對象,例子中考慮了直連redis和哨兵模式兩種情況,後續還可以考慮redis集群的情形
package?com.jike.flink.examples.redis;import?org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import?org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import?org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import?org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import?org.apache.flink.util.Preconditions;
import?redis.clients.jedis.JedisPool;
import?redis.clients.jedis.JedisSentinelPool;
public?class?MyRedisCommandsContainerBuilder?{
????public?MyRedisCommandsContainerBuilder(){
????}
????public?static?MyRedisCommandsContainer?build(FlinkJedisConfigBase?flinkJedisConfigBase)?{
????????if?(flinkJedisConfigBase?instanceof?FlinkJedisPoolConfig)?{
????????????FlinkJedisPoolConfig?flinkJedisPoolConfig?=?(FlinkJedisPoolConfig)flinkJedisConfigBase;
????????????return?build(flinkJedisPoolConfig);
????????}?else?if?(flinkJedisConfigBase?instanceof?FlinkJedisSentinelConfig)?{
????????????FlinkJedisSentinelConfig?flinkJedisSentinelConfig?=?(FlinkJedisSentinelConfig)flinkJedisConfigBase;
????????????return?build(flinkJedisSentinelConfig);
????????}?else?{
????????????throw?new?IllegalArgumentException("Jedis?configuration?not?found");
????????}
????}
????public?static?MyRedisCommandsContainer?build(FlinkJedisPoolConfig?jedisPoolConfig)?{
????????Preconditions.checkNotNull(jedisPoolConfig,?"Redis?pool?config?should?not?be?Null");
????????GenericObjectPoolConfig?genericObjectPoolConfig?=?new?GenericObjectPoolConfig();
????????genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
????????genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
????????genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
????????JedisPool?jedisPool?=?new?JedisPool(genericObjectPoolConfig,?jedisPoolConfig.getHost(),?jedisPoolConfig.getPort(),?jedisPoolConfig.getConnectionTimeout(),?jedisPoolConfig.getPassword(),?jedisPoolConfig.getDatabase());
????????return?new?MyRedisContainer(jedisPool);
????}
????public?static?MyRedisCommandsContainer?build(FlinkJedisSentinelConfig?jedisSentinelConfig)?{
????????Preconditions.checkNotNull(jedisSentinelConfig,?"Redis?sentinel?config?should?not?be?Null");
????????GenericObjectPoolConfig?genericObjectPoolConfig?=?new?GenericObjectPoolConfig();
????????genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
????????genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
????????genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
????????JedisSentinelPool?jedisSentinelPool?=?new?JedisSentinelPool(jedisSentinelConfig.getMasterName(),?jedisSentinelConfig.getSentinels(),?genericObjectPoolConfig,?jedisSentinelConfig.getConnectionTimeout(),?jedisSentinelConfig.getSoTimeout(),?jedisSentinelConfig.getPassword(),?jedisSentinelConfig.getDatabase());
????????return?new?MyRedisContainer(jedisSentinelPool);
????}
}
redis操作描述類
package?com.jike.flink.examples.redis;import?org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
public?enum?MyRedisCommand?{
????HGET(RedisDataType.HASH);
????private?RedisDataType?redisDataType;
????private?MyRedisCommand(RedisDataType?redisDataType)?{
????????this.redisDataType?=?redisDataType;
????}
????public?RedisDataType?getRedisDataType()?{
????????return?this.redisDataType;
????}
}
package?com.jike.flink.examples.redis;
import?org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import?org.apache.flink.util.Preconditions;
import?java.io.Serializable;
public?class?MyRedisCommandDescription?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?1L;
????private?MyRedisCommand?redisCommand;
????private?String?additionalKey;
????public?MyRedisCommandDescription(MyRedisCommand?redisCommand,?String?additionalKey)?{
????????Preconditions.checkNotNull(redisCommand,?"Redis?command?type?can?not?be?null");
????????this.redisCommand?=?redisCommand;
????????this.additionalKey?=?additionalKey;
????????if?((redisCommand.getRedisDataType()?==?RedisDataType.HASH?||?redisCommand.getRedisDataType()?==?RedisDataType.SORTED_SET)?&&?additionalKey?==?null)?{
????????????throw?new?IllegalArgumentException("Hash?and?Sorted?Set?should?have?additional?key");
????????}
????}
????public?MyRedisCommandDescription(MyRedisCommand?redisCommand)?{
????????this(redisCommand,?(String)null);
????}
????public?MyRedisCommand?getCommand()?{
????????return?this.redisCommand;
????}
????public?String?getAdditionalKey()?{
????????return?this.additionalKey;
????}
}
RedisSource
定義flink redis source的實現,繼承RichSourceFunction類。該類構造方法接收兩個參數,包括redis配置信息以及要讀取的redis數據類型信息;open方法會在source打開時執行,用來完成redis操作類對象的創建;run方法會一直讀取redis數據,並根據數據類型調用對應的redis操作,封裝成MyRedisRecord對象
package?com.jike.flink.examples.redis;import?org.apache.flink.configuration.Configuration;
import?org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import?org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import?org.apache.flink.util.Preconditions;
public?class?RedisSource?extends?RichSourceFunction{
????private?static?final?long?serialVersionUID?=?1L;
????private?String?additionalKey;
????private?MyRedisCommand?redisCommand;
????private?FlinkJedisConfigBase?flinkJedisConfigBase;
????private?MyRedisCommandsContainer?redisCommandsContainer;
????private?volatile?boolean?isRunning?=?true;
????public?RedisSource(FlinkJedisConfigBase?flinkJedisConfigBase,?MyRedisCommandDescription?redisCommandDescription)?{
????????Preconditions.checkNotNull(flinkJedisConfigBase,?"Redis?connection?pool?config?should?not?be?null");
????????Preconditions.checkNotNull(redisCommandDescription,?"MyRedisCommandDescription??can?not?be?null");
????????this.flinkJedisConfigBase?=?flinkJedisConfigBase;
????????this.redisCommand?=?redisCommandDescription.getCommand();
????????this.additionalKey?=?redisCommandDescription.getAdditionalKey();
????}
????@Override
????public?void?open(Configuration?parameters)?throws?Exception?{
????????this.redisCommandsContainer?=?MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
????}
????@Override
????public?void?run(SourceContext?sourceContext)?throws?Exception?{while?(isRunning){
????????????switch(this.redisCommand)?{case?HGET:
????????????????????sourceContext.collect(new?MyRedisRecord(this.redisCommandsContainer.hget(this.additionalKey),?this.redisCommand.getRedisDataType()));break;
????????????????default:
????????????????????throw?new?IllegalArgumentException("Cannot?process?such?data?type:?"?+?this.redisCommand);
????????????}
????????}
????}
????@Override
????public?void?cancel()??{
????????isRunning?=?false;if?(this.redisCommandsContainer?!=?null)?{
????????????this.redisCommandsContainer.close();
????????}
????}
}
使用
利用上述提供的類,計算出現次數最多的單詞,其中redis中的哈希表保存了各個單詞的詞頻。定義MyMapRedisRecordSplitter,實現FlatMapFunction接口,對從redis中讀取的哈希表進行遍歷,以<單詞, 詞頻>二元組(Tuple2<String, Integer>)的形式輸出到下游
package?com.jike.flink.examples.redis;import?org.apache.flink.api.common.functions.FlatMapFunction;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import?org.apache.flink.util.Collector;
import?java.util.Map;
public?class?MyMapRedisRecordSplitter?implements?FlatMapFunction>?{
????@Override
????public?void?flatMap(MyRedisRecord?myRedisRecord,?Collector>?collector)?throws?Exception?{
????????assert?myRedisRecord.getRedisDataType()?==?RedisDataType.HASH;
????????Map?map?=?(Map)myRedisRecord.getData();for(Map.Entry?e?:?map.entrySet()){
????????????collector.collect(new?Tuple2<>(e.getKey(),Integer.valueOf(e.getValue())));
????????}
????}
}
定義主程序類
package?com.jike.flink.examples.redis;import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.streaming.api.windowing.time.Time;
import?org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
public?class?MaxCount{
????public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????FlinkJedisPoolConfig?conf?=?new?FlinkJedisPoolConfig.Builder().setHost("ip").setPort(30420).setPassword("passwd").build();
????????DataStreamSource?source?=?executionEnvironment.addSource(new?RedisSource(conf,new?MyRedisCommandDescription(MyRedisCommand.HGET,"flink")));
????????DataStream>?max?=?source.flatMap(new?MyMapRedisRecordSplitter()).timeWindowAll(Time.milliseconds(5000)).maxBy(1);
????????max.print().setParallelism(1);
????????executionEnvironment.execute();
????}
}
redis中的數據及idea中的打印結果
總結
簡單實現了一個統一的flink redis source,提供了redis數據讀取的功能,可以作為flink自定義數據源的入門學習。
Flink讀寫Redis(一)-寫入Redis
flink-connector-redis源碼閱讀
總結
以上是生活随笔為你收集整理的redis 2m数据读取_Flink读写Redis(二)读取redis数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python的模块提供了许多文件管理方法
- 下一篇: namenode和datanode工作机