99re热视频这里只精品,久久久天堂国产精品女人,国产av一区二区三区,久久久精品成人免费看片,99久久精品免费看国产一区二区三区

輸入DStreams

2018-02-24 15:57 更新

輸入DStreams和receivers

輸入DStreams表示從數(shù)據(jù)源獲取輸入數(shù)據(jù)流的DStreams。在快速例子中,lines表示輸入DStream,它代表從netcat服務(wù)器獲取的數(shù)據(jù)流。每一個(gè)輸入流DStream和一個(gè)Receiver對(duì)象相關(guān)聯(lián),這個(gè)Receiver從源中獲取數(shù)據(jù),并將數(shù)據(jù)存入內(nèi)存中用于處理。

輸入DStreams表示從數(shù)據(jù)源獲取的原始數(shù)據(jù)流。Spark Streaming擁有兩類數(shù)據(jù)源

  • 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統(tǒng)、套接字連接、Akka的actor等。
  • 高級(jí)源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。它們需要通過額外的類來使用。我們?cè)?a href="#">關(guān)聯(lián)那一節(jié)討論了類依賴。

需要注意的是,如果你想在一個(gè)流應(yīng)用中并行地創(chuàng)建多個(gè)輸入DStream來接收多個(gè)數(shù)據(jù)流,你能夠創(chuàng)建多個(gè)輸入流(這將在性能調(diào)優(yōu)那一節(jié)介紹)。它將創(chuàng)建多個(gè)Receiver同時(shí)接收多個(gè)數(shù)據(jù)流。但是,receiver作為一個(gè)長期運(yùn)行的任務(wù)運(yùn)行在Spark worker或executor中。因此,它占有一個(gè)核,這個(gè)核是分配給Spark Streaming應(yīng)用程序的所有核中的一個(gè)(it occupies one of the cores allocated to the Spark Streaming application)。所以,為Spark Streaming應(yīng)用程序分配足夠的核(如果是本地運(yùn)行,那么是線程)用以處理接收的數(shù)據(jù)并且運(yùn)行receiver是非常重要的。

幾點(diǎn)需要注意的地方:

  • 如果分配給應(yīng)用程序的核的數(shù)量少于或者等于輸入DStreams或者receivers的數(shù)量,系統(tǒng)只能夠接收數(shù)據(jù)而不能處理它們。
  • 當(dāng)運(yùn)行在本地,如果你的master URL被設(shè)置成了“l(fā)ocal”,這樣就只有一個(gè)核運(yùn)行任務(wù)。這對(duì)程序來說是不足的,因?yàn)樽鳛?code>receiver的輸入DStream將會(huì)占用這個(gè)核,這樣就沒有剩余的核來處理數(shù)據(jù)了。

基本源

我們已經(jīng)在快速例子中看到,ssc.socketTextStream(...)方法用來把從TCP套接字獲取的文本數(shù)據(jù)創(chuàng)建成DStream。除了套接字,StreamingContext API也支持把文件以及Akka actors作為輸入源創(chuàng)建DStream。

  • 文件流(File Streams):從任何與HDFS API兼容的文件系統(tǒng)中讀取數(shù)據(jù),一個(gè)DStream可以通過如下方式創(chuàng)建
streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)

Spark Streaming將會(huì)監(jiān)控dataDirectory目錄,并且處理目錄下生成的任何文件(嵌套目錄不被支持)。需要注意一下三點(diǎn):

1 所有文件必須具有相同的數(shù)據(jù)格式
2 所有文件必須在`dataDirectory`目錄下創(chuàng)建,文件是自動(dòng)的移動(dòng)和重命名到數(shù)據(jù)目錄下
3 一旦移動(dòng),文件必須被修改。所以如果文件被持續(xù)的附加數(shù)據(jù),新的數(shù)據(jù)不會(huì)被讀取。

對(duì)于簡單的文本文件,有一個(gè)更簡單的方法streamingContext.textFileStream(dataDirectory)可以被調(diào)用。文件流不需要運(yùn)行一個(gè)receiver,所以不需要分配核。

在Spark1.2中,fileStream在Python API中不可用,只有textFileStream可用。

  • 基于自定義actor的流:DStream可以調(diào)用streamingContext.actorStream(actorProps, actor-name)方法從Akka actors獲取的數(shù)據(jù)流來創(chuàng)建。具體的信息見自定義receiver指南actorStream在Python API中不可用。
  • RDD隊(duì)列作為數(shù)據(jù)流:為了用測試數(shù)據(jù)測試Spark Streaming應(yīng)用程序,人們也可以調(diào)用streamingContext.queueStream(queueOfRDDs)方法基于RDD隊(duì)列創(chuàng)建DStreams。每個(gè)push到隊(duì)列的RDD都被當(dāng)做DStream的批數(shù)據(jù),像流一樣處理。

關(guān)于從套接字、文件和actor中獲取流的更多細(xì)節(jié),請(qǐng)看StreamingContextJavaStreamingContext

高級(jí)源

這類源需要非Spark庫接口,并且它們中的部分還需要復(fù)雜的依賴(例如kafka和flume)。為了減少依賴的版本沖突問題,從這些源創(chuàng)建DStream的功能已經(jīng)被移到了獨(dú)立的庫中,你能在關(guān)聯(lián)查看細(xì)節(jié)。例如,如果你想用來自推特的流數(shù)據(jù)創(chuàng)建DStream,你需要按照如下步驟操作:

  • 關(guān)聯(lián):添加spark-streaming-twitter_2.10到SBT或maven項(xiàng)目的依賴中
  • 編寫:導(dǎo)入TwitterUtils類,用TwitterUtils.createStream方法創(chuàng)建DStream,如下所示

    import org.apache.spark.streaming.twitter._
    TwitterUtils.createStream(ssc)
  • 部署:將編寫的程序以及其所有的依賴(包括spark-streaming-twitter_2.10的依賴以及它的傳遞依賴)打?yàn)閖ar包,然后部署。這在部署章節(jié)將會(huì)作更進(jìn)一步的介紹。

需要注意的是,這些高級(jí)的源在spark-shell中不能被使用,因此基于這些源的應(yīng)用程序無法在shell中測試。

下面將介紹部分的高級(jí)源:

  • Twitter:Spark Streaming利用Twitter4j 3.0.3獲取公共的推文流,這些推文通過推特流API獲得。認(rèn)證信息可以通過Twitter4J庫支持的任何方法提供。你既能夠得到公共流,也能夠得到基于關(guān)鍵字過濾后的流。你可以查看API文檔(scalajava)和例子(TwitterPopularTagsTwitterAlgebirdCMS
  • Flume:Spark Streaming 1.2能夠從flume 1.4.0中獲取數(shù)據(jù),可以查看flume集成指南了解詳細(xì)信息
  • Kafka:Spark Streaming 1.2能夠從kafka 0.8.0中獲取數(shù)據(jù),可以查看kafka集成指南了解詳細(xì)信息
  • Kinesis:查看Kinesis集成指南了解詳細(xì)信息

自定義源

在Spark 1.2中,這些源不被Python API支持。輸入DStream也可以通過自定義源創(chuàng)建,你需要做的是實(shí)現(xiàn)用戶自定義的receiver,這個(gè)receiver可以從自定義源接收數(shù)據(jù)以及將數(shù)據(jù)推到Spark中。通過自定義receiver指南了解詳細(xì)信息

Receiver可靠性

基于可靠性有兩類數(shù)據(jù)源。源(如kafka、flume)允許。如果從這些可靠的源獲取數(shù)據(jù)的系統(tǒng)能夠正確的應(yīng)答所接收的數(shù)據(jù),它就能夠確保在任何情況下不丟失數(shù)據(jù)。這樣,就有兩種類型的receiver:

  • Reliable Receiver:一個(gè)可靠的receiver正確的應(yīng)答一個(gè)可靠的源,數(shù)據(jù)已經(jīng)收到并且被正確地復(fù)制到了Spark中。
  • Unreliable Receiver :這些receivers不支持應(yīng)答。即使對(duì)于一個(gè)可靠的源,開發(fā)者可能實(shí)現(xiàn)一個(gè)非可靠的receiver,這個(gè)receiver不會(huì)正確應(yīng)答。

怎樣編寫可靠的Receiver的細(xì)節(jié)在自定義receiver中有詳細(xì)介紹。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)