案例实操:Azkaban调度spark作业
生活随笔
收集整理的這篇文章主要介紹了
案例实操:Azkaban调度spark作业
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
新建AccessLogDriverCluster類
package com.it19gong.clickproject;import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;public class AccessLogDriverCluster {static DBHelper db1=null;public static void main(String[] args) throws Exception {// 創(chuàng)建SparkConf、JavaSparkContext、SQLContextSparkConf conf = new SparkConf() .setAppName("RDD2DataFrameProgrammatically"); JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);// 第一步,創(chuàng)建一個(gè)普通的RDD,但是,必須將其轉(zhuǎn)換為RDD<Row>的這種格式//獲取昨天時(shí)間JavaRDD<String> lines = sc.textFile("hdfs://node1/data/clickLog/2019/08/31");// 分析一下// 它報(bào)了一個(gè),不能直接從String轉(zhuǎn)換為Integer的一個(gè)類型轉(zhuǎn)換的錯(cuò)誤// 就說明什么,說明有個(gè)數(shù)據(jù),給定義成了String類型,結(jié)果使用的時(shí)候,要用Integer類型來使用// 而且,錯(cuò)誤報(bào)在sql相關(guān)的代碼中// 所以,基本可以斷定,就是說,在sql中,用到age<=18的語法,所以就強(qiáng)行就將age轉(zhuǎn)換為Integer來使用// 但是,肯定是之前有些步驟,將age定義為了String// 所以就往前找,就找到了這里// 往Row中塞數(shù)據(jù)的時(shí)候,要注意,什么格式的數(shù)據(jù),就用什么格式轉(zhuǎn)換一下,再塞進(jìn)去JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Row call(String line) throws Exception {String itr[] = line.split(" ");String ip = itr[0];String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);String url = itr[6];String upFlow = itr[9];return RowFactory.create(ip,date,url,Integer.valueOf(upFlow)); }});// 第二步,動(dòng)態(tài)構(gòu)造元數(shù)據(jù)// 比如說,id、name等,field的名稱和類型,可能都是在程序運(yùn)行過程中,動(dòng)態(tài)從mysql db里// 或者是配置文件中,加載出來的,是不固定的// 所以特別適合用這種編程的方式,來構(gòu)造元數(shù)據(jù)List<StructField> structFields = new ArrayList<StructField>();structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields);// 第三步,使用動(dòng)態(tài)構(gòu)造的元數(shù)據(jù),將RDD轉(zhuǎn)換為DataFrameDataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType);// 后面,就可以使用DataFrame了studentDF.registerTempTable("log"); DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc"); db1=new DBHelper();final String sql="insert into upflow(ip,sum) values(?,?) ";sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() {@Overridepublic void call(Row t) throws Exception {// TODO Auto-generated method stubPreparedStatement pt = db1.conn.prepareStatement(sql);pt.setString(1,t.getString(0));pt.setString(2,String.valueOf(t.getLong(1)));pt.executeUpdate();}});;}}打包
報(bào)錯(cuò)
刪除apptest文件
再次打包
把打好的包拷貝出來
并且重命名
vim project.sh /opt/modules/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class com.it19gong.clickproject.AccessLogDriverCluster --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /opt/modules/hive/conf/hive-site.xml --driver-class-path /opt/modules/hive/lib/mysql-connector-java-5.1.28.jar /home/hadoop/sparkproject.jar把原來的包刪除
上傳新的包
執(zhí)行腳本
mysql數(shù)據(jù)多了兩條
打開azkaban的頁面,這里再次提醒要用谷歌瀏覽器
新建spark.job文件
#command.job type=command command=bash project.sh打包成zip包
上傳zip包
開始執(zhí)行
mysql數(shù)據(jù)庫多了兩天數(shù)據(jù)
參考鏈接:https://www.cnblogs.com/braveym/p/12259956.html
總結(jié)
以上是生活随笔為你收集整理的案例实操:Azkaban调度spark作业的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: azkaban修改MySQL配置上传包的
- 下一篇: Linkis1.0用户使用文档:JAVA