新聞中心
學(xué)習(xí)spark任何技術(shù)之前,請(qǐng)先正確理解spark,可以參考:正確理解spark
成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),莫力達(dá)企業(yè)網(wǎng)站建設(shè),莫力達(dá)品牌網(wǎng)站建設(shè),網(wǎng)站定制,莫力達(dá)網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,莫力達(dá)網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力。可充分滿(mǎn)足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專(zhuān)業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶(hù)成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
以下對(duì)RDD的三種創(chuàng)建方式、單類(lèi)型RDD基本的transformation api、采樣Api以及pipe操作進(jìn)行了python api方面的闡述
一、RDD的三種創(chuàng)建方式
從穩(wěn)定的文件存儲(chǔ)系統(tǒng)中創(chuàng)建RDD,比如local fileSystem或者h(yuǎn)dfs等,如下:
""" 創(chuàng)建RDD的方法: 1: 從一個(gè)穩(wěn)定的存儲(chǔ)系統(tǒng)中,比如hdfs文件, 或者本地文件系統(tǒng) """ text_file_rdd = sc.textFile("file:////Users/tangweiqun/spark-course/word.txt") print "text_file_rdd = {0}".format(",".join(text_file_rdd.collect()))
2. 可以經(jīng)過(guò)transformation api從一個(gè)已經(jīng)存在的RDD上創(chuàng)建一個(gè)新的RDD,以下是map這個(gè)轉(zhuǎn)換api
""" 2: 從一個(gè)已經(jīng)存在的RDD中, 即RDD的transformation api """ map_rdd = text_file_rdd.map(lambda line: "{0}-{1}".format(line, "test")) print "map_rdd = {0}".format(",".join(map_rdd.collect()))
3. 從一個(gè)內(nèi)存中的列表數(shù)據(jù)創(chuàng)建一個(gè)RDD,可以指定RDD的分區(qū)數(shù),如果不指定的話,則取所有Executor的所有cores數(shù)量
""" 3: 從一個(gè)已經(jīng)存在于內(nèi)存中的列表, 可以指定分區(qū),如果不指定的話分區(qū)數(shù)為所有executor的cores數(shù),下面的api時(shí)指定了2個(gè)分區(qū) """ parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2) print "parallelize_rdd = {0}".format(parallelize_rdd.glom().collect())
注:對(duì)于第三種情況,scala中還提供了makeRDD api,這個(gè)api可以指定創(chuàng)建RDD每一個(gè)分區(qū)所在的機(jī)器,這個(gè)api的原理詳見(jiàn)spark core RDD scala api中
二、單類(lèi)型RDD基本的transformation api
先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
map操作,表示對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的函數(shù)接口,如下是將每一個(gè)元素加1:
map_rdd = parallelize_rdd.map(lambda x: x + 1) """ 結(jié)果:[[2, 3], [4, 4, 5]] """ print "map_rdd = {0}".format(map_rdd.glom().collect())
需要注意的是,map操作可以返回與RDD不同類(lèi)型的數(shù)據(jù),如下,返回一個(gè)String類(lèi)型對(duì)象:
map_string_rdd = parallelize_rdd.map(lambda x: "{0}-{1}".format(x, "test")) """ 結(jié)果:[['1-test', '2-test'], ['3-test', '3-test', '4-test']] """ print "map_string_rdd = {0}".format(map_string_rdd.glom().collect())
2. flatMap操作,對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的lambda函數(shù),這個(gè)函數(shù)的輸出是一個(gè)數(shù)據(jù)列表,flatMap會(huì)對(duì)這些輸出的數(shù)據(jù)列表進(jìn)行展平
flatmap_rdd = parallelize_rdd.flatMap(lambda x: range(x)) """ 結(jié)果:[[0, 0, 1], [0, 1, 2, 0, 1, 2, 0, 1, 2, 3]] """ print "flatmap_rdd = {0}".format(flatmap_rdd.glom().collect())
3. filter操作,對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的過(guò)濾函數(shù),過(guò)濾掉我們不需要的元素,如下,過(guò)濾掉不等于1的元素:
filter_rdd = parallelize_rdd.filter(lambda x: x != 1) """ 結(jié)果:[[2], [3, 3, 4]] """ print "filter_rdd = {0}".format(filter_rdd.glom().collect())
4. glom操作,查看parallelize_rdd每一個(gè)分區(qū)對(duì)應(yīng)的元素?cái)?shù)據(jù)
glomRDD = parallelize_rdd.glom() """ 結(jié)果:[[1, 2], [3, 3, 4]] 說(shuō)明parallelize_rdd有兩個(gè)分區(qū),第一個(gè)分區(qū)中有數(shù)據(jù)1和2,第二個(gè)分區(qū)中有數(shù)據(jù)3,3和4 """ print "glomRDD = {0}".format(glomRDD.collect())
5. mapPartitions操作,對(duì)parallelize_rdd的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,假設(shè)我們需要為每一個(gè)元素加上一個(gè)初始值,而這個(gè)初始值的獲取又是非常耗時(shí)的,這個(gè)時(shí)候用mapPartitions會(huì)有非常大的優(yōu)勢(shì),如下:
//這是一個(gè)初始值獲取的方法,是一個(gè)比較耗時(shí)的方法 def get_init_number(source): print "get init number from {0}, may be take much time........".format(source) time.sleep(1) return 1 def map_partition_func(iterator): """ 每一個(gè)分區(qū)獲取一次初始值,integerJavaRDD有兩個(gè)分區(qū),那么會(huì)調(diào)用兩次getInitNumber方法 所以對(duì)應(yīng)需要初始化的比較耗時(shí)的操作,比如初始化數(shù)據(jù)庫(kù)的連接等,一般都是用mapPartitions來(lái)為對(duì)每一個(gè)分區(qū)初始化一次,而不要去使用map操作 :param iterator: :return: """ init_number = get_init_number("map_partition_func") yield map(lambda x : x + init_number, iterator) map_partition_rdd = parallelize_rdd.mapPartitions(map_partition_func) """ 結(jié)果:[[[2, 3]], [[4, 4, 5]]] """ print "map_partition_rdd = {0}".format(map_partition_rdd.glom().collect()) def map_func(x): """ 遍歷每一個(gè)元素的時(shí)候都會(huì)去獲取初始值,這個(gè)integerJavaRDD含有5個(gè)元素,那么這個(gè)getInitNumber方法會(huì)被調(diào)用4次,嚴(yán)重的影響了時(shí)間,不如mapPartitions性能好 :param x: :return: """ init_number = get_init_number("map_func") return x + init_number map_rdd_init_number = parallelize_rdd.map(map_func) """ 結(jié)果:[[2, 3], [4, 4, 5]] """ print "map_rdd_init_number = {0}".format(map_rdd_init_number.glom().collect())
6. mapPartitionsWithIndex操作,對(duì)parallelize_rdd的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,在應(yīng)用函數(shù)接口方法的時(shí)候帶上了分區(qū)信息,即知道你當(dāng)前處理的是第幾個(gè)分區(qū)的數(shù)據(jù)
def map_partition_with_index_func(partition_index, iterator): yield (partition_index, sum(iterator)) map_partition_with_index_rdd = parallelize_rdd.mapPartitionsWithIndex(map_partition_with_index_func) """ 結(jié)果:[[(0, 3)], [(1, 10)]] """ print "map_partition_with_index_rdd = {0}".format(map_partition_with_index_rdd.glom().collect())
三、采樣Api
先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
sample
""" 第一個(gè)參數(shù)為withReplacement 如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn) 如果withReplacement=false的話表示無(wú)放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn) 第二個(gè)參數(shù)為:fraction,表示每一個(gè)元素被抽取為樣本的概率,并不是表示需要抽取的數(shù)據(jù)量的因子 比如從100個(gè)數(shù)據(jù)中抽樣,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20個(gè)數(shù)據(jù), 而是表示100個(gè)元素的被抽取為樣本概率為0.2;樣本的大小并不是固定的,而是服從二項(xiàng)分布 當(dāng)withReplacement=true的時(shí)候fraction>=0 當(dāng)withReplacement=false的時(shí)候 0 < fraction < 1 第三個(gè)參數(shù)為:reed表示生成隨機(jī)數(shù)的種子,即根據(jù)這個(gè)reed為rdd的每一個(gè)分區(qū)生成一個(gè)隨機(jī)種子 """ sample_rdd = parallelize_rdd.sample(False, 0.5, 100) """ 結(jié)果:[[1], [3, 4]] """ print "sample_rdd = {0}".format(sample_rdd.glom().collect())
2. randomSplit
""" //按照權(quán)重對(duì)RDD進(jìn)行隨機(jī)抽樣切分,有幾個(gè)權(quán)重就切分成幾個(gè)RDD //隨機(jī)抽樣采用伯努利抽樣算法實(shí)現(xiàn), 以下是有兩個(gè)權(quán)重,則會(huì)切成兩個(gè)RDD """ split_rdds = parallelize_rdd.randomSplit([0.2, 0.8]) print len(split_rdds) """[[], [3, 4]]""" print "split_rdds[0] = {0}".format(split_rdds[0].glom().collect()) """[[1, 2], [3]]""" print "split_rdds[1] = {0}".format(split_rdds[1].glom().collect())
3. takeSample
""" //隨機(jī)抽樣指定數(shù)量的樣本數(shù)據(jù) //第一個(gè)參數(shù)為withReplacement //如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn) //如果withReplacement=false的話表示無(wú)放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn) //第二個(gè)參數(shù)指定多少,則返回多少個(gè)樣本數(shù) """ """隨機(jī)抽樣指定數(shù)量的樣本數(shù)據(jù) 結(jié)果:[1] """ print parallelize_rdd.takeSample(False, 1)
4. 分層采樣,對(duì)key-value類(lèi)型的RDD進(jìn)行采樣
"""創(chuàng)建一個(gè)key value類(lèi)型的RDD""" pair_rdd = sc.parallelize([('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]) sampleByKey_rdd = pair_rdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2}) """ 結(jié)果:[[('A', 1), ('B', 2), ('B', 4)]] """ print "sampleByKey_rdd = {0}".format(sampleByKey_rdd.glom().collect())
抽樣的原理詳細(xì)可以參考:spark core RDD api。這些原理性的東西用文字不太好表述
四、pipe,表示在RDD執(zhí)行流中的某一步執(zhí)行其他的腳本,比如python或者shell腳本等
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize(["test1", "test2", "test3", "test4", "test5"], 2) """ //如果是在真實(shí)的spark集群中,那么要求echo.py在集群的每一臺(tái)機(jī)器的同一個(gè)目錄下面都要有 //第二個(gè)參數(shù)是環(huán)境變量 """ pipe_rdd = parallelize_rdd.pipe("python /Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py", {"env":"env"}) """ 結(jié)果:slave1-test1-env slave1-test2-env slave1-test3-env slave1-test4-env slave1-test5-env """ print "pipe_rdd = {0}".format(" ".join(pipe_rdd.collect()))
echo.py的內(nèi)容如下:
import sys import os #input = "test" input = sys.stdin env_keys = os.environ.keys() env = "" if "env" in env_keys: env = os.environ["env"] for ele in input: output = "slave1-" + ele.strip('\n') + "-" + env print (output) input.close
對(duì)于pipe的原理,以及怎么實(shí)現(xiàn)的,參考:spark core RDD api,這個(gè)里面還清楚的講述了怎么消除手工將腳本拷貝到每一臺(tái)機(jī)器中的工作
網(wǎng)站欄目:spark2.x由淺入深深到底系列七之RDDpythonapi詳解一
分享URL:http://fisionsoft.com.cn/article/jidgji.html