Java Spark之创建RDD的两种方式和操作RDD
首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本?
spark是什么??
Spark是基于內存計算的大數據并行計算框架。Spark基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark 部署在大量廉價硬件之上,形成集群。
下載和安裝?
可以看我之前發表的博客?
Spark安裝
安裝成功后運行示例程序
在spark安裝目錄下examples/src/main目錄中。 運行的一個Java或Scala示例程序,使用bin/run-example <class> [params]
./bin/run-example SparkPi 10
啟動spark-shell時的參數?
./bin/spark-shell –master local[2]?
參數master 表名主機master在分布式集群中的URL?
local【2】 表示在本地通過開啟2個線程運行
運行模式?
四種:?
1.Mesos?
2.Hadoop YARN?
3.spark?
4.local
一般我們用的是local和spark模式
首先建立maven工程加入整個項目所用到的包的maven依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
? <modelVersion>4.0.0</modelVersion>
? <groupId>sparkday01</groupId>
? <artifactId>sparkday01</artifactId>
? <version>0.0.1-SNAPSHOT</version>
? <packaging>jar</packaging>
? <name>sparkday01</name>
? <url>http://maven.apache.org</url>
? <properties>
? ? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
? </properties>
? <dependencies>
? ? <dependency>
? ? ? <groupId>junit</groupId>
? ? ? <artifactId>junit</artifactId>
? ? ? <version>4.12</version>
? ? ? <scope>test</scope>
? ? </dependency>
? ? ?<dependency>
? ? ? <groupId>org.apache.spark</groupId>
? ? ? <artifactId>spark-core_2.10</artifactId>
? ? ? <version>1.6.1</version>
? ? ?</dependency>
? ? <dependency>
? ? ? <groupId>org.apache.hadoop</groupId>
? ? ? <artifactId>hadoop-client</artifactId>
? ? ? <version>2.6.4</version>
? ? </dependency>
? ? <dependency>
? ? <groupId>org.apache.spark</groupId>
? ? <artifactId>spark-sql_2.10</artifactId>
? ? <version>1.6.1</version>
? ? </dependency>
? </dependencies>
</project>
下面開始初始化spark
spark程序需要做的第一件事情,就是創建一個SparkContext對象,它將告訴spark如何訪問一個集群,而要創建一個SparkContext對象,你首先要創建一個SparkConf對象,該對象訪問了你的應用程序的信息
比如下面的代碼是運行在spark模式下
public class sparkTestCon {
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf=new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); ? ? //因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
? ? ? ? System.out.println(sc);
? ? }
}
下面是運行在本機,把上面的第6行代碼改為如下
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
快速入門
可以參看我的博客,轉載的一篇文章?
Spark快速入門
Spark編程
每一個spark應用程序都包含一個驅動程序(driver program ),他會運行用戶的main函數,并在集群上執行各種并行操作(parallel operations)
spark提供的最主要的抽象概念有兩種:?
彈性分布式數據集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分區地分布到集群的不同節點上,可以被并行操作,RDDS可以從hdfs(或者任意其他的支持Hadoop的文件系統)上的一個文件開始創建,或者通過轉換驅動程序中已經存在的Scala集合得到,用戶也可以讓spark將一個RDD持久化到內存中,使其能再并行操作中被有效地重復使用,最后RDD能自動從節點故障中恢復
spark的第二個抽象概念是共享變量(shared variables),它可以在并行操作中使用,在默認情況下,當spark將一個函數以任務集的形式在不同的節點上并行運行時,會將該函數所使用的每個變量拷貝傳遞給每一個任務中,有時候,一個變量需要在任務之間,或者驅動程序之間進行共享,spark支持兩種共享變量:?
廣播變量(broadcast variables),它可以在所有節點的內存中緩存一個值。?
累加器(accumulators):只能用于做加法的變量,例如計算器或求和器
RDD的創建有兩種方式?
1.引用外部文件系統的數據集(HDFS)?
2.并行化一個已經存在于驅動程序中的集合(并行集合,是通過對于驅動程序中的集合調用JavaSparkContext.parallelize來構建的RDD)
第一種方式創建?
下面通過代碼來理解RDD和怎么操作RDD
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
?* 引用外部文件系統的數據集(HDFS)創建RDD
?* ?匿名內部類定義函數傳給spark
?* @author 湯高
?*
?*/
public class RDDOps {
? ? //完成對所有行的長度求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf=new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); ? ? //因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
? ? ? ? System.out.println(sc);
? ? ? ? //通過hdfs上的文件定義一個RDD 這個數據暫時還沒有加載到內存,也沒有在上面執行動作,lines僅僅指向這個文件
? ? ? ? JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
? ? ? ? //定義lineLengths作為Map轉換的結果 由于惰性,不會立即計算lineLengths
? ? ? ? //第一個參數為傳入的內容,第二個參數為函數操作完后返回的結果類型
? ? ? ? JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
? ? ? ? ? public Integer call(String s) {?
? ? ? ? ? ? ? System.out.println("每行長度"+s.length());
? ? ? ? ? ? ? return s.length(); }
? ? ? ? });
? ? ? ? //運行reduce ?這是一個動作action ?這時候,spark才將計算拆分成不同的task,
? ? ? ? //并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結果集給去驅動程序
? ? ? ? int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
? ? ? ? ? public Integer call(Integer a, Integer b) { return a + b; }
? ? ? ? });
? ? ? ? System.out.println(totalLength);
? ? ? ? //為了以后復用 ?持久化到內存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
}
如果覺得剛剛那種寫法難以理解,可以看看第二種寫法
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
?* 引用外部文件系統的數據集(HDFS)創建RDD?
?* ?外部類定義函數傳給spark
?* @author 湯高
?*
?*/
public class RDDOps2 {
? ? // 完成對所有行的長度求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf = new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
? ? ? ? System.out.println(sc);
? ? ? ? //通過hdfs上的文件定義一個RDD 這個數據暫時還沒有加載到內存,也沒有在上面執行動作,lines僅僅指向這個文件
? ? ? ? JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
? ? ? ? //定義lineLengths作為Map轉換的結果 由于惰性,不會立即計算lineLengths
? ? ? ? JavaRDD<Integer> lineLengths = lines.map(new GetLength());
? ? ? ? //運行reduce ?這是一個動作action ?這時候,spark才將計算拆分成不同的task,
? ? ? ? ? ? ? ? //并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結果集給去驅動程序
? ? ? ? int totalLength = lineLengths.reduce(new Sum());
? ? ? ? System.out.println("總長度"+totalLength);
? ? ? ? // 為了以后復用 持久化到內存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
? ? //定義map函數
? ? //第一個參數為傳入的內容,第二個參數為函數操作完后返回的結果類型
? ? static class GetLength implements Function<String, Integer> {
? ? ? ? public Integer call(String s) {
? ? ? ? ? ? return s.length();
? ? ? ? }
? ? }
? ? //定義reduce函數?
? ? //第一個參數為內容,第三個參數為函數操作完后返回的結果類型
? ? static class Sum implements Function2<Integer, Integer, Integer> {
? ? ? ? public Integer call(Integer a, Integer b) {
? ? ? ? ? ? return a + b;
? ? ? ? }
? ? }
}
第二種方式創建RDD
package com.tg.spark;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
?* 并行化一個已經存在于驅動程序中的集合創建RDD
?* @author 湯高
?*
?*/
public class RDDOps3 {
? ? // 完成對所有數求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf = new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
? ? ? ? System.out.println(sc);
? ? ? ? List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
? ? ? ? //并行集合,是通過對于驅動程序中的集合調用JavaSparkContext.parallelize來構建的RDD
? ? ? ? JavaRDD<Integer> distData = sc.parallelize(data);
? ? ? ? JavaRDD<Integer> lineLengths = distData.map(new GetLength());
? ? ? ? // 運行reduce 這是一個動作action 這時候,spark才將計算拆分成不同的task,
? ? ? ? // 并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結果集給去驅動程序
? ? ? ? int totalLength = lineLengths.reduce(new Sum());
? ? ? ? System.out.println("總和" + totalLength);
? ? ? ? // 為了以后復用 持久化到內存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
? ? // 定義map函數
? ? static class GetLength implements Function<Integer, Integer> {
? ? ? ? @Override
? ? ? ? public Integer call(Integer a) throws Exception {
? ? ? ? ? ? return a;
? ? ? ? }
? ? }
? ? // 定義reduce函數
? ? static class Sum implements Function2<Integer, Integer, Integer> {
? ? ? ? public Integer call(Integer a, Integer b) {
? ? ? ? ? ? return a + b;
? ? ? ? }
? ? }
}
注意:上面的寫法是基于jdk1.7或者更低版本?
基于jdk1.8有更簡單的寫法?
下面是官方文檔的說明
Note: In this guide, we’ll often use the concise Java 8 lambda syntax to specify Java functions, but in older versions of Java you can implement the interfaces in the org.apache.spark.api.java.function package. We describe passing functions to Spark in more detail below.
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:
Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
In Java 8, use lambda expressions to concisely define an implementation.
所以如果要完成上面第一種創建方式,在jdk1.8中可以簡單的這么寫
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
要完成第二種方式的創建,簡單的這么寫
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
主要不同就是在jdk1.7中我們要自己寫一個函數傳到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中寫lambda表達式
好了,今天就寫到這里,以后的更多內容后面再寫?
碼字不易,轉載請指明出處http://blog.csdn.net/tanggao1314/article/details/51570452
參考資料?
Spark編程指南
?
總結
以上是生活随笔為你收集整理的Java Spark之创建RDD的两种方式和操作RDD的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark RDD创建操作
- 下一篇: Elasticsearch 实现自定义排