博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark-GraphxAPI学习笔记
阅读量:6960 次
发布时间:2019-06-27

本文共 3329 字,大约阅读时间需要 11 分钟。

  • 图的集合视图
graph包含三个基本的类集合视图:
val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] val triplets: RDD[EdgeTriplet[VD, ED]],即可理解为:RDD(srcId,srcAttr,dstId,dstAttr,attr)
在对graph的某个视图作map/filter操作时,可以使用case表达式来匹配对应的元素,如:graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
  • 图信息接口
val numEdges: Long  val numVertices: Long  val inDegrees: VertexRDD[Int]  val outDegrees: VertexRDD[Int]  val degrees: VertexRDD[Int]
  • 缓存方法
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] def cache(): Graph[VD, ED] def unpersistVertices(blocking: Boolean = true): Graph[VD, ED],当在一个图上频繁修改顶点值而不重用边信息时,可以用此方法对顶点去缓存以提高GC性能
  • 节点与边的变换操作
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])  : Graph[VD, ED2]
  • 修改图结构操作
def reverse: Graph[VD, ED]  def subgraph(      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),      vpred: (VertexID, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED],按条件生成子图  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED],生成的结果图的顶点和边同时存在于原来的两个图中  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],把重复边进行reduce操作,注意此操作之前,应当在图上调用partitionBy方法
  • 图join操作
def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])      (mapFunc: (VertexID, VD, Option[U]) => VD2) : Graph[VD2, ED]
注: joinVertices操作实际上是根据给定的另一个图(原图的每个顶点id至多对应此图的的一个顶点id)把原图中的顶点的属性值根据指定的mapFunc函数进行修改,返回一个新图,新图的顶点类型不变,如果图中的某个顶点id在另一个图中不存在,则保留原值
而outerJoinVertices操作和joinVertices类似,只不过,当图中某个顶点id在另一个图中不存在时,则使用None值
 
  • 在邻边上聚合信息
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] def aggregateMessages[Msg: ClassTag](      sendMsg: EdgeContext[VD, ED, Msg] => Unit,      mergeMsg: (Msg, Msg) => Msg,      tripletFields: TripletFields = TripletFields.All)    : VertexRDD[A]
注:聚合信息的核心方法是:aggregateMessages,其操作的本质是sendMsg和mergeMsg,具体而言,是依次在图的每条边(edgeTriplets)上根据sendMsg函数的要求,把该边上A端的节点信息发送给B端,如:把src节点信息发到dst节点信息,或者把dst节点信息发送到src节点上,然后在B端调用merge函数将可能收到的多个msg合并成一个msg.
tripletFields字段指定要操作哪些字段,如果仅操作部分字段的话,通过此参数进行限定可提高性能。aggregateMessages方法最终返回一个新的顶点集:VertexRDD,这个新的顶点集中每个vertex节点上包含上聚合后的信息。
collectNeighborIds与collectNeighbors函数就是对aggregateMessages的简单封装以实现聚合相邻节点id和相邻节点的功能
  • VertexRDD与RDD有一个明显的区别是,VertexRDD的key不重复,而RDD的key可以重复
  • aggregateUsingIndex函数的作用类似于reduceByKey,如vertexRdd1.aggregateUsingIndex(rdd2,_+_),作用是利用vertexRdd1的索引结果对rdd2进行聚合,在rdd2中对vertexRdd1中出现的id对应的属性值做聚合操作,很像reduceByKey,得到的结果是一个VertexRDD,这个结果与vertexRdd1进行join等操作时就会很快,因为他们具有相同的索引结构

 

  • PageRank算法:

  graph.pageRank(tolorence,reset),用于计算类似于网页排名的各种经典问题,tolorence参数用于指定可容忍的收敛度,毕竟无穷迭代下去是耗时也意义不大的,reset参数用于设定终止点和陷阱问题的概率,防止迭代结果倾斜或终止到一个节点的事情发生,所以这个参数不能传0,详情参考:

 

  •   连通体算法:

    graph.connectedComponents() 返回一个新图,新图的顶点属性被替换成了该顶点所在的连通体的id,这个id是此连通体中所有节点中id最小的那个节点的id

    例如,我要计算一个图中连通图的个数: graph.connectedComponents.vertices.map(e => (e._2, 1L)).reduceByKey(_ + _).sortBy(e => e._2, ascending = false).count

 

转载于:https://www.cnblogs.com/yepei/p/6171126.html

你可能感兴趣的文章
转:atoi函数的实现
查看>>
ASP.NET全局异常处理
查看>>
线索二叉树
查看>>
java8新特性全面解析
查看>>
Python安装、配置图文详解(转载)
查看>>
卡尔曼滤波器:用R语言中的KFAS建模时间序列
查看>>
常用的14种HTTP状态码速查手册
查看>>
缓存系统Memcached
查看>>
邮箱验证信息
查看>>
软件项目技术点(8)—— canvas调用drawImage绘制图片
查看>>
dom解析和sax解析
查看>>
Android画图并保存图片(转载)
查看>>
中年人编程
查看>>
跳槽穷半年,改行穷三年,说的太好了!
查看>>
EhCache 分布式缓存/缓存集群
查看>>
即时通讯(IM)
查看>>
通过php简单操作mysql
查看>>
spring security antMatchers相关内容
查看>>
Github的gitignore
查看>>
树莓派安装OpenCV完整过程
查看>>