Spark: Looking Up an IP's Geolocation with Binary Search, Using try{}catch{}, and Saving the Results to MySQL
1. Create a Maven project
Adjust where the local Maven repository lives; for details see: http://blog.csdn.net/tototuzuoquan/article/details/74571374
2. Write the POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3. Prepare the files to be processed
The IP rule file (ip.txt) looks like this (one rule per line, fields separated by |):
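A representative line, taken from the comment in the lookup code below:

120.55.0.0|120.55.255.255|2016870400|2016935935|亞洲|中國|浙江|杭州||阿里巴巴|330100|China|CN|120.153576|30.287459

The third and fourth fields are the start and end of the range expressed as unsigned 32-bit integers, and the seventh field is the province that the Spark job later aggregates on.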
The access log file (access.log) looks like this:
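The code only relies on the second |-separated field holding the client IP, so a line shaped like the following (hypothetical values, for illustration only) is all the format that matters:

20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1

Here fields(1), i.e. 125.213.100.123, is the IP that gets looked up; the remaining fields are ignored.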
4. Looking up an IP's location
package cn.toto.spark

import java.io.{BufferedReader, FileInputStream, InputStreamReader}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by toto on 2017/7/8.
  * Look up the location information for an IP.
  */
object IPLocationDemo {

  // Convert a dotted-quad IP string to its unsigned 32-bit integer value.
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Read the whole rule file into memory, one line per element.
  def readData(path: String) = {
    val br = new BufferedReader(new InputStreamReader(new FileInputStream(path)))
    var s: String = null
    var flag = true
    val lines = new ArrayBuffer[String]()
    while (flag) {
      s = br.readLine()
      if (s != null)
        lines += s
      else
        flag = false
    }
    br.close() // release the file handle once the file has been read
    lines
  }

  // Binary search over the rule lines, which are sorted by range start/end.
  def binarySearch(lines: ArrayBuffer[String], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      val fields = lines(middle).split("\\|")
      if ((ip >= fields(2).toLong) && (ip <= fields(3).toLong))
        return middle
      if (ip < fields(2).toLong)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  /**
    * The output of a run is:
    * 2016917821
    * 120.55.0.0|120.55.255.255|2016870400|2016935935|亞洲|中國|浙江|杭州||阿里巴巴|330100|China|CN|120.153576|30.287459
    *
    * i.e. 2016917821 falls between 2016870400 and 2016935935.
    */
  def main(args: Array[String]): Unit = {
    val ip = "120.55.185.61"
    val ipNum = ip2Long(ip)
    println(ipNum)
    val lines = readData("E:\\learnTempFolder\\ip.txt")
    val index = binarySearch(lines, ipNum)
    print(lines(index))
  }
}

Run result:
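As the header comment says, the program prints 2016917821 followed by the matching rule line for 120.55.0.0-120.55.255.255. The number is easy to verify by hand: ip2Long shifts the accumulated value left by 8 bits before OR-ing in each octet, which is the same as weighting the four octets by 2^24, 2^16, 2^8, and 2^0. A standalone sanity check (not part of the job):

// 120.55.185.61, octet by octet, exactly as ip2Long accumulates it
val n = 120L * (1L << 24) + 55L * (1L << 16) + 185L * (1L << 8) + 61L
println(n) // prints 2016917821, matching the output above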
5. Query the IP location information and store the results in MySQL
The code is as follows:
package cn.toto.spark

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/8.
  */
object IPLocation {

  // Runs once per partition: open one connection, insert every record, then clean up.
  val data2MySQL = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.106.100:3306/bigdata", "root", "123456")
      ps = conn.prepareStatement(sql) // prepare once and reuse it for every row
      iterator.foreach(line => {
        ps.setString(1, line._1)
        ps.setInt(2, line._2)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => println("MySQL Exception: " + e)
    } finally {
      // close resources even if an insert failed
      if (ps != null)
        ps.close()
      if (conn != null)
        conn.close()
    }
  }

  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
        return middle
      if (ip < lines(middle)._1.toLong)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation")
    val sc = new SparkContext(conf)

    // Parse the rule file into (start_num, end_num, province) triples.
    val ipRulesRdd = sc.textFile("E://workspace//ip.txt").map(line => {
      val fields = line.split("\\|")
      val start_num = fields(2)
      val end_num = fields(3)
      val province = fields(6)
      (start_num, end_num, province)
    })

    // Collect all mapping rules to the driver...
    val ipRulesArrary = ipRulesRdd.collect()
    // ...and broadcast them so every executor gets one read-only copy.
    val ipRulesBroadcast = sc.broadcast(ipRulesArrary)

    // Load the data to process: the second field of each log line is the IP.
    val ipsRDD = sc.textFile("E://workspace//access.log").map(line => {
      val fields = line.split("\\|")
      fields(1)
    })

    val result = ipsRDD.map(ip => {
      val ipNum = ip2Long(ip)
      val index = binarySearch(ipRulesBroadcast.value, ipNum)
      val info = ipRulesBroadcast.value(index)
      // (range start num, range end num, province name)
      info
    }).map(t => (t._3, 1)).reduceByKey(_ + _)

    // Write the per-province counts to MySQL, one connection per partition.
    result.foreachPartition(data2MySQL(_))
    //println(result.collect().toBuffer)
    sc.stop()
  }
}

Database SQL:
CREATE DATABASE bigdata CHARACTER SET utf8;

USE bigdata;

CREATE TABLE location_info (
    id INT(10) AUTO_INCREMENT PRIMARY KEY,
    location VARCHAR(100),
    counts INT(10),
    accesse_date DATE
) ENGINE=INNODB DEFAULT CHARSET=utf8;

Run the program; when it finishes, the aggregated per-province counts have been inserted into location_info.
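One caveat before running this on a full log: binarySearch returns -1 for an IP that falls outside every rule range, and indexing the broadcast array with -1 then throws an exception. A minimal guard, assuming unmatched IPs should simply be counted under a placeholder label (the "unknown" label is an illustrative choice, not from the original code):

val result = ipsRDD.map(ip => {
  val ipNum = ip2Long(ip)
  val index = binarySearch(ipRulesBroadcast.value, ipNum)
  // fall back to a placeholder province when no rule range matches
  if (index >= 0) ipRulesBroadcast.value(index)._3 else "unknown"
}).map(province => (province, 1)).reduceByKey(_ + _)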