學(xué)習(xí)spark任何技術(shù)之前,請先正確理解spark,可以參考:正確理解spark

目前成都創(chuàng)新互聯(lián)公司已為超過千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁空間、綿陽服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、屯昌網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
以下對(duì)RDD的三種創(chuàng)建方式、單類型RDD基本的transformation api、采樣Api以及pipe操作進(jìn)行了java api方面的闡述
一、RDD的三種創(chuàng)建方式
從穩(wěn)定的文件存儲(chǔ)系統(tǒng)中創(chuàng)建RDD,比如local fileSystem或者h(yuǎn)dfs等,如下:
//從hdfs文件中創(chuàng)建
JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");
//從本地文件系統(tǒng)的文件中,注意file:后面肯定是至少三個(gè)///,四個(gè)也行,不能是兩個(gè)
//如果指定第二個(gè)參數(shù)的話,表示創(chuàng)建的RDD的最小的分區(qū)數(shù),如果文件分塊的數(shù)量大于指定的分區(qū)
//數(shù)的話則已文件的分塊數(shù)量為準(zhǔn)
JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt" 2 );2. 可以經(jīng)過transformation api從一個(gè)已經(jīng)存在的RDD上創(chuàng)建一個(gè)新的RDD,以下是map這個(gè)轉(zhuǎn)換api
JavaRDD<String> mapRDD = textFileRDD.map(new Function<String, String>() {
@Override
public String call(String s) throws Exception {
return s + "test";
}
});
System.out.println("mapRDD = " + mapRDD.collect());3. 從一個(gè)內(nèi)存中的列表數(shù)據(jù)創(chuàng)建一個(gè)RDD,可以指定RDD的分區(qū)數(shù),如果不指定的話,則取所有Executor的所有cores數(shù)量
//創(chuàng)建一個(gè)單類型的JavaRDD
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());
//創(chuàng)建一個(gè)單類型且類型為Double的JavaRDD
JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());
//創(chuàng)建一個(gè)key-value類型的RDD
import scala.Tuple2;
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
System.out.println("javaPairRDD = " + javaPairRDD.collect());注:對(duì)于第三種情況,scala中還提供了makeRDD api,這個(gè)api可以指定創(chuàng)建RDD每一個(gè)分區(qū)所在的機(jī)器,這個(gè)api的原理詳見spark core RDD scala api中
二、單類型RDD基本的transformation api
先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
map操作,表示對(duì)integerJavaRDD的每一個(gè)元素應(yīng)用我們自定義的函數(shù)接口,如下是將每一個(gè)元素加1:
JavaRDD<Integer> mapRDD = integerJavaRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer element) throws Exception {
return element + 1;
}
});
//結(jié)果:[2, 3, 4, 4]
System.out.println("mapRDD = " + mapRDD.collect());需要注意的是,map操作可以返回與RDD不同類型的數(shù)據(jù),如下,返回一個(gè)自定義的User對(duì)象:
public class User implements Serializable {
private String userId;
private Integer amount;
public User(String userId, Integer amount) {
this.userId = userId;
this.amount = amount;
}
//getter setter....
@Override
public String toString() {
return "User{" +
"userId='" + userId + '\'' +
", amount=" + amount +
'}';
}
}
JavaRDD<User> userJavaRDD = integerJavaRDD.map(new Function<Integer, User>() {
@Override
public User call(Integer element) throws Exception {
if (element < 3) {
return new User("小于3", 22);
} else {
return new User("大于3", 23);
}
}
});
//結(jié)果:[User{userId='小于3', amount=22}, User{userId='小于3', amount=22}, User{userId='大于3', amount=23}, User{userId='大于3', amount=23}]
System.out.println("userJavaRDD = " + userJavaRDD.collect());2. flatMap操作,對(duì)integerJavaRDD的每一個(gè)元素應(yīng)用我們自定義的FlatMapFunction,這個(gè)函數(shù)的輸出是一個(gè)數(shù)據(jù)列表,flatMap會(huì)對(duì)這些輸出的數(shù)據(jù)列表進(jìn)行展平
JavaRDD<Integer> flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public Iterator<Integer> call(Integer element) throws Exception {
//輸出一個(gè)list,這個(gè)list里的元素是0到element
List<Integer> list = new ArrayList<>();
int i = 0;
while (i <= element) {
list.add(i);
i++;
}
return list.iterator();
}
});
//結(jié)果: [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3]
System.out.println("flatMapJavaRDD = " + flatMapJavaRDD.collect());3. filter操作,對(duì)integerJavaRDD的每一個(gè)元素應(yīng)用我們自定義的過濾函數(shù),過濾掉我們不需要的元素,如下,過濾掉不等于1的元素:
JavaRDD<Integer> filterJavaRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer != 1;
}
});
//結(jié)果為:[2, 3, 3]
System.out.println("filterJavaRDD = " + filterJavaRDD.collect());4. glom操作,查看integerJavaRDD每一個(gè)分區(qū)對(duì)應(yīng)的元素?cái)?shù)據(jù)
JavaRDD<List<Integer>> glomRDD = integerJavaRDD.glom();
//結(jié)果: [[1, 2], [3, 3]], 說明integerJavaRDD有兩個(gè)分區(qū),第一個(gè)分區(qū)中有數(shù)據(jù)1和2,第二個(gè)分區(qū)中有數(shù)據(jù)3和3
System.out.println("glomRDD = " + glomRDD.collect());5. mapPartitions操作,對(duì)integerJavaRDD的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,假設(shè)我們需要為每一個(gè)元素加上一個(gè)初始值,而這個(gè)初始值的獲取又是非常耗時(shí)的,這個(gè)時(shí)候用mapPartitions會(huì)有非常大的優(yōu)勢,如下:
//這是一個(gè)初始值獲取的方法,是一個(gè)比較耗時(shí)的方法
public static Integer getInitNumber(String source) {
System.out.println("get init number from " + source + ", may be take much time........");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
JavaRDD<Integer> mapPartitionTestRDD = integerJavaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
//每一個(gè)分區(qū)獲取一次初始值,integerJavaRDD有兩個(gè)分區(qū),那么會(huì)調(diào)用兩次getInitNumber方法
//所以對(duì)應(yīng)需要初始化的比較耗時(shí)的操作,比如初始化數(shù)據(jù)庫的連接等,一般都是用mapPartitions來為對(duì)每一個(gè)分區(qū)初始化一次,而不要去使用map操作
Integer initNumber = getInitNumber("mapPartitions");
List<Integer> list = new ArrayList<>();
while (integerIterator.hasNext()) {
list.add(integerIterator.next() + initNumber);
}
return list.iterator();
}
});
//結(jié)果為: [2, 3, 4, 4]
System.out.println("mapPartitionTestRDD = " + mapPartitionTestRDD.collect());
JavaRDD<Integer> mapInitNumberRDD = integerJavaRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
//遍歷每一個(gè)元素的時(shí)候都會(huì)去獲取初始值,這個(gè)integerJavaRDD含有4個(gè)元素,那么這個(gè)getInitNumber方法會(huì)被調(diào)用4次,嚴(yán)重的影響了時(shí)間,不如mapPartitions性能好
Integer initNumber = getInitNumber("map");
return integer + initNumber;
}
});
//結(jié)果為:[2, 3, 4, 4]
System.out.println("mapInitNumberRDD = " + mapInitNumberRDD.collect());6. mapPartitionsWithIndex操作,對(duì)integerJavaRDD的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,在應(yīng)用函數(shù)接口方法的時(shí)候帶上了分區(qū)信息,即知道你當(dāng)前處理的是第幾個(gè)分區(qū)的數(shù)據(jù)
JavaRDD<Integer> mapPartitionWithIndex = integerJavaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
@Override
public Iterator<Integer> call(Integer partitionId, Iterator<Integer> integerIterator) throws Exception {
//partitionId表示當(dāng)前處理的第幾個(gè)分區(qū)的信息
System.out.println("partition id = " + partitionId);
List<Integer> list = new ArrayList<>();
while (integerIterator.hasNext()) {
list.add(integerIterator.next() + partitionId);
}
return list.iterator();
}
}, false);
//結(jié)果 [1, 2, 4, 4]
System.out.println("mapPartitionWithIndex = " + mapPartitionWithIndex.collect());三、采樣Api
先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD
JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
sample
//第一個(gè)參數(shù)為withReplacement
//如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn)
//如果withReplacement=false的話表示無放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn)
//第二個(gè)參數(shù)為:fraction,表示每一個(gè)元素被抽取為樣本的概率,并不是表示需要抽取的數(shù)據(jù)量的因子
//比如從100個(gè)數(shù)據(jù)中抽樣,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20個(gè)數(shù)據(jù),
//而是表示100個(gè)元素的被抽取為樣本概率為0.2;樣本的大小并不是固定的,而是服從二項(xiàng)分布
//當(dāng)withReplacement=true的時(shí)候fraction>=0
//當(dāng)withReplacement=false的時(shí)候 0 < fraction < 1
//第三個(gè)參數(shù)為:reed表示生成隨機(jī)數(shù)的種子,即根據(jù)這個(gè)reed為rdd的每一個(gè)分區(qū)生成一個(gè)隨機(jī)種子
JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.5, 100);
//結(jié)果: [1, 3]
System.out.println("sampleRDD = " + sampleRDD.collect());2. randomSplit
//按照權(quán)重對(duì)RDD進(jìn)行隨機(jī)抽樣切分,有幾個(gè)權(quán)重就切分成幾個(gè)RDD
//隨機(jī)抽樣采用伯努利抽樣算法實(shí)現(xiàn), 以下是有兩個(gè)權(quán)重,則會(huì)切成兩個(gè)RDD
JavaRDD<Integer>[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6});
//結(jié)果為2
System.out.println("splitRDDs.length = " + splitRDDs.length);
//結(jié)果為[2, 3] 結(jié)果是不定的
System.out.println("splitRDD(0) = " + splitRDDs[0].collect());
//結(jié)果為[1, 3] 結(jié)果是不定的
System.out.println("splitRDD(1) = " + splitRDDs[1].collect());3. takeSample
//隨機(jī)抽樣指定數(shù)量的樣本數(shù)據(jù) //第一個(gè)參數(shù)為withReplacement //如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn) //如果withReplacement=false的話表示無放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn) //第二個(gè)參數(shù)指定多少,則返回多少個(gè)樣本數(shù) 結(jié)果為[2, 3] System.out.println(listRDD.takeSample(false, 2));
4. 分層采樣,對(duì)key-value類型的RDD進(jìn)行采樣
//創(chuàng)建一個(gè)key value類型的RDD
import scala.Tuple2;
JavaPairRDD<String, Integer> javaPairRDD =
sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3),
new Tuple2("kkk", 3), new Tuple2("kkk", 3)));
//定義每一個(gè)key的采樣因子
Map<String, Double> fractions = new HashMap<>();
fractions.put("test", 0.5);
fractions.put("kkk", 0.4);
//對(duì)每一個(gè)key進(jìn)行采樣
//結(jié)果為 [(test,3), (kkk,3)]
//sampleByKey 并不對(duì)過濾全量數(shù)據(jù),因此只得到近似值
System.out.println(javaPairRDD.sampleByKey(true, fractions).collect());
//結(jié)果為 [(test,3), (kkk,3)]
//sampleByKeyExtra 會(huì)對(duì)全量數(shù)據(jù)做采樣計(jì)算,因此耗費(fèi)大量的計(jì)算資源,但是結(jié)果會(huì)更準(zhǔn)確。
System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());抽樣的原理詳細(xì)可以參考:spark core RDD api。這些原理性的東西用文字不太好表述
四、pipe,表示在RDD執(zhí)行流中的某一步執(zhí)行其他的腳本,比如python或者shell腳本等
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2);
//啟動(dòng)echo.py需要的環(huán)境變量
Map<String, String> env = new HashMap<>();
env.put("env", "envtest");
List<String> commands = new ArrayList<>();
commands.add("python");
//如果是在真實(shí)的spark集群中,那么要求echo.py在集群的每一臺(tái)機(jī)器的同一個(gè)目錄下面都要有
commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py");
JavaRDD<String> result = dataRDD.pipe(commands, env);
//結(jié)果為: [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest]
System.out.println(result.collect());echo.py的內(nèi)容如下:
import sys
import os
#input = "test"
input = sys.stdin
env_keys = os.environ.keys()
env = ""
if "env" in env_keys:
env = os.environ["env"]
for ele in input:
output = "slave1-" + ele.strip('\n') + "-" + env
print (output)
input.close對(duì)于pipe的原理,以及怎么實(shí)現(xiàn)的,參考:spark core RDD api,這個(gè)里面還清楚的講述了怎么消除手工將腳本拷貝到每一臺(tái)機(jī)器中的工作
系統(tǒng)學(xué)習(xí)spark:
1、[老湯] Spark 2.x 之精講Spark Core:https://edu.51cto.com/sd/88429
2、[老湯]Spark 2.x 之精講Spark SQL專題:https://edu.51cto.com/sd/16f3d
3、[老湯]Scala內(nèi)功修煉系列專題:https://edu.51cto.com/sd/8e85b
4、[老湯]Spark 2.x之精講Spark Streamig:https://edu.51cto.com/sd/8c525
5、[老湯]Spark 2.x精講套餐:https://edu.51cto.com/sd/ff9a4
6、從Scala到Spark 2.x專題:https://edu.51cto.com/sd/d72af
本文名稱:spark2.x由淺入深深到底系列六之RDDjavaapi詳解一
當(dāng)前路徑:http://www.chinadenli.net/article14/gshide.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供搜索引擎優(yōu)化、ChatGPT、網(wǎng)站營銷、外貿(mào)建站、網(wǎng)站設(shè)計(jì)、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)