分享一下spark streaming与flume集成的scala代码。
生活随笔
收集整理的這篇文章主要介紹了
分享一下spark streaming与flume集成的scala代码。
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章來自:http://www.cnblogs.com/hark0623/p/4172462.html,轉載請注明出處。
/**
 * Spark Streaming + Flume integration demo.
 *
 * Receives sensor records pushed by a Flume avro sink (push-based receiver),
 * parses each comma-separated record, and flushes accumulated per-metric
 * key/value statistics to Redis once per 20-second batch.
 */
object LogicHandle {
  def main(args: Array[String]) {
    // Windows workaround: Hadoop fails at startup when winutils.exe is missing,
    // so point hadoop.home.dir at the working dir and create a dummy ./bin/winutils.exe.
    val path = new File(".").getCanonicalPath()
    System.getProperties().put("hadoop.home.dir", path)
    new File("./bin").mkdirs()
    new File("./bin/winutils.exe").createNewFile()

    //val sparkConf = new SparkConf().setAppName("SensorRealTime").setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("SensorRealTime")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Flume must be configured with an avro sink pointing at this host/port.
    val hostname = "localhost"
    val port = 2345
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

    val lhc = new LogicHandleClass()

    // Timestamp formatters for the log records.
    // NOTE(review): SimpleDateFormat is not thread-safe; this is only safe here
    // because collect() brings all events to the driver, which processes them
    // single-threaded per batch.
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val sdfHour = new SimpleDateFormat("HH")
    val sdfMinute = new SimpleDateFormat("mm")

    // key/value store of the per-metric statistics (per the spec's key rules),
    // flushed to Redis at the end of every batch. Never reassigned, hence a val.
    val redisMap = new HashMap[String, String]

    flumeStream.foreachRDD(rdd => {
      // NOTE(review): collect() pulls the entire batch to the driver. Acceptable
      // for a small demo; use rdd.foreachPartition for production-sized streams.
      val events = rdd.collect()
      //println("event count:" + events.length)
      for (event <- events) {
        val sensorInfo = new String(event.event.getBody.array()) // one raw record
        // Record format: shopId,mac,ts,x,y,level
        val arrayFileds = sensorInfo.split(",")
        if (arrayFileds.length == 6) {
          val shopId = arrayFileds(0)           // in-shop sensor id
          val floorId = shopId.substring(0, 5)  // floor id = first 5 chars of shopId
          val mac = arrayFileds(1)
          val ts = arrayFileds(2).toLong        // timestamp, epoch seconds
          val time = sdf.format(ts * 1000)      // ts is Long, so *1000 cannot overflow Int
          val hour = sdfHour.format(ts * 1000)
          val minute = sdfMinute.format(ts * 1000)
          val allMinute = hour.toInt * 60 + minute.toInt // minutes since midnight
          val x = arrayFileds(3)
          val y = arrayFileds(4)
          val level = arrayFileds(5)
          // business logic follows here in the original article; omitted
        }
      }
      // Persist the accumulated metrics to Redis.
      lhc.SetAll(redisMap)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
// NOTE(review): this snippet is a verbatim duplicate of the one posted earlier
// in the article; only comments have been translated/added here.
/**
 * Spark Streaming + Flume integration demo: receives sensor records from a
 * Flume avro sink, parses them, and writes per-metric statistics to Redis
 * once per 20-second batch.
 */
object LogicHandle {
  def main(args: Array[String]) {
    // Windows workaround so Hadoop startup does not fail: fake hadoop.home.dir
    // and create a dummy ./bin/winutils.exe
    val path = new File(".").getCanonicalPath()
    System.getProperties().put("hadoop.home.dir", path);
    new File("./bin").mkdirs();
    new File("./bin/winutils.exe").createNewFile();
    //val sparkConf = new SparkConf().setAppName("SensorRealTime").setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("SensorRealTime")
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    // Flume must push to this host/port via an avro sink
    val hostname = "localhost"
    val port = 2345
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)
    val lhc = new LogicHandleClass();
    // log timestamp formatting templates
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val sdfHour = new SimpleDateFormat("HH");
    val sdfMinute = new SimpleDateFormat("mm")
    // hash holding the per-metric key/values (per the spec's key rules),
    // later persisted to Redis
    var redisMap = new HashMap[String, String]
    flumeStream.foreachRDD(rdd => {
      // collect() pulls the whole batch to the driver
      val events = rdd.collect()
      //println("event count:" + events.length)
      var i = 1
      for (event <- events) {
        val sensorInfo = new String(event.event.getBody.array()) // one raw record
        // split the record into its comma-separated fields
        val arrayFileds = sensorInfo.split(",")
        if (arrayFileds.length == 6) {
          val shopId = arrayFileds(0) // in-shop sensor id
          val floorId = shopId.substring(0, 5) // floor id = first 5 chars of shopId
          val mac = arrayFileds(1)
          val ts = arrayFileds(2).toLong // timestamp, epoch seconds
          val time = sdf.format(ts * 1000)
          var hour = sdfHour.format(ts * 1000)
          var minute = sdfMinute.format(ts * 1000)
          var allMinute = hour.toInt * 60 + minute.toInt // minutes since midnight
          val x = arrayFileds(3)
          val y = arrayFileds(4)
          val level = arrayFileds(5)
          // business logic follows here in the original article; omitted
        }
      }
      // persist accumulated metrics to Redis
      lhc.SetAll(redisMap)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
?
轉載于:https://www.cnblogs.com/hark0623/p/4172462.html
總結
以上是生活随笔為你收集整理的分享一下spark streaming与flume集成的scala代码。的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于”点九”
- 下一篇: 依赖注入及AOP简述(六)——字符串请求