生活随笔收集整理的這篇文章主要介紹了「Spark 读取 HBase 数据」，小編覺得挺不錯的，現在分享給大家，幫大家做個參考。
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

import scala.collection.mutable

/** Helpers for scanning HBase tables from Spark. */
object HbaseUtils {

  /**
   * Scans an HBase table through `TableInputFormat`, optionally limited to specific columns and
   * a row-key range.
   *
   * @param spark     session whose SparkContext creates the RDD
   * @param tableName HBase table to scan
   * @param columns   columns to read, each written as `family:qualifier`; when empty, no column
   *                  restriction is configured
   * @param startRow  row key passed as `TableInputFormat.SCAN_ROW_START`; ignored when null/empty
   * @param endRow    row key passed as `TableInputFormat.SCAN_ROW_STOP`; ignored when null/empty
   * @return RDD of `(ImmutableBytesWritable, Result)` pairs, one per row returned by the scan
   */
  def scan(spark: SparkSession, tableName: String, columns: mutable.ArrayBuffer[String],
           startRow: String = null, endRow: String = null) = {
    val scanConf = HBaseConfiguration.create()
    scanConf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Restrict the scan only when columns were actually requested.
    // SCAN_COLUMNS is a space-separated list of "family:qualifier" entries.
    if (columns.nonEmpty) {
      scanConf.set(TableInputFormat.SCAN_COLUMNS, columns.mkString(" "))
    }

    // null and "" both mean "no bound on this side".
    Option(startRow).filter(_.nonEmpty).foreach(scanConf.set(TableInputFormat.SCAN_ROW_START, _))
    Option(endRow).filter(_.nonEmpty).foreach(scanConf.set(TableInputFormat.SCAN_ROW_STOP, _))

    // Raised timeouts (all in milliseconds), presumably so long scans are not killed mid-way.
    scanConf.set("mapreduce.task.timeout", "1200000")
    scanConf.set("hbase.client.scanner.timeout.period", "600000")
    scanConf.set("hbase.rpc.timeout", "600000")

    spark.sparkContext.newAPIHadoopRDD(
      scanConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
  }
}

/**
 * Spark job that scans the `online_social_feature` HBase table and writes a CSV file with a
 * header: the row key as `uid`, followed by one column per entry of `FeatureDefaults`.
 */
object MakeBaseInfo {

  val BaseInfo: Array[String] = Array("uid", "age", "height", "weight", "role", "vbadge",
    "has_photos", "video_verified", "is_human_face", "has_description", "ip_location", "has_avatar",
    "vip")

  // NOTE(review): "followed_with_time" is included in the scan but never written to the output.
  val Follow: Array[String] = Array("followed_num", "followed_with_time", "follower_num")

  val StatisFeature: Array[String] = Array("click", "clicked", "show", "send_session",
    "receive_session", "desc_len", "online_click", "online_clicked", "online_show", "online_showed",
    "nearby_click", "nearby_clicked", "nearby_show", "nearby_showed", "newbie_click",
    "newbie_clicked", "newbie_show", "newbie_showed", "social_stay_time", "visit_count",
    "visited_count")

  val TagInfo: Array[String] = Array(
    "tag_1_1", "tag_1_2", "tag_1_3", "tag_1_4",
    "tag_2_1", "tag_2_2", "tag_2_3", "tag_2_4", "tag_2_5", "tag_2_6", "tag_2_7", "tag_2_8",
    "tag_2_9", "tag_2_10", "tag_2_11", "tag_2_12",
    "tag_3_1", "tag_3_2", "tag_3_3", "tag_3_4",
    "tag_4_1", "tag_4_2", "tag_4_3", "tag_4_4", "tag_4_5", "tag_4_6", "tag_4_7", "tag_4_8",
    "tag_4_9", "tag_4_10", "tag_4_11", "tag_4_12",
    "tag_5_1", "tag_5_2", "tag_5_3", "tag_5_4", "tag_5_5")

  // Not read anywhere in this job.
  val humanFeature: Array[String] = Array("max_ratio", "max_beauty", "is_human_body")

  /** Column family that holds every feature column. */
  private val ColumnFamily = "f1"

  /**
   * Output schema after `uid`: (qualifier, default used when the cell is missing or empty), in
   * CSV column order. getBaseUsersInfo reads the cells in this order and main names the output
   * columns from it, so the two cannot drift apart. Must be declared after StatisFeature and
   * TagInfo, which it reads during object initialization.
   */
  private val FeatureDefaults: Seq[(String, String)] =
    Seq(
      "age" -> "None",
      "height" -> "None",
      "weight" -> "None",
      "role" -> "-1",
      "vbadge" -> "0",
      "has_photos" -> "0",
      "video_verified" -> "0",
      "is_human_face" -> "None",
      "has_description" -> "0",
      "ip_location" -> "None",
      "has_avatar" -> "None",
      "vip" -> "None",
      "followed_num" -> "0",
      "follower_num" -> "0"
    ) ++ StatisFeature.map(_ -> "None") ++ TagInfo.map(_ -> "0")

  /** Entry point: scans HBase and writes the features as CSV (with a header row). */
  def main(args: Array[String]): Unit = {
    val spark = getSparkSql
    import spark.implicits._

    try {
      val onlineFeature = getBaseUsersInfo(spark).toDF("uid", "item")
      // "item" is an array column; expand it positionally into one named column per feature.
      val featureColumns = FeatureDefaults.zipWithIndex.map { case ((name, _), i) =>
        col("item").getItem(i).as(name)
      }
      val onlineFeatureCsv = onlineFeature.select((col("uid") +: featureColumns): _*)
      onlineFeatureCsv.write.format("csv").option("header", "true").save("/data/wangtao/test/")
    } finally {
      // Release cluster resources even when the scan or the write fails.
      spark.stop()
    }
  }

  /** Creates (or reuses) a Hive-enabled SparkSession configured for this job. */
  def getSparkSql: SparkSession = {
    val jobName = "aaa"
    val parallelism = "100"

    // Only WARN and above from loggers under "org" (Spark, Hadoop, HBase, ...).
    Logger.getLogger("org").setLevel(Level.WARN)

    val sparkConf = new SparkConf()
      .setAppName(jobName)
      .set("spark.default.parallelism", parallelism)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.hadoop.validateOutputSpecs", "false")

    SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  }

  /**
   * Scans `online_social_feature` and returns one `(uid, features)` pair per row. `uid` is the
   * row key and `features` holds one string per entry of `FeatureDefaults`, in the same order.
   * A missing or empty cell is replaced by that entry's default.
   */
  private def getBaseUsersInfo(spark: SparkSession) = {
    // Every qualifier read below must be covered by the scanned column list.
    val columns = mutable.ArrayBuffer(
      (BaseInfo ++ Follow ++ TagInfo ++ StatisFeature).map(field => s"$ColumnFamily:$field"): _*)
    // Copy into locals so the closure shipped to executors captures plain values.
    val family = ColumnFamily
    val fields = FeatureDefaults

    HbaseUtils.scan(spark, "online_social_feature", columns).map { case (_, result) =>
      val uid = Bytes.toString(result.getRow)
      val features = fields.map { case (field, default) =>
        readValue(result, family, field, default)
      }
      (uid, features)
    }
  }

  /**
   * Appends the value of `cf:field` from `result` to `features`, or `default` when the cell is
   * missing or holds one of the "empty" markers recognized by filedValueIsEmpty.
   */
  def takeRowValue(result: Result, features: mutable.ArrayBuffer[String],
                   cf: String, field: String, default: String): Unit =
    features.append(readValue(result, cf, field, default))

  /** Returns the value of `cf:field` in `result` decoded as UTF-8, or `default` when missing/empty. */
  private def readValue(result: Result, cf: String, field: String, default: String): String = {
    // Bytes.toBytes always encodes as UTF-8; String.getBytes would use the JVM default charset.
    val value = Bytes.toString(result.getValue(Bytes.toBytes(cf), Bytes.toBytes(field)))
    if (filedValueIsEmpty(value)) default else value
  }

  /**
   * True when `value` should be treated as absent: null, empty, or one of the literal markers
   * "NULL", "null" or "None".
   */
  def filedValueIsEmpty(value: String): Boolean =
    value == null || value.isEmpty || value == "NULL" || value == "null" || value == "None"
}
總結
以上是生活随笔為你收集整理的「Spark 读取 HBase 数据」的全部內容，希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯，歡迎將生活随笔推薦給好友。