在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ Spark核心編程
Spark 安裝
Spark編程
Spark核心編程
Spark教程
Apache Spark RDD
Spark部署

Spark核心編程

Spark 核心是整個項目的基礎(chǔ)。它提供了分布式任務(wù)調(diào)度,調(diào)度和基本的 I/O 功能。Spark 使用一種稱為RDD(彈性分布式數(shù)據(jù)集)一個專門的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是整個機(jī)器分區(qū)數(shù)據(jù)的邏輯集合。RDDS可以用兩種方法來創(chuàng)建的;一個是在外部存儲系統(tǒng)引用的數(shù)據(jù)集,第二個是通過應(yīng)用轉(zhuǎn)換(如map, filter, reducer, join)在現(xiàn)有RDDS。

RDD抽象通過語言集成API公開。這簡化了編程的復(fù)雜性,因為應(yīng)用程序的處理RDDS方式類似于操縱的本地集合數(shù)據(jù)。

Spark Shell

Spark提供了一個交互的shell ? 一個強大的工具,以交互方式分析數(shù)據(jù)。 這是在 Scala或Python語言。Spark主要抽象稱為彈性分布式數(shù)據(jù)集(RDD)項目的分布式采集。RDDS可以從Hadoop的輸入格式來創(chuàng)建(如HDFS文件)或通過轉(zhuǎn)化其他RDDS。

打開 Spark Shell

下面的命令用來打開Spark shell。
$ spark-shell

創(chuàng)建簡單RDD

讓我們從文本文件中創(chuàng)建一個簡單的 RDD。使用下面的命令來創(chuàng)建一個簡單的 RDD。
scala> val inputfile = sc.textFile(“input.txt”)
對上述命令的輸出為:
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API引入了一些變革和一些動作來操縱RDD。

RDD 轉(zhuǎn)換

RDD轉(zhuǎn)換返回指向新的RDD,并允許創(chuàng)建RDDS之間的依賴關(guān)系。 在依賴關(guān)系鏈中的每個RDD(依賴關(guān)系的字串)具有這樣的功能,用于計算其數(shù)據(jù)并具有一個指針(依賴性)到其父RDD。

Spark是懶惰的,所以什么都不會被執(zhí)行,除非調(diào)用一些改造或行動將觸發(fā)作業(yè)創(chuàng)建和執(zhí)行??磫卧~計數(shù)示例,如下面的代碼片段。

因此,RDD轉(zhuǎn)型不是一組數(shù)據(jù)而是在程序中的一個步驟(可能是唯一的步驟)告訴Spark如何獲取數(shù)據(jù)以及如何使用它。
下面給出是RDD轉(zhuǎn)換的列表。
S.No
轉(zhuǎn)換&含義
1

map(func)

返回一個新的分布式數(shù)據(jù)集,傳遞源的每個元素形成通過一個函數(shù) func

2

filter(func)

返回由選擇在func返回true,源元素組成了一個新的數(shù)據(jù)集
3

flatMap(func)

類似映射,但每個輸入項目可以被映射到0以上輸出項目(所以func應(yīng)返回seq而不是單一的項目)
4

mapPartitions(func)

類似映射,只不過是單獨的每個分區(qū)(塊)上運行RDD,因此 func 的類型必須是Iterator<T> ? Iterator<U> 對類型T在RDD上運行時

5

mapPartitionsWithIndex(func)

類似映射分區(qū),而且還提供func 來表示分區(qū)的索引的整數(shù)值,因此 func 必須是類型 (Int, Iterator<T>) ? Iterator<U> 當(dāng)類型T在RDD上運行時

6

sample(withReplacement, fraction, seed)

采樣數(shù)據(jù)的一小部分,有或沒有更換,利用給定的隨機(jī)數(shù)發(fā)生器的種子
7

union(otherDataset)

返回一個新的數(shù)據(jù)集,其中包含源數(shù)據(jù)和參數(shù)元素的結(jié)合
8

intersection(otherDataset)

返回包含在源數(shù)據(jù)和參數(shù)元素的新RDD交集
9

distinct([numTasks])

返回一個新的數(shù)據(jù)集包含源數(shù)據(jù)集的不同元素
10

groupByKey([numTasks])

當(dāng)調(diào)用(K,V)數(shù)據(jù)集,返回(K, Iterable<V>) 對數(shù)據(jù)集

11

reduceByKey(func, [numTasks])

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

13

sortByKey([ascending], [numTasks])

14

join(otherDataset, [numTasks])

15

cogroup(otherDataset, [numTasks])

16

cartesian(otherDataset)

當(dāng)上調(diào)用類型T和U的數(shù)據(jù)集,返回(T,U)對數(shù)據(jù)集(所有元素對)
17

pipe(command, [envVars])

RDD通過shell命令每個分區(qū),例如:一個Perl或bash腳本。RDD元素被寫入到進(jìn)程的標(biāo)準(zhǔn)輸入和線路輸出,標(biāo)準(zhǔn)輸出形式返回一個字符串RDD

18

coalesce(numPartitions)

減少RDD到numPartitions分區(qū)的數(shù)量。過濾大型數(shù)據(jù)集后,更高效地運行的操作
19

repartition(numPartitions)

打亂RDD數(shù)據(jù)隨機(jī)創(chuàng)造更多或更少的分區(qū),并在它們之間平衡。這總是打亂的所有數(shù)據(jù)在網(wǎng)絡(luò)上
20

repartitionAndSortWithinPartitions(partitioner)

根據(jù)給定的分區(qū)重新分區(qū)RDD及在每個結(jié)果分區(qū),排序鍵記錄。這是調(diào)用重新分配排序在每個分區(qū)內(nèi),因為它可以推動分揀向下進(jìn)入混洗機(jī)制效率更高。

動作

下表給出了操作,及其返回值的列表。
S.No 操作 & 含義
1

reduce(func)

合計數(shù)據(jù)集的元素,使用函數(shù) func (其中有兩個參數(shù)和返回一行). 該函數(shù)應(yīng)該是可交換和可結(jié)合,以便它可以正確地在并行計算。

2

collect()

返回數(shù)據(jù)集的所有作為數(shù)組在驅(qū)動程序的元素。這是一個過濾器或其它操作之后返回數(shù)據(jù)的一個足夠小的子集,通常是有用的

3

count()

返回該數(shù)據(jù)集的元素數(shù)
4

first()

返回的數(shù)據(jù)集的第一個元素(類似于使用(1))
5

take(n)

返回與該數(shù)據(jù)集的前n個元素的陣列。
6

takeSample (withReplacement,num, [seed])

返回數(shù)組的數(shù)據(jù)集num個元素,有或沒有更換隨機(jī)抽樣,預(yù)指定的隨機(jī)數(shù)發(fā)生器的種子可選

7

takeOrdered(n, [ordering])

返回RDD使用或者按其自然順序或自定義比較的前第n個元素
8

saveAsTextFile(path)

寫入數(shù)據(jù)集是一個文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統(tǒng),HDFS或任何其他的Hadoop支持的文件系統(tǒng)。Spark調(diào)用每個元素的 toString,將其轉(zhuǎn)換為文件中的文本行

9

saveAsSequenceFile(path) (Java and Scala)

寫入數(shù)據(jù)集,為Hadoop SequenceFile元素在給定的路徑寫入在本地文件系統(tǒng),HDFS或任何其他Hadoop支持的文件系統(tǒng)。 這是適用于實現(xiàn)Hadoop可寫接口RDDS的鍵 - 值對。在Scala中,它也可以在屬于隱式轉(zhuǎn)換為可寫(Spark包括轉(zhuǎn)換為基本類型,如 Int, Double, String 等等)類型。

10

saveAsObjectFile(path) (Java and Scala)

寫入數(shù)據(jù)集的內(nèi)容使用Java序列化為一個簡單的格式,然后可以使用SparkContext.objectFile()加載。

11

countByKey()

僅適用于RDDS的類型 (K, V). 返回(K, Int)對與每個鍵的次數(shù)的一個HashMap。

12

foreach(func)

數(shù)據(jù)集的每個元素上運行函數(shù)func。這通常對于不良反應(yīng),例如更新累加器或與外部存儲系統(tǒng)進(jìn)行交互進(jìn)行。

 ? 在 foreach()以外修改變量,其他累加器可能會導(dǎo)致不確定的行為。請參閱了解閉包的更多細(xì)節(jié)。

RDD編程

讓我們來看看幾個RDD轉(zhuǎn)換和操作RDD編程實現(xiàn),用一個例子的協(xié)助說明。

示例

考慮一個單詞計數(shù)的例子 ? 它計算出現(xiàn)在文檔中的每個單詞。請看下面的文字為輸入并保存在主目錄中的 input.txt 文件。

input.txt ? 作為輸入文件

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.
按照下面給出命令執(zhí)行示例程序。

打開Spark-Shell

下面的命令用來打開spark shell. 通常情況下,spark 使用Scala構(gòu)建。因此,Spark 程序需要在 Scala 環(huán)境中運行。

$ spark-shell 

如果Spark shell 成功打開,會發(fā)現(xiàn)下面的輸出。看看輸出“Spark 上下文可作為sc” 的最后一行表示Spark容器會自動創(chuàng)建Spark 上下文對象名為sc。啟動程序的第一步驟之前,SparkContext 對象應(yīng)該被創(chuàng)建。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

創(chuàng)建一個RDD

首先,我們必須使用 Spark-Scala API 讀取輸入文件,并創(chuàng)建一個RDD。

下面的命令被用于從給定位置讀出的文件。這里,新的 RDD 使用輸入文件名創(chuàng)建。這是在 textFile(“”)方法的參數(shù)字符串是用于輸入文件名的絕對路徑。然而,如果僅給出文件名,那么它輸入文件則在當(dāng)前位置。

scala> val inputfile = sc.textFile("input.txt")

執(zhí)行字?jǐn)?shù)轉(zhuǎn)換

我們的目標(biāo)是計算一個文件中的字?jǐn)?shù)。分裂每一行成詞創(chuàng)建一個平面地圖(flatMap(line ? line.split(“ ”)).

接下來,讀每個詞作為一個鍵和值 ‘1’ (<key, value> = <word,1>) 使用映射函數(shù) (map(word ? (word, 1)).

最后,加入類似的鍵值降低這些鍵 (reduceByKey(_+_)).

下面的命令用于執(zhí)行字?jǐn)?shù)統(tǒng)計邏輯。執(zhí)行此操作后,不會有任何輸出,因為這不是一個動作,這是一個轉(zhuǎn)換; 指向一個新的RDD或告訴spark,用給定的數(shù)據(jù)來做什么)。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

當(dāng)前RDD

同時用RDD工作,如果想了解當(dāng)前的RDD,那么可使用下面的命令。 它會告訴你關(guān)于當(dāng)前RDD及其依賴調(diào)試的描述。

scala> counts.toDebugString

緩存轉(zhuǎn)換

可以使用 persist() 或 cache() 方法標(biāo)記一個RDD。在第一次計算的操作,這將被保存在存儲器中的節(jié)點上。使用下面的命令來存儲中間轉(zhuǎn)換在內(nèi)存中。

scala> counts.cache()

應(yīng)用動作

應(yīng)用動作(操作),比如存儲所有的轉(zhuǎn)換結(jié)果到一個文本文件中。saveAsTextFile(“”)方法字符串參數(shù)是輸出文件夾的絕對路徑。試試下面的命令來保存輸出文本文件。在下面的例子中, ‘output’ 的文件夾為當(dāng)前位置。

scala> counts.saveAsTextFile("output")

檢查輸出

打開另一個終端進(jìn)入主目錄(其中spark 在其他終端中執(zhí)行)。下面的命令用于檢查輸出目錄。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

下面的命令是用來查看輸出的 Part-00001 文件。

[hadoop@localhost output]$ cat part-00000

輸出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 
下面的命令是用來查看輸出的 Part-00001 文件。
[hadoop@localhost output]$ cat part-00001 

輸出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

UN持久存儲

UN持續(xù)存在之前,如果想看到用于該應(yīng)用程序的存儲空間,可使用下面的URL在瀏覽器中查看。
http://localhost:4040
這將會看到下面的屏幕,該屏幕顯示用于應(yīng)用程序,這些都在 Spark shell 運行的存儲空間。
storage space
如果想特別的RDD存儲空間,然后使用下面的命令。
Scala> counts.unpersist() 

將看到如下輸出 ?

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
為了驗證在瀏覽器中的存儲空間,使用下面的URL。
http://localhost:4040/
會看到下面的畫面。它用于應(yīng)用程序,這是在Spark shell運行存儲空間。
Storage space for application

上一篇:Spark編程下一篇:Spark教程