最近2018中文字幕在日韩欧美国产成人片_国产日韩精品一区二区在线_在线观看成年美女黄网色视频_国产精品一区三区五区_国产精彩刺激乱对白_看黄色黄大色黄片免费_人人超碰自拍cao_国产高清av在线_亚洲精品电影av_日韩美女尤物视频网站

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
spark2.x由淺入深深到底系列七之RDDpythonapi詳解一

學(xué)習(xí)spark任何技術(shù)之前,請(qǐng)先正確理解spark,可以參考:正確理解spark

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),莫力達(dá)企業(yè)網(wǎng)站建設(shè),莫力達(dá)品牌網(wǎng)站建設(shè),網(wǎng)站定制,莫力達(dá)網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,莫力達(dá)網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力。可充分滿(mǎn)足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專(zhuān)業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶(hù)成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

以下對(duì)RDD的三種創(chuàng)建方式、單類(lèi)型RDD基本的transformation api、采樣Api以及pipe操作進(jìn)行了python api方面的闡述

一、RDD的三種創(chuàng)建方式

  1. 從穩(wěn)定的文件存儲(chǔ)系統(tǒng)中創(chuàng)建RDD,比如local fileSystem或者h(yuǎn)dfs等,如下:

"""
創(chuàng)建RDD的方法:
1: 從一個(gè)穩(wěn)定的存儲(chǔ)系統(tǒng)中,比如hdfs文件, 或者本地文件系統(tǒng)
"""
text_file_rdd = sc.textFile("file:////Users/tangweiqun/spark-course/word.txt")
print "text_file_rdd = {0}".format(",".join(text_file_rdd.collect()))

2.  可以經(jīng)過(guò)transformation api從一個(gè)已經(jīng)存在的RDD上創(chuàng)建一個(gè)新的RDD,以下是map這個(gè)轉(zhuǎn)換api

"""
2: 從一個(gè)已經(jīng)存在的RDD中, 即RDD的transformation api
"""
map_rdd = text_file_rdd.map(lambda line: "{0}-{1}".format(line, "test"))
print "map_rdd = {0}".format(",".join(map_rdd.collect()))

3.  從一個(gè)內(nèi)存中的列表數(shù)據(jù)創(chuàng)建一個(gè)RDD,可以指定RDD的分區(qū)數(shù),如果不指定的話,則取所有Executor的所有cores數(shù)量

"""
3: 從一個(gè)已經(jīng)存在于內(nèi)存中的列表,  可以指定分區(qū),如果不指定的話分區(qū)數(shù)為所有executor的cores數(shù),下面的api時(shí)指定了2個(gè)分區(qū)
"""
parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
print "parallelize_rdd = {0}".format(parallelize_rdd.glom().collect())

注:對(duì)于第三種情況,scala中還提供了makeRDD api,這個(gè)api可以指定創(chuàng)建RDD每一個(gè)分區(qū)所在的機(jī)器,這個(gè)api的原理詳見(jiàn)spark core RDD scala api

二、單類(lèi)型RDD基本的transformation api

先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
  1.  map操作,表示對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的函數(shù)接口,如下是將每一個(gè)元素加1:

map_rdd = parallelize_rdd.map(lambda x: x + 1)
"""
    結(jié)果:[[2, 3], [4, 4, 5]]
    """
print "map_rdd = {0}".format(map_rdd.glom().collect())

需要注意的是,map操作可以返回與RDD不同類(lèi)型的數(shù)據(jù),如下,返回一個(gè)String類(lèi)型對(duì)象:

map_string_rdd = parallelize_rdd.map(lambda x: "{0}-{1}".format(x, "test"))
"""
    結(jié)果:[['1-test', '2-test'], ['3-test', '3-test', '4-test']]
    """
print "map_string_rdd = {0}".format(map_string_rdd.glom().collect())

2.  flatMap操作,對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的lambda函數(shù),這個(gè)函數(shù)的輸出是一個(gè)數(shù)據(jù)列表,flatMap會(huì)對(duì)這些輸出的數(shù)據(jù)列表進(jìn)行展平

flatmap_rdd = parallelize_rdd.flatMap(lambda x: range(x))
"""
    結(jié)果:[[0, 0, 1], [0, 1, 2, 0, 1, 2, 0, 1, 2, 3]]
    """
print "flatmap_rdd = {0}".format(flatmap_rdd.glom().collect())

3.  filter操作,對(duì)parallelize_rdd的每一個(gè)元素應(yīng)用我們自定義的過(guò)濾函數(shù),過(guò)濾掉我們不需要的元素,如下,過(guò)濾掉不等于1的元素:

filter_rdd = parallelize_rdd.filter(lambda x: x != 1)
"""
    結(jié)果:[[2], [3, 3, 4]]
    """
print "filter_rdd = {0}".format(filter_rdd.glom().collect())

4.  glom操作,查看parallelize_rdd每一個(gè)分區(qū)對(duì)應(yīng)的元素?cái)?shù)據(jù)

glomRDD = parallelize_rdd.glom()
"""
結(jié)果:[[1, 2], [3, 3, 4]] 說(shuō)明parallelize_rdd有兩個(gè)分區(qū),第一個(gè)分區(qū)中有數(shù)據(jù)1和2,第二個(gè)分區(qū)中有數(shù)據(jù)3,3和4
"""
print "glomRDD = {0}".format(glomRDD.collect())

5.  mapPartitions操作,對(duì)parallelize_rdd的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,假設(shè)我們需要為每一個(gè)元素加上一個(gè)初始值,而這個(gè)初始值的獲取又是非常耗時(shí)的,這個(gè)時(shí)候用mapPartitions會(huì)有非常大的優(yōu)勢(shì),如下:

//這是一個(gè)初始值獲取的方法,是一個(gè)比較耗時(shí)的方法
def get_init_number(source):
    print "get init number from {0}, may be take much time........".format(source)
    time.sleep(1)
    return 1

def map_partition_func(iterator):
    """
    每一個(gè)分區(qū)獲取一次初始值,integerJavaRDD有兩個(gè)分區(qū),那么會(huì)調(diào)用兩次getInitNumber方法
    所以對(duì)應(yīng)需要初始化的比較耗時(shí)的操作,比如初始化數(shù)據(jù)庫(kù)的連接等,一般都是用mapPartitions來(lái)為對(duì)每一個(gè)分區(qū)初始化一次,而不要去使用map操作
    :param iterator:
    :return:
    """
    init_number = get_init_number("map_partition_func")
    yield map(lambda x : x + init_number, iterator)
map_partition_rdd = parallelize_rdd.mapPartitions(map_partition_func)
"""
    結(jié)果:[[[2, 3]], [[4, 4, 5]]]
    """
print "map_partition_rdd = {0}".format(map_partition_rdd.glom().collect())

def map_func(x):
    """
    遍歷每一個(gè)元素的時(shí)候都會(huì)去獲取初始值,這個(gè)integerJavaRDD含有5個(gè)元素,那么這個(gè)getInitNumber方法會(huì)被調(diào)用4次,嚴(yán)重的影響了時(shí)間,不如mapPartitions性能好
    :param x:
    :return:
    """
    init_number = get_init_number("map_func")
    return x + init_number
map_rdd_init_number = parallelize_rdd.map(map_func)
"""
    結(jié)果:[[2, 3], [4, 4, 5]]
    """
print "map_rdd_init_number = {0}".format(map_rdd_init_number.glom().collect())

6.  mapPartitionsWithIndex操作,對(duì)parallelize_rdd的每一個(gè)分區(qū)的數(shù)據(jù)應(yīng)用我們自定義的函數(shù)接口方法,在應(yīng)用函數(shù)接口方法的時(shí)候帶上了分區(qū)信息,即知道你當(dāng)前處理的是第幾個(gè)分區(qū)的數(shù)據(jù)

def map_partition_with_index_func(partition_index, iterator): yield (partition_index, sum(iterator))
map_partition_with_index_rdd = parallelize_rdd.mapPartitionsWithIndex(map_partition_with_index_func)
"""
    結(jié)果:[[(0, 3)], [(1, 10)]]
    """
print "map_partition_with_index_rdd = {0}".format(map_partition_with_index_rdd.glom().collect())

三、采樣Api

先基于內(nèi)存中的數(shù)據(jù)創(chuàng)建一個(gè)RDD

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
  1.  sample

"""
第一個(gè)參數(shù)為withReplacement
如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn)
如果withReplacement=false的話表示無(wú)放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn)

第二個(gè)參數(shù)為:fraction,表示每一個(gè)元素被抽取為樣本的概率,并不是表示需要抽取的數(shù)據(jù)量的因子
比如從100個(gè)數(shù)據(jù)中抽樣,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20個(gè)數(shù)據(jù),
而是表示100個(gè)元素的被抽取為樣本概率為0.2;樣本的大小并不是固定的,而是服從二項(xiàng)分布
當(dāng)withReplacement=true的時(shí)候fraction>=0
當(dāng)withReplacement=false的時(shí)候 0 < fraction < 1

第三個(gè)參數(shù)為:reed表示生成隨機(jī)數(shù)的種子,即根據(jù)這個(gè)reed為rdd的每一個(gè)分區(qū)生成一個(gè)隨機(jī)種子
"""
sample_rdd = parallelize_rdd.sample(False, 0.5, 100)
"""
結(jié)果:[[1], [3, 4]]
"""
print "sample_rdd = {0}".format(sample_rdd.glom().collect())

2.  randomSplit

"""
//按照權(quán)重對(duì)RDD進(jìn)行隨機(jī)抽樣切分,有幾個(gè)權(quán)重就切分成幾個(gè)RDD
//隨機(jī)抽樣采用伯努利抽樣算法實(shí)現(xiàn), 以下是有兩個(gè)權(quán)重,則會(huì)切成兩個(gè)RDD
"""
split_rdds = parallelize_rdd.randomSplit([0.2, 0.8])
print len(split_rdds)
"""[[], [3, 4]]"""
print "split_rdds[0] = {0}".format(split_rdds[0].glom().collect())
"""[[1, 2], [3]]"""
print "split_rdds[1] = {0}".format(split_rdds[1].glom().collect())

3.  takeSample

"""
//隨機(jī)抽樣指定數(shù)量的樣本數(shù)據(jù)
//第一個(gè)參數(shù)為withReplacement
//如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實(shí)現(xiàn)
//如果withReplacement=false的話表示無(wú)放回的抽樣,采用伯努利抽樣算法實(shí)現(xiàn)
//第二個(gè)參數(shù)指定多少,則返回多少個(gè)樣本數(shù)
"""
"""隨機(jī)抽樣指定數(shù)量的樣本數(shù)據(jù)
結(jié)果:[1]
"""
print parallelize_rdd.takeSample(False, 1)

4. 分層采樣,對(duì)key-value類(lèi)型的RDD進(jìn)行采樣

"""創(chuàng)建一個(gè)key value類(lèi)型的RDD"""
pair_rdd = sc.parallelize([('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)])
sampleByKey_rdd = pair_rdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})
"""
結(jié)果:[[('A', 1), ('B', 2), ('B', 4)]]
"""
print "sampleByKey_rdd = {0}".format(sampleByKey_rdd.glom().collect())

抽樣的原理詳細(xì)可以參考:spark core RDD api。這些原理性的東西用文字不太好表述

四、pipe,表示在RDD執(zhí)行流中的某一步執(zhí)行其他的腳本,比如python或者shell腳本等

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

parallelize_rdd = sc.parallelize(["test1", "test2", "test3", "test4", "test5"], 2)

"""
//如果是在真實(shí)的spark集群中,那么要求echo.py在集群的每一臺(tái)機(jī)器的同一個(gè)目錄下面都要有
//第二個(gè)參數(shù)是環(huán)境變量
"""
pipe_rdd = parallelize_rdd.pipe("python /Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py", {"env":"env"})
"""
結(jié)果:slave1-test1-env slave1-test2-env slave1-test3-env slave1-test4-env slave1-test5-env
"""
print "pipe_rdd = {0}".format(" ".join(pipe_rdd.collect()))

echo.py的內(nèi)容如下:

import sys
import os

#input = "test"
input = sys.stdin
env_keys = os.environ.keys()
env = ""
if "env" in env_keys:
   env = os.environ["env"]
for ele in input:
   output = "slave1-" + ele.strip('\n') + "-" + env
       print (output)

input.close

對(duì)于pipe的原理,以及怎么實(shí)現(xiàn)的,參考:spark core RDD api,這個(gè)里面還清楚的講述了怎么消除手工將腳本拷貝到每一臺(tái)機(jī)器中的工作


網(wǎng)站欄目:spark2.x由淺入深深到底系列七之RDDpythonapi詳解一
分享URL:http://fisionsoft.com.cn/article/jidgji.html