Flink-Sink_将结果输出到Kafka_Redis_ES_Mysql中
生活随笔
收集整理的這篇文章主要介紹了
Flink-Sink_将结果输出到Kafka_Redis_ES_Mysql中
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Sink
將計算好的結果輸出到外部系統, 調用 addSink() 傳入指定的 SinkFunction()
pom.xml 事先導入對應的 connector:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Some of the dependent components require a Scala environment -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Kafka connector -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- RabbitMQ connector -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-rabbitmq -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Redis sink for Flink (Apache Bahir) -->
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch-base -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
</dependencies>
實操代碼如下:
import com.regotto.entity.SensorReading; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests;import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.HashMap;/*** @author regotto*/ public class SinkTest {private static void saveToRedis(DataStream<SensorReading> dataStream) {FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();builder.setHost("localhost");// 頂級接口 SinkFunction, 核心方法 invokedataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {/*** 將溫度數據保存為 
id-temperature hash 形式到 redis* @return*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "sensor");}@Overridepublic String getKeyFromData(SensorReading sensorReading) {return sensorReading.getId();}@Overridepublic String getValueFromData(SensorReading sensorReading) {return sensorReading.getTemperature().toString();}}));}private static void saveToKafka(DataStream<SensorReading> dataStream) {// 將數據輸出到 Kafka 中dataStream.map((MapFunction<SensorReading, String>) value -> value.toString()).addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));}private static void saveToEs(DataStream<SensorReading> dataStream) {// 將數據輸出到 ElasticSearchArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200));//真正的 SinkFunction 是 ElasticsearchSink(使用構建者構建), ElasticsearchSinkFunction 只是負責處理以哪種方式存入dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {HashMap<String, String> source = new HashMap<>();source.put("id", sensorReading.getId());source.put("temp", sensorReading.getTemperature().toString());source.put("time", sensorReading.getTimestamp().toString());IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingData").source(source);requestIndexer.add(indexRequest);}).build());}private static void saveToMysql(DataStream<SensorReading> dataStream) {/*由于性能問題, 官方未提供 mysqlSink, 將數據存入 mysql, 自定義 sinkjdbc 要連接處理, 使用 RichSinkFunction, 利用 open, close 方法*/dataStream.addSink(new RichSinkFunction<SensorReading>() {Connection connection = null;PreparedStatement insertStatement = null;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.jdbc.Driver");connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");insertStatement = 
connection.prepareStatement("insert into sensorreading (id, timestamp, temperature)values(?,?,?)");}@Overridepublic void invoke(SensorReading value, Context context) throws Exception {insertStatement.setString(1, value.getId());insertStatement.setLong(2, value.getTimestamp());insertStatement.setDouble(3, value.getTemperature());insertStatement.execute();}@Overridepublic void close() throws Exception {insertStatement.close();connection.close();}});}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> input = env.readTextFile("sensor.txt");DataStream<SensorReading> dataStream = input.map((MapFunction<String, SensorReading>) value -> {String[] split = value.split(",");return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));});saveToMysql(dataStream);env.execute();} }總結
進行數據存儲時, 指定對應 SinkFunction 即可.
總結
以上是生活随笔為你收集整理的Flink-Sink_将结果输出到Kafka_Redis_ES_Mysql中的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux强制使用windows命名,如
- 下一篇: void函数调用时显示不允许使用不完整的