99re热视频这里只精品,久久久天堂国产精品女人,国产av一区二区三区,久久久精品成人免费看片,99久久精品免费看国产一区二区三区

Spark GraphX頂點和邊RDDs

2018-11-26 16:36 更新

Spark GraphX頂點和邊RDDs

GraphX暴露保存在圖中的頂點和邊的RDD。然而,因為GraphX包含的頂點和邊擁有優(yōu)化的數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)提供了額外的功能。頂點和邊分別返回VertexRDDEdgeRDD。這一章我們將學(xué)習(xí)它們的一些有用的功能。

VertexRDDs

VertexRDD[A]繼承自RDD[(VertexID, A)]并且添加了額外的限制,那就是每個VertexID只能出現(xiàn)一次。此外,VertexRDD[A]代表了一組屬性類型為A的頂點。在內(nèi)部,這通過保存頂點屬性到一個可重復(fù)使用的hash-map數(shù)據(jù)結(jié)構(gòu)來獲得。所以,如果兩個VertexRDDs從相同的基本VertexRDD獲得(如通過filter或者mapValues),它們能夠在固定的時間內(nèi)連接而不需要hash評價。為了利用這個索引數(shù)據(jù)結(jié)構(gòu),VertexRDD暴露了一下附加的功能:

class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

舉個例子,filter操作如何返回一個VertexRDD。過濾器實際使用一個BitSet實現(xiàn),因此它能夠重用索引以及保留和其它VertexRDDs做連接時速度快的能力。同樣的,mapValues操作不允許map函數(shù)改變VertexID,因此可以保證相同的HashMap數(shù)據(jù)結(jié)構(gòu)能夠重用。當連接兩個從相同的hashmap獲取的VertexRDDs和使用線性掃描而不是昂貴的點查找實現(xiàn)連接操作時,leftJoininnerJoin都能夠使用。

從一個RDD[(VertexID, A)]高效地構(gòu)建一個新的VertexRDDaggregateUsingIndex操作是有用的。概念上,如果我通過一組頂點構(gòu)造了一個VertexRDD[B],而VertexRDD[B]是一些RDD[(VertexID, A)]中頂點的超集,那么我們就可以在聚合以及隨后索引RDD[(VertexID, A)]中重用索引。例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDDs

EdgeRDD[ED]繼承自RDD[Edge[ED]],使用定義在PartitionStrategy的各種分區(qū)策略中的一個在塊分區(qū)中組織邊。在每個分區(qū)中,邊屬性和相鄰結(jié)構(gòu)被分別保存,當屬性值改變時,它們可以最大化的重用。

EdgeRDD暴露了三個額外的函數(shù)

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多數(shù)的應(yīng)用中,我們發(fā)現(xiàn),EdgeRDD操作可以通過圖操作者(graph operators)或者定義在基本RDD中的操作來完成。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號