Spark GraphX

package Spark_GraphX

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object 属性图 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Define the vertices: (VertexId, (name, occupation))
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("soyo", "student")),
      (7L, ("soyo2", "postdoc")),
      (5L, ("xiaozhou", "professor")),
      (2L, ("xiaocui", "professor"))))
    // Define the edges: Edge(srcId, dstId, attr)
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "parent")))
    // Default vertex attribute, used when an edge references a vertex that does not exist
    val defaultUser = ("Jone", "Dance")
    val graph = Graph(users, relationships, defaultUser)
    println("*****************")
    println("找到图中属性是student的点")
    graph.vertices.filter{case (id,(name,occupation))=>occupation=="student"}.collect.foreach{case(id,(name,occupation))=>println(s"$name is $occupation")}
    println("--------------------------")
    println("找到途中边的属性是advisor的边")
    graph.edges.filter(x=>x.attr=="advisor").collect().foreach(x=>println(s"${x.srcId} to ${x.dstId} 属性为 ${x.attr}"))
    println("--------------------------")
    println("找到图中的最大出度,入度,度数")
    println("最大的出度:"+graph.outDegrees.reduce(max))
    println("最大的入度:"+graph.inDegrees.reduce(max))
    println("最大的度数:"+graph.degrees.reduce(max))
    // Scala can call plain Java code directly, and collections can be converted
    // back and forth (via scala.collection.JavaConversions); see the sketch after this listing
    // System.out.print("hello world")
    sc.stop()
  }
  // Returns whichever (VertexId, degree) pair has the larger degree
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
    if (a._2 > b._2) a else b
  }

}
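
The commented-out line above hints at Scala's Java interoperability. A minimal sketch of converting collections in both directions, using scala.collection.JavaConverters (the non-deprecated successor to JavaConversions; the names here are illustrative and not part of the program above):

import scala.collection.JavaConverters._

object JavaInteropSketch {
  def main(args: Array[String]): Unit = {
    // Plain Java APIs can be called directly from Scala
    System.out.println("hello world")

    // Wrap a java.util.List as a Scala collection (a view, no copy)
    val javaList = new java.util.ArrayList[String]()
    javaList.add("soyo")
    javaList.add("soyo2")
    javaList.asScala.foreach(println)

    // And back: expose a Scala Seq as a java.util.List
    val backToJava = Seq(1, 2, 3).asJava
    System.out.println(backToJava.size())
  }
}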

Output of the GraphX program above:

*****************
Vertices whose occupation is student:
soyo is student
--------------------------
Edges whose attribute is advisor:
5 to 3 with attr advisor
--------------------------
Max out-degree, in-degree, and degree in the graph:
Max out-degree: (5,2)
Max in-degree: (7,2)
Max degree: (5,3)
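
The defaultUser attribute only comes into play when an edge references a vertex id that is missing from users. A minimal sketch (meant to sit inside main after graph is built; the edge to vertex 0L is made up for illustration) showing the default being filled in, plus the triplets view that joins each edge with both endpoint attributes:

    // Add an edge pointing at a vertex id (0L) that has no entry in users
    val edges2 = relationships.union(sc.parallelize(Array(Edge(3L, 0L, "collab"))))
    val graph2 = Graph(users, edges2, defaultUser)

    // Vertex 0L is materialized with the default attribute ("Jone","Dance")
    graph2.vertices.filter { case (id, _) => id == 0L }
      .collect().foreach { case (id, (name, occupation)) => println(s"$id -> $name, $occupation") }

    // triplets exposes (srcAttr, attr, dstAttr) for every edge in one pass
    graph2.triplets
      .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
      .collect().foreach(println)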