Spark2.0流式处理读Kafka并写ES
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
maven依賴:
? ? ? ?<dependency>
?? ??? ??? ?<groupId>org.apache.spark</groupId>
?? ??? ??? ?<artifactId>spark-core_2.11</artifactId>
?? ??? ??? ?<version>2.1.0</version>
?? ??? ??? ?<scope>provided</scope>?? ??? ??? ?
?? ??? ?</dependency>
?? ??? ?<dependency>
?? ??? ??? ?<groupId>org.apache.spark</groupId>
?? ??? ??? ?<artifactId>spark-streaming_2.11</artifactId>
?? ??? ??? ?<version>2.1.0</version>
?? ??? ??? ?<scope>provided</scope>
?? ??? ?</dependency>
?? ??? ?
?? ??? ?<dependency>
?? ??? ??? ?<groupId>org.apache.spark</groupId>
?? ??? ??? ?<artifactId>spark-sql_2.11</artifactId>
?? ??? ??? ?<version>2.1.0</version>
?? ??? ??? ?<scope>provided</scope>
?? ??? ?</dependency>
?? ??? ?
?? ??? ?<dependency>
?? ??? ??? ?<groupId>org.apache.spark</groupId>
?? ??? ??? ?<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
?? ??? ??? ?<version>2.1.0</version>
?? ??? ?</dependency>
?? ??? ?<dependency>
?? ??? ??? ?<groupId>org.elasticsearch</groupId>
?? ??? ??? ?<artifactId>elasticsearch-spark-20_2.11</artifactId>
?? ??? ??? ?<version>6.2.0</version>
?? ??? ??? ?<exclusions>
?? ??? ??? ??? ?<exclusion>
?? ??? ??? ??? ??? ?<artifactId>log4j-over-slf4j</artifactId>
?? ??? ??? ??? ??? ?<groupId>org.slf4j</groupId>
?? ??? ??? ??? ?</exclusion>
?? ??? ??? ?</exclusions>?? ??? ??? ?
??? ??? ?</dependency>
代碼:
package com.suning.sevs.bussiness
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.slf4j.LoggerFactory
import org.elasticsearch.spark._
//測試kafka
object testKafka {
? def main(args: Array[String]): Unit = {
? ? val logger = LoggerFactory.getLogger(testKafka.getClass)
? ? val sparkconf = new SparkConf().setAppName("testKafka ")
? ? ? .set("HADOOP_USER_NAME", “user”)
? ? ? .set("HADOOP_GROUP_NAME", "user")
? ? ? ? ? ? .set("es.nodes", "10.10.2.1,10.10.2.2")
? ? ? ? ? ? .set("es.port", "9900")
? ??
? ? // ? ? val spark = SparkSession
? ? // ? ??? ??? ? ?? ?.builder
? ? // ? ??? ??? ? ?? ?.appName("testKafka")
? ? // ? ??? ??? ? ?? ?.config(sparkconf)
? ? // ? ??? ??? ? ?? ?.getOrCreate()
? ? // ? ? ? import spark.implicits._
? ? // ? ? ? ?val topic = spark.readStream.format("kafka")
? ? // ? ? .option("kafka.bootstrap.servers", "10.10.1.245:9092,10.10.1.246:9092")
? ? // ? ?.option("subscribe", "mytopic") ? ??
? ? // ? ?.option("startingOffsets", "latest") ?
? ? // ? ?.option("minPartitions", "2")?
? ? // ? ?.load()
? ? // ? ?
? ? // ? ?val query=topic.writeStream.format("console").outputMode(OutputMode.Append()).start()
? ? val ssc = new StreamingContext(sparkconf, Seconds(1))
? ? val topicsSet = "mytopic".split(",").toSet
? ? val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
? ? val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
? ? ? ssc, kafkaParams, topicsSet)
? ? val lines = directKafkaStream.map(_._2)
? ? lines.foreachRDD(rdd=>{
? ? ??
? ? ? val esRdd=rdd.map(line=>{
? ? ? ? Map("sys"->line,"mycode" -> "1")
? ? ? }
? ? ? )
? ? ? esRdd.saveToEs("indexName/typeName")
? ? ? ??
? ? })
??
? ? // Start the computation
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
轉載于:https://my.oschina.net/u/778683/blog/2996104
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結
以上是生活随笔為你收集整理的Spark2.0流式处理读Kafka并写ES的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一个表单验证引发的深思!!!
- 下一篇: apache 二级域名设置