Spark from Zero to Development (8): Cleaning nginx Logs and Persisting Them to MySQL
This article shows how to clean nginx logs and store the results in MySQL, with an Azkaban scheduled job to run the access.log cleaning task on a recurring basis.
1. Inspect the nginx log format
cd /var/log/nginx
[root@FantJ nginx]# cat access.log
140.205.205.25 - - [19/Aug/2018:03:41:59 +0800] "GET / HTTP/1.1" 404 312 "-" "Scrapy/1.5.0 (+https://scrapy.org)" "-"
185.55.46.110 - - [19/Aug/2018:03:56:16 +0800] "GET / HTTP/1.0" 404 180 "-" "-" "-"
80.107.89.207 - - [19/Aug/2018:03:56:25 +0800] "GET / HTTP/1.1" 404 191 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7" "-"
140.205.205.25 - - [19/Aug/2018:04:13:52 +0800] "HEAD / HTTP/1.1" 404 0 "-" "Go-http-client/1.1" "-"
139.162.88.63 - - [19/Aug/2018:04:31:56 +0800] "GET http://clientapi.ipip.net/echo.php?info=1234567890 HTTP/1.1" 404 207 "-" "Go-http-client/1.1" "-"
......
We need a regular expression matching this format so we can pull the fields out of each line. The lines above show my log output.
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
This is my nginx log configuration (the CentOS default).
2. Test the regular expression
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexTest {
    public static void main(String[] args) {
        // One capture group per field of the log_format above.
        Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");
        Matcher m = p.matcher("202.173.10.31 - - [18/Aug/2018:21:16:28 +0800] \"GET / HTTP/1.1\" 404 312 \"http://www.sdf.sdf\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\" \"-\"\n");
        while (m.find()) {
            for (int i = 1; i <= m.groupCount(); i++) {
                System.out.println(m.group(i));
            }
        }
    }
}
Console output:
202.173.10.31
-
-
[18/Aug/2018:21:16:28 +0800]
"GET / HTTP/1.1"
404
312
"http://www.xxx.top"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
This confirms that the regex works.
3. The Spark implementation
The previous chapter covered converting between RDDs and DataFrames and running SQL against a temp table. This chapter adds persistence: the RDD data will be stored in MySQL.
3.1 Create the MySQL tables
CREATE TABLE `access` (
`remote_addr` varchar(255) DEFAULT NULL,
`remote_user` varchar(255) DEFAULT NULL,
`time_local` varchar(255) DEFAULT NULL,
`request` varchar(255) DEFAULT NULL,
`status` varchar(255) DEFAULT NULL,
`byte_sent` varchar(255) DEFAULT NULL,
`refere` varchar(255) DEFAULT NULL,
`http_agent` varchar(255) DEFAULT NULL,
`http_forward_for` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `acc_addr_count` (
`remote_addr` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
The first table stores every field of the log; the second is a per-IP hit count. Both tables live in my nginx database.
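If you later query the access table by IP, an optional index on remote_addr speeds that up (an optional extra, not required by the job below):

ALTER TABLE `access` ADD INDEX idx_remote_addr (`remote_addr`);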
3.2 Write DBHelper.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    private String url = "jdbc:mysql://192.168.27.166:3306/nginx";
    private String name = "com.mysql.jdbc.Driver";
    private String user = "root";
    private String password = "xxx";
    // The database connection held by this helper
    public Connection connection = null;

    public DBHelper() {
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() throws SQLException {
        this.connection.close();
    }
}
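Before wiring DBHelper into Spark, it can be smoke-tested on its own. A minimal sketch (DBHelperTest and the "test" values are placeholders of mine; it assumes the access table from 3.1 exists):

import java.sql.PreparedStatement;

public class DBHelperTest {
    public static void main(String[] args) throws Exception {
        DBHelper dbHelper = new DBHelper();
        // Insert one dummy row: one placeholder per column of `access`.
        String sql = "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";
        PreparedStatement pt = dbHelper.connection.prepareStatement(sql);
        for (int i = 1; i <= 9; i++) {
            pt.setString(i, "test");
        }
        pt.executeUpdate();
        pt.close();
        dbHelper.close();
    }
}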
3.3 The entity class (JavaBean)
I will use the reflection approach to map the cleaned log records, and dynamically constructed Rows to collect the data for the acc_addr_count table. (If these two approaches are unfamiliar, see the previous chapter first.)
NginxParams.java
import java.io.Serializable;

public class NginxParams implements Serializable {
    private String remoteAddr;
    private String remoteUser;
    private String timeLocal;
    private String request;
    private String status;
    private String byteSent;
    private String referer;
    private String httpUserAgent;
    private String httpForwardedFor;

    // getters and setters omitted ...

    @Override
    public String toString() {
        return "NginxParams{" +
                "remoteAddr='" + remoteAddr + '\'' +
                ", remoteUser='" + remoteUser + '\'' +
                ", timeLocal='" + timeLocal + '\'' +
                ", request='" + request + '\'' +
                ", status='" + status + '\'' +
                ", byteSent='" + byteSent + '\'' +
                ", referer='" + referer + '\'' +
                ", httpUserAgent='" + httpUserAgent + '\'' +
                ", httpForwardedFor='" + httpForwardedFor + '\'' +
                '}';
    }
}
3.4 Write the cleaning job
NginxLogCollect.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NginxLogCollect implements Serializable {

    static DBHelper dbHelper = null;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NginxLogCollect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\84407\\Desktop\\nginx.log");

        JavaRDD<NginxParams> nginxs = lines.map(line -> {
            Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");
            Matcher m = p.matcher(line);
            NginxParams nginxParams = new NginxParams();
            while (m.find()) {
                nginxParams.setRemoteAddr(m.group(1));
                // group(2) is the literal "-" separator; $remote_user is group(3)
                nginxParams.setRemoteUser(m.group(3));
                nginxParams.setTimeLocal(m.group(4));
                nginxParams.setRequest(m.group(5));
                nginxParams.setStatus(m.group(6));
                nginxParams.setByteSent(m.group(7));
                nginxParams.setReferer(m.group(8));
                nginxParams.setHttpUserAgent(m.group(9));
                nginxParams.setHttpForwardedFor(m.group(10));
            }
            return nginxParams;
        });

        /**
         * Convert the RDD to a DataFrame via reflection.
         */
        DataFrame nginxDF = sqlContext.createDataFrame(nginxs, NginxParams.class);

        /**
         * With a DataFrame in hand, register it as a temp table and run SQL
         * against its contents.
         */
        nginxDF.registerTempTable("nginxs");
        DataFrame allDF = sqlContext.sql("select * from nginxs");
        // Count the hits per IP
        DataFrame addrCount = sqlContext.sql("select remoteAddr, COUNT(remoteAddr) as count from nginxs GROUP BY remoteAddr ORDER BY count DESC");

        /**
         * Convert the query results back to RDDs.
         */
        JavaRDD<Row> allRDD = allDF.javaRDD();
        JavaRDD<Row> addrCountRDD = addrCount.javaRDD();

        /**
         * Map each Row back to a NginxParams object. createDataFrame orders the
         * columns alphabetically by bean property name, so the fields are looked
         * up by name here rather than by fragile positional indexes.
         */
        JavaRDD<NginxParams> map = allRDD.map(row -> {
            NginxParams nginxParams = new NginxParams();
            nginxParams.setRemoteAddr(row.getString(row.fieldIndex("remoteAddr")));
            nginxParams.setRemoteUser(row.getString(row.fieldIndex("remoteUser")));
            nginxParams.setTimeLocal(row.getString(row.fieldIndex("timeLocal")));
            nginxParams.setRequest(row.getString(row.fieldIndex("request")));
            nginxParams.setStatus(row.getString(row.fieldIndex("status")));
            nginxParams.setByteSent(row.getString(row.fieldIndex("byteSent")));
            nginxParams.setReferer(row.getString(row.fieldIndex("referer")));
            nginxParams.setHttpUserAgent(row.getString(row.fieldIndex("httpUserAgent")));
            nginxParams.setHttpForwardedFor(row.getString(row.fieldIndex("httpForwardedFor")));
            return nginxParams;
        });

        /**
         * Optionally collect the data back to the driver and print it.
         */
        // List<NginxParams> nginxParamsList = map.collect();
        // for (NginxParams np : nginxParamsList) {
        //     System.out.println(np);
        // }

        dbHelper = new DBHelper();
        String sql = "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";
        map.foreach(nginxParams -> {
            PreparedStatement pt = dbHelper.connection.prepareStatement(sql);
            pt.setString(1, nginxParams.getRemoteAddr());
            pt.setString(2, nginxParams.getRemoteUser());
            pt.setString(3, nginxParams.getTimeLocal());
            pt.setString(4, nginxParams.getRequest());
            pt.setString(5, nginxParams.getStatus());
            pt.setString(6, nginxParams.getStatus());
            pt.setString(6, nginxParams.getByteSent());
            pt.setString(7, nginxParams.getReferer());
            pt.setString(8, nginxParams.getHttpUserAgent());
            pt.setString(9, nginxParams.getHttpForwardedFor());
            pt.executeUpdate();
            pt.close();
        });

        String addrCountSql = "insert into `acc_addr_count` values(?,?)";
        addrCountRDD.foreach(row -> {
            System.out.println("remoteAddr: " + row.getString(0));
            System.out.println("count: " + row.getLong(1));
            PreparedStatement pt = dbHelper.connection.prepareStatement(addrCountSql);
            pt.setString(1, row.getString(0));
            pt.setLong(2, row.getLong(1));
            pt.executeUpdate();
            pt.close();
        });

        sc.close();
    }
}
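One caveat about the persistence code above: it opens a new PreparedStatement for every record, and the shared static DBHelper only works because everything runs in one local JVM. On a real cluster, a common refinement (a sketch under those assumptions, not the original code) is to replace the map.foreach(...) block with one connection per partition and batched inserts:

map.foreachPartition(iterator -> {
    // One connection and one statement per partition, not per record.
    DBHelper helper = new DBHelper();
    PreparedStatement pt = helper.connection.prepareStatement(
            "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)");
    while (iterator.hasNext()) {
        NginxParams np = iterator.next();
        pt.setString(1, np.getRemoteAddr());
        pt.setString(2, np.getRemoteUser());
        pt.setString(3, np.getTimeLocal());
        pt.setString(4, np.getRequest());
        pt.setString(5, np.getStatus());
        pt.setString(6, np.getByteSent());
        pt.setString(7, np.getReferer());
        pt.setString(8, np.getHttpUserAgent());
        pt.setString(9, np.getHttpForwardedFor());
        pt.addBatch();
    }
    pt.executeBatch(); // send all rows of the partition in one round trip
    pt.close();
    helper.close();
});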
4. After the run, check the database:
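For example, a couple of quick queries confirm that both tables were populated:

select count(*) from access;
select remote_addr, `count` from acc_addr_count order by `count` desc limit 10;

The second query lists the busiest client IPs first.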
5. Summary
5.1 Running on a cluster
The example above runs locally. To run the packaged jar on a server, submit it with a script along these lines:
/home/fantj/spark/bin/spark-submit \
--class com.fantj.nginxlog.NginxLogCollect \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /home/fantj/hive/conf/hive-site.xml \
--driver-class-path /home/fantj/hive/lib/mysql-connector-java-5.1.17.jar \
/home/fantj/nginxlog.jar
You also need to change the arguments to setMaster() and sc.textFile(), as sketched below.
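A minimal sketch of those two changes (the HDFS URI is a placeholder of mine):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NginxLogCollectCluster {
    public static void main(String[] args) {
        // No setMaster("local"): spark-submit supplies the master on the cluster.
        SparkConf conf = new SparkConf().setAppName("NginxLogCollect");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read from HDFS instead of a local Windows path (hypothetical URI).
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/logs/access.log");
        // ... rest of the job unchanged ...
        sc.close();
    }
}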
5.2 Scheduling the job
We can wrap the submit script in an Azkaban scheduled job and produce the statistics daily. There are still details to sort out, such as rotating the nginx log by day, but those are small problems. (If you are not familiar with Azkaban, see: Azkaban 簡(jiǎn)單入門.) A minimal job file is sketched below.
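A minimal Azkaban job file might look like this (a sketch; the file and script names are placeholders of mine):

# nginx-log.job
type=command
command=sh /home/fantj/submit-nginxlog.sh

Zip the job file, upload it to a project in the Azkaban web UI, and attach a daily schedule to it there.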
總結(jié)