Friday, January 8, 2016

Spark Graph



Apache Spark Graph Processing
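In GraphX, a graph is represented as a property graph:
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED, VD]
}
Here VD and ED are the types of the vertex and edge attributes, and VertexRDD[VD] and EdgeRDD[ED, VD] are the distributed collections that hold the vertices and edges.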
The property graph in Spark is a directed multigraph: the graph may have multiple edges between any pair of vertices, and each edge is directed and defines a unidirectional relationship.
Additional properties about the relationship can be stored as an attribute of the edge.
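
To make the multigraph idea concrete, here is a minimal plain-Scala sketch that needs no Spark. The VertexId alias and Edge case class below are local stand-ins that only mirror the GraphX types, and the sample edges are made up for illustration.

```scala
// Local stand-ins mirroring GraphX's VertexId and Edge (assumption: plain Scala, no Spark).
type VertexId = Long
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

// Hypothetical sample data: two parallel edges from 1 to 2, plus one edge back from 2 to 1.
val sampleEdges: Seq[Edge[String]] = Seq(
  Edge(1L, 2L, "friend"),
  Edge(1L, 2L, "coworker"), // a second edge between the same pair: allowed in a multigraph
  Edge(2L, 1L, "boss")      // direction matters: this is a separate relationship
)

// All edges that go from `src` to `dst`, in that direction only.
def edgesBetween(src: VertexId, dst: VertexId): Seq[Edge[String]] =
  sampleEdges.filter(e => e.srcId == src && e.dstId == dst)

val forward = edgesBetween(1L, 2L).map(_.attr)  // attributes of the two parallel edges
val backward = edgesBetween(2L, 1L).map(_.attr) // attribute of the single reverse edge
```

The edge attribute is where the extra information about each relationship lives, which is why two edges between the same pair of vertices can still be told apart.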

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val people = sc.textFile("./data/people.csv")
val links = sc.textFile("./data/links.csv")
case class Person(name: String, age: Int)
// Each line of people.csv holds: vertex ID, name, age
val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
  val row = line split ','
  (row(0).toLong, Person(row(1), row(2).toInt))
}

type Connection = String
// Each line of links.csv holds: source ID, destination ID, connection type
val linksRDD: RDD[Edge[Connection]] = links map { line =>
  val row = line split ','
  Edge(row(0).toLong, row(1).toLong, row(2))
}
val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
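
The parsing step can be tried without a cluster. The sketch below is plain Scala with no Spark: Edge and VertexId are local stand-ins for the GraphX types, parsePerson and parseLink are hypothetical helper names, and the two input lines are invented samples that follow the column layout the code above reads.

```scala
// Local stand-ins for the GraphX types (assumption: plain Scala, no Spark).
type VertexId = Long
case class Person(name: String, age: Int)
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

// Hypothetical helper: turn one "id,name,age" line into a vertex tuple.
def parsePerson(line: String): (VertexId, Person) = {
  val row = line.split(',')
  (row(0).toLong, Person(row(1), row(2).toInt))
}

// Hypothetical helper: turn one "src,dst,connection" line into an edge.
def parseLink(line: String): Edge[String] = {
  val row = line.split(',')
  Edge(row(0).toLong, row(1).toLong, row(2))
}

// Invented sample lines, not taken from the real data files.
val alice = parsePerson("1,Alice,20")
val link = parseLink("1,2,friend")
```

Pulling the parsing into named functions like these is one possible design choice: the same functions could be passed to map on the text-file RDDs, and malformed lines can be investigated locally.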

To paste or write code in multiple lines in the shell:

1. Type the command :paste
2. Paste or write the given code
3. Evaluate the code by pressing Ctrl + D (the REPL prints "ctrl-D to finish" when it enters paste mode)
VertexId is simply a type alias for Long as defined in GraphX. In addition, the Edge class is defined in org.apache.spark.graphx.Edge as:
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

tinySocial.vertices.collect()
tinySocial.edges.collect()

// profLinks is assumed to be a collection of professional connection labels defined beforehand
val profNetwork =
  tinySocial.edges.filter { case Edge(_, _, link) => profLinks.contains(link) }
for {
  Edge(src, dst, link) <- profNetwork.collect()
  srcName = peopleRDD.filter { case (id, _) => id == src }.first()._2.name
  dstName = peopleRDD.filter { case (id, _) => id == dst }.first()._2.name
} println(srcName + " is a " + link + " of " + dstName)

tinySocial.subgraph(profLinks contains _.attr).
     triplets.foreach(t => println(t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name))
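
The triplet view used above pairs each edge with the attributes of both endpoints, so no per-edge vertex lookup is needed. The following plain-Scala sketch (no Spark) imitates that idea under stated assumptions: Triplet is a local stand-in for GraphX's EdgeTriplet, and the people, edges, and the contents of profLinks are invented for illustration.

```scala
// Local stand-ins (assumption: plain Scala, no Spark).
type VertexId = Long
case class Person(name: String, age: Int)
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
// Stand-in for an edge triplet: the edge attribute plus both endpoint attributes.
case class Triplet[VD, ED](srcAttr: VD, attr: ED, dstAttr: VD)

// Invented sample data.
val peopleById: Map[VertexId, Person] = Map(
  1L -> Person("Alice", 20),
  2L -> Person("Bob", 18),
  3L -> Person("Charlie", 30)
)
val sampleEdges = Seq(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "coworker"),
  Edge(3L, 1L, "boss")
)
val profLinks = Set("coworker", "boss") // hypothetical set of professional labels

// Keep professional edges only, then attach the endpoint attributes once per edge.
val profTriplets = sampleEdges
  .filter(e => profLinks.contains(e.attr))
  .map(e => Triplet(peopleById(e.srcId), e.attr, peopleById(e.dstId)))

val sentences = profTriplets.map(t =>
  t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name)
```

Compared with the for-comprehension version, which filters the whole vertex collection twice for every edge, the triplet approach resolves both endpoints in a single pass.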

val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
emailGraph.edges.filter(_.srcId == 19021).map(_.dstId).collect()    

// FNNode, Ingredient and Compound are node types assumed to be defined beforehand
val ingredients: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/ingr_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (row(0).toLong, Ingredient(row(1), row(2)))
    }

// Compound IDs are shifted by 10000, which keeps them apart from ingredient IDs
val compounds: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/comp_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (10000L + row(0).toLong, Compound(row(1), row(2)))
    }
// Each ingredient-compound link carries the constant attribute 1
val links: RDD[Edge[Int]] =
  sc.textFile("./data/ingr_comp.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      Edge(row(0).toLong, 10000L + row(1).toLong, 1)
    }
val nodes = ingredients ++ compounds
val foodNetwork = Graph(nodes, links)
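
Because ingredients and compounds end up in one vertex collection, their IDs must not overlap. The plain-Scala sketch below illustrates the offset trick with invented raw IDs; ingredientId and compoundId are hypothetical helper names, and the approach only works on the assumption that every raw ingredient ID is smaller than the offset.

```scala
type VertexId = Long

// Offset taken from the snippet above; assumes all raw ingredient IDs are below it.
val compoundOffset = 10000L

def ingredientId(raw: Int): VertexId = raw.toLong
def compoundId(raw: Int): VertexId = compoundOffset + raw

// Invented raw IDs: both files are imagined to number their rows from 0.
val ingredientIds = Seq(0, 1).map(ingredientId)
val compoundIds = Seq(0, 1).map(compoundId)

// After shifting, the combined ID list contains no duplicates.
val allIds = ingredientIds ++ compoundIds
val hasCollision = allIds.distinct.size != allIds.size
```

The edge file has to apply the same shift to its compound column, as the links definition above does, or the edges would point at ingredient vertices instead.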

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
emailGraph.outDegrees.reduce(max)
emailGraph.outDegrees.filter(_._2 <= 1).count
egoNetwork.degrees.
  map(t => (t._2,t._1)).
  groupByKey.map(t => (t._1,t._2.size)).
  sortBy(_._1).collect()
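
The degree queries above can be mirrored on a small in-memory edge list. This is a plain-Scala sketch with no Spark, using an invented edge list; it follows the same steps (count per vertex, reduce with max, then group vertices by degree) rather than calling the GraphX operators themselves.

```scala
type VertexId = Long

// Invented directed edge list as (source, destination) pairs.
val sampleEdges: Seq[(VertexId, VertexId)] =
  Seq((1L, 2L), (1L, 3L), (1L, 4L), (2L, 3L))

// Out-degree: number of edges leaving each vertex.
// Vertices without outgoing edges do not appear at all.
val outDegrees: Map[VertexId, Int] =
  sampleEdges.groupBy(_._1).map { case (v, es) => (v, es.size) }

// Same reducer as in the text: keep the pair with the larger degree.
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b

val busiest = outDegrees.toSeq.reduce(max)

// Total degree: every edge counts once for each of its two endpoints.
val degrees: Map[VertexId, Int] =
  sampleEdges.flatMap { case (s, d) => Seq(s, d) }
    .groupBy(identity)
    .map { case (v, occurrences) => (v, occurrences.size) }

// Histogram: how many vertices have each degree, sorted by degree.
val histogram: Seq[(Int, Int)] =
  degrees.values.toSeq
    .groupBy(identity)
    .map { case (d, vs) => (d, vs.size) }
    .toSeq
    .sortBy(_._1)
```

In this sample, vertex 1 has the highest out-degree, and the histogram pairs each degree value with the number of vertices that have it.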

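For visualization, two third-party libraries are used: GraphStream for drawing networks, and BreezeViz for plotting structural properties of graphs, such as the degree distribution.
Apache Zeppelin is another option for visualizing Spark results in a notebook.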

./bin/spark-shell --jars $(find "." -name '*.jar' | xargs echo | tr ' ' ',')
import org.graphstream.graph.{Graph=>GraphStream}
import org.graphstream.graph.implementations._

val graph: SingleGraph = new SingleGraph("EgoSocial")
graph.addAttribute("ui.stylesheet","url(file:.//style/stylesheet)")
graph.addAttribute("ui.quality")
graph.addAttribute("ui.antialias")

for ((id, _) <- egoNetwork.vertices.collect()) {
  val node = graph.addNode(id.toString).asInstanceOf[SingleNode]
}
for (Edge(x, y, _) <- egoNetwork.edges.collect()) {
  val edge = graph.addEdge(x.toString ++ y.toString, x.toString, y.toString, true).
    asInstanceOf[AbstractEdge]
}
graph.display()

def degreeHistogram(net: Graph[Int, Int]): Array[(Int, Int)] =
    net.degrees.map(t => (t._2,t._1)).
          groupByKey.map(t => (t._1,t._2.size)).
          sortBy(_._1).collect()
val nn = egoNetwork.numVertices
val egoDegreeDistribution = degreeHistogram(egoNetwork).map({case (d,n) => (d,n.toDouble/nn)})
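
The last step turns counts into fractions of the vertex set. A minimal plain-Scala sketch of that normalization follows, using an invented histogram and vertex count rather than the real ego network.

```scala
// Invented histogram of (degree, number of vertices with that degree).
val histogram: Array[(Int, Int)] = Array((1, 1), (2, 2), (3, 1))

// Invented vertex count; it plays the role of numVertices, which is a Long.
val numVertices = 4L

// Divide each count by the number of vertices to get a fraction per degree.
val distribution: Array[(Int, Double)] =
  histogram.map { case (d, n) => (d, n.toDouble / numVertices) }

// The fractions add up to 1 here because every vertex appears in the histogram.
val total = distribution.map(_._2).sum
```

One caveat worth keeping in mind: the degree collection only lists vertices that touch at least one edge, while the vertex count includes isolated vertices, so on a graph with isolated vertices the fractions would sum to less than 1.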
