輸入DStreams表示從數(shù)據(jù)源獲取輸入數(shù)據(jù)流的DStreams。在快速例子中,lines
表示輸入DStream,它代表從netcat服務(wù)器獲取的數(shù)據(jù)流。每一個(gè)輸入流DStream和一個(gè)Receiver
對(duì)象相關(guān)聯(lián),這個(gè)Receiver
從源中獲取數(shù)據(jù),并將數(shù)據(jù)存入內(nèi)存中用于處理。
輸入DStreams表示從數(shù)據(jù)源獲取的原始數(shù)據(jù)流。Spark Streaming擁有兩類數(shù)據(jù)源
需要注意的是,如果你想在一個(gè)流應(yīng)用中并行地創(chuàng)建多個(gè)輸入DStream來接收多個(gè)數(shù)據(jù)流,你能夠創(chuàng)建多個(gè)輸入流(這將在性能調(diào)優(yōu)那一節(jié)介紹)。它將創(chuàng)建多個(gè)Receiver同時(shí)接收多個(gè)數(shù)據(jù)流。但是,receiver
作為一個(gè)長期運(yùn)行的任務(wù)運(yùn)行在Spark worker或executor中。因此,它占有一個(gè)核,這個(gè)核是分配給Spark Streaming應(yīng)用程序的所有核中的一個(gè)(it occupies one of the cores allocated to the Spark Streaming application)。所以,為Spark Streaming應(yīng)用程序分配足夠的核(如果是本地運(yùn)行,那么是線程)用以處理接收的數(shù)據(jù)并且運(yùn)行receiver
是非常重要的。
幾點(diǎn)需要注意的地方:
我們已經(jīng)在快速例子中看到,ssc.socketTextStream(...)
方法用來把從TCP套接字獲取的文本數(shù)據(jù)創(chuàng)建成DStream。除了套接字,StreamingContext API也支持把文件以及Akka actors作為輸入源創(chuàng)建DStream。
streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)
Spark Streaming將會(huì)監(jiān)控dataDirectory
目錄,并且處理目錄下生成的任何文件(嵌套目錄不被支持)。需要注意一下三點(diǎn):
1 所有文件必須具有相同的數(shù)據(jù)格式
2 所有文件必須在`dataDirectory`目錄下創(chuàng)建,文件是自動(dòng)的移動(dòng)和重命名到數(shù)據(jù)目錄下
3 一旦移動(dòng),文件必須被修改。所以如果文件被持續(xù)的附加數(shù)據(jù),新的數(shù)據(jù)不會(huì)被讀取。
對(duì)于簡單的文本文件,有一個(gè)更簡單的方法streamingContext.textFileStream(dataDirectory)
可以被調(diào)用。文件流不需要運(yùn)行一個(gè)receiver,所以不需要分配核。
在Spark1.2中,fileStream
在Python API中不可用,只有textFileStream
可用。
streamingContext.actorStream(actorProps, actor-name)
方法從Akka actors獲取的數(shù)據(jù)流來創(chuàng)建。具體的信息見自定義receiver指南actorStream
在Python API中不可用。streamingContext.queueStream(queueOfRDDs)
方法基于RDD隊(duì)列創(chuàng)建DStreams。每個(gè)push到隊(duì)列的RDD都被當(dāng)做DStream的批數(shù)據(jù),像流一樣處理。關(guān)于從套接字、文件和actor中獲取流的更多細(xì)節(jié),請(qǐng)看StreamingContext和JavaStreamingContext
這類源需要非Spark庫接口,并且它們中的部分還需要復(fù)雜的依賴(例如kafka和flume)。為了減少依賴的版本沖突問題,從這些源創(chuàng)建DStream的功能已經(jīng)被移到了獨(dú)立的庫中,你能在關(guān)聯(lián)查看細(xì)節(jié)。例如,如果你想用來自推特的流數(shù)據(jù)創(chuàng)建DStream,你需要按照如下步驟操作:
spark-streaming-twitter_2.10
到SBT或maven項(xiàng)目的依賴中編寫:導(dǎo)入TwitterUtils
類,用TwitterUtils.createStream
方法創(chuàng)建DStream,如下所示
import org.apache.spark.streaming.twitter._
TwitterUtils.createStream(ssc)
需要注意的是,這些高級(jí)的源在spark-shell
中不能被使用,因此基于這些源的應(yīng)用程序無法在shell中測試。
下面將介紹部分的高級(jí)源:
Twitter4j 3.0.3
獲取公共的推文流,這些推文通過推特流API獲得。認(rèn)證信息可以通過Twitter4J庫支持的任何方法提供。你既能夠得到公共流,也能夠得到基于關(guān)鍵字過濾后的流。你可以查看API文檔(scala和java)和例子(TwitterPopularTags和TwitterAlgebirdCMS)在Spark 1.2中,這些源不被Python API支持。輸入DStream也可以通過自定義源創(chuàng)建,你需要做的是實(shí)現(xiàn)用戶自定義的receiver
,這個(gè)receiver
可以從自定義源接收數(shù)據(jù)以及將數(shù)據(jù)推到Spark中。通過自定義receiver指南了解詳細(xì)信息
基于可靠性有兩類數(shù)據(jù)源。源(如kafka、flume)允許。如果從這些可靠的源獲取數(shù)據(jù)的系統(tǒng)能夠正確的應(yīng)答所接收的數(shù)據(jù),它就能夠確保在任何情況下不丟失數(shù)據(jù)。這樣,就有兩種類型的receiver:
怎樣編寫可靠的Receiver的細(xì)節(jié)在自定義receiver中有詳細(xì)介紹。
更多建議: