Project scenario:
Use Spark Streaming to receive data from Kafka and run computations on it, then package the application, upload it to Linux, and submit it as a Spark job.
 
Errors encountered:
1. Error 1:
 
Failed to add file:/usr/local/spark-yarn/./myapp/sparkDemo04.jar to Spark environment
java.io.FileNotFoundException: Jar D:\usr\local\spark-yarn\myapp\sparkDemo04.jar not found
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
 
2. Running the code in yarn mode from IDEA on Windows fails with the following error:
 
WARN CheckpointReader: Error reading checkpoint from file hdfs://hadoop102:9000/checkpoint6/checkpoint-1637834226000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.MappedDStream.mapFunc of type scala.Function1 in instance of org.apache.spark.streaming.dstream.MappedDStream
 
3. Various Kafka class-not-found errors. These are not discussed further below, since they only require downloading the corresponding jars and placing them in the jars directory of the Spark installation. A common example:
 
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
 
All of these can be fixed by adding the missing jars. If the error appears inside spark-shell, append --jars followed by the jar path to the spark-shell command when you start it.
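For example, assuming the missing class is in a jar you have already downloaded (the path below is only a placeholder), the shell can be started like this:

spark-shell --jars /path/to/missing-kafka-dependency.jar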
 
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.lang.System.getProperty
import scala.collection.mutable.ListBuffer

object stream05_kafka {

  object SparkKafka {
    // Builds the StreamingContext; also used as the creation function for checkpoint recovery.
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      // master is local[*] here -- this is the original (problematic) version discussed in the cause analysis
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka2")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resoursemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint6")

      // Kafka consumer parameters
      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      // Direct stream subscribed to the sparkOnKafka topic
      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      // Parse each "key,up,down" record and keep a running (up, down, total) state per key
      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    // Recover from the checkpoint if it exists, otherwise create a fresh context
    val streamingContext =
      StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint6", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
 
Cause analysis:
First, note that if you want to package the application, upload it to Linux, and submit it with spark-submit in yarn mode, the master must be set to yarn; whether it runs as yarn-client or yarn-cluster is chosen at submit time via the deploy mode, and the default is client. I had set the master to local here, which is why the job initially worked fine on Windows, connecting to Kafka, pulling data, and computing, but failed on Linux. The root cause was that it never actually connected to YARN.
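For context, once the master is set to yarn, the submit command on Linux looks roughly like the line below; --deploy-mode cluster is written out explicitly, and omitting it gives the default client mode. The main class matches the object in the code above, and the jar path is the one from the first error, so adjust both to your own setup.

spark-submit --master yarn --deploy-mode cluster --class com.study.stream05_kafka /usr/local/spark-yarn/myapp/sparkDemo04.jar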
 
Solution:
Fix for error 1: to run the job on Windows, first package the program with mvn package or Build Artifacts, then point sparkConf.setJars at the path of the resulting jar; after that it runs normally on Windows.
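As a minimal sketch, the setJars call would look something like the lines below; the jar path is a placeholder for wherever mvn package or Build Artifacts actually writes your jar.

// Placeholder path: replace it with the jar produced by your own build
sparkConf.setJars(List("D:\\myproject\\target\\sparkDemo04.jar"))

The full reworked code (the yarn version that eventually gets packaged and submitted, without setJars) follows.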
 
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.lang.System.getProperty
import scala.collection.mutable.ListBuffer

object stream05_kafka {

  object SparkKafka {
    // Builds the StreamingContext; also used as the creation function for checkpoint recovery.
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      // master set to yarn so the packaged jar can be submitted to the cluster; Kryo serializer enabled
      val sparkConf = new SparkConf()
        .setMaster("yarn")
        .setAppName("kafka2")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resoursemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      // new checkpoint directory so the job does not try to recover from the old, incompatible one
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint7")

      // Kafka consumer parameters
      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      // Direct stream subscribed to the sparkOnKafka topic
      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      // Parse each "key,up,down" record and keep a running (up, down, total) state per key
      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    // Recover from the checkpoint if it exists, otherwise create a fresh context
    val streamingContext =
      StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint7", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
 
Also, do not keep the setJars call when packaging the jar for submission, otherwise it will still fail. I no longer remember exactly what the error was; this post was written after I had already solved the problem and I did not keep many of the error logs, but if I remember correctly it was something like this:
 
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
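One way to avoid keeping two copies of the code, purely my own sketch rather than something from the troubleshooting above, is to guard the setJars call inside createSSC behind a flag, so the packaged jar never sets it:

// Sketch of a possible workaround (not from the original troubleshooting);
// runFromIde and the jar path are placeholders for your own setup.
val runFromIde = false // flip to true when launching from the IDE on Windows
val sparkConf = new SparkConf().setMaster("yarn").setAppName("kafka2")
if (runFromIde) {
  sparkConf.setJars(List("D:\\myproject\\target\\sparkDemo04.jar"))
}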
 
Remaining confusion:
To track down this bug I spent a whole day going back and forth between the YARN logs and the Spark logs. The most frustrating part was that after quitting spark-submit with Ctrl+Z, the spark-submit process kept running in the background (Ctrl+Z only suspends the foreground job, it does not terminate it). I even suspect that my kill -9 corrupted the checkpoint and caused the recovery to fail. Could anyone tell me the right way to stop the spark-submit process?