Apache Spark Graph Processing
In GraphX, a graph is represented by a property graph:
// Simplified excerpt of the GraphX Graph API: a graph is its vertex collection plus its edge
// collection, parameterized by the vertex attribute type VD and the edge attribute type ED.
// NOTE(review): the vals below have no definitions, so this only compiles as an abstract
// class or trait; it is shown for reference and is not meant to be pasted into the shell.
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED,VD]
}
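Its vertices and edges are stored in a `VertexRDD[VD]` and an `EdgeRDD[ED, VD]`, where `VD` and `ED` are the types of the vertex and edge attributes.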
The property graph in Spark is a directed multigraph. This means that the graph may have multiple edges between any pair of vertices. Moreover, each edge is directed and defines a unidirectional relationship.
Additional properties about the relationship can be stored as an attribute of the edge.
// Build the tiny social network from two CSV files, parsed below as:
//   people.csv: id,name,age        links.csv: srcId,dstId,relationship
val people = sc.textFile("./data/people.csv")
val links = sc.textFile("./data/links.csv")

/** Vertex attribute: a person's name and age. */
case class Person(name: String, age: Int)

// VertexId is an alias for Long, so ids are parsed with toLong; toInt would throw
// NumberFormatException for any id larger than Int.MaxValue.
val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
  val row = line split ','
  (row(0).toLong, Person(row(1), row(2).toInt))
}

/** Edge attribute: the kind of relationship between two people. */
type Connection = String

val linksRDD: RDD[Edge[Connection]] = links map { line =>
  val row = line split ','
  Edge(row(0).toLong, row(1).toLong, row(2))
}

val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
To paste or write multi-line code in the shell:
1. Type the command `:paste`.
2. Paste or write the code.
3. Press Ctrl + D to evaluate it. The Scala REPL uses Ctrl + D on every platform, including macOS; it prints "ctrl-D to finish" when entering paste mode.
`VertexId` is simply a type alias for `Long`, as defined in GraphX. The `Edge` class, `org.apache.spark.graphx.Edge`, is defined as follows (simplified):
// Simplified view of org.apache.spark.graphx.Edge. It is generic in the edge attribute type
// ED, so the declaration needs the [ED] type parameter. It is a case class, which is what makes
// patterns such as `case Edge(src, dst, attr)` work.
// Reference only: defining it in the shell would shadow GraphX's own Edge.
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
// Inspect the whole graph by bringing its vertices and edges back to the driver.
tinySocial.vertices.collect()
tinySocial.edges.collect()

// Relationship labels that count as "professional" links.
// NOTE(review): profLinks was referenced below but never defined in these notes. This list is
// presumably the intended one; confirm it against the labels actually present in links.csv.
val profLinks: List[Connection] = List("Coworker", "Boss", "Employee", "Client", "Supplier")

// Edges whose relationship label is professional.
val profNetwork =
  tinySocial.edges.filter { case Edge(_, _, link) => profLinks.contains(link) }

// Resolve vertex ids to names with a driver-side map built once. The per-edge lookup
// `peopleRDD.filter(...).first` launched two Spark jobs for every printed line.
val nameOf: scala.collection.Map[VertexId, String] =
  peopleRDD.mapValues(_.name).collectAsMap()

for (Edge(src, dst, link) <- profNetwork.collect())
  println(s"${nameOf(src)} is a $link of ${nameOf(dst)}")

// Same report via subgraph + triplets, which already carry both endpoint attributes.
// Collect first so println runs on the driver. RDD.foreach runs on the executors, whose stdout
// is not the driver console when running on a cluster.
tinySocial.subgraph(epred = t => profLinks.contains(t.attr)).
  triplets.collect().
  foreach(t => println(s"${t.srcAttr.name} is a ${t.attr} of ${t.dstAttr.name}"))
// Load the Enron email graph from an edge-list file.
val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
// Destination ids of every edge that leaves vertex 19021.
emailGraph.edges.filter(edge => edge.srcId == 19021L).map(edge => edge.dstId).collect()
// Flavor network. It has one vertex per ingredient and per compound, and an edge from an
// ingredient to each compound paired with it in ingr_comp.tsv. Lines starting with '#' are skipped.
//
// NOTE(review): FNNode, Ingredient and Compound were used here but never defined in these notes.
// The definitions below match how they are used: built from two String columns and stored as
// FNNode. The meaning of the second column (category / CAS number) is an assumption; confirm
// it against the TSV headers.
class FNNode(val name: String) extends Serializable
case class Ingredient(override val name: String, category: String) extends FNNode(name)
case class Compound(override val name: String, cas: String) extends FNNode(name)

// Ingredient and compound ids come from separate files, so compound ids are shifted by this
// offset to keep the two id ranges disjoint (presumably why it exists).
// Assumes every ingredient id is below 10000; otherwise vertices would collide.
val compoundIdOffset = 10000L

// VertexId is a Long, so ids are parsed with toLong.
val ingredients: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/ingr_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (row(0).toLong, Ingredient(row(1), row(2)))
    }

val compounds: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/comp_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (compoundIdOffset + row(0).toLong, Compound(row(1), row(2)))
    }

// Each row pairs an ingredient id with a compound id. Edge attributes are unused (always 1).
val links: RDD[Edge[Int]] =
  sc.textFile("./data/ingr_comp.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      Edge(row(0).toLong, compoundIdOffset + row(1).toLong, 1)
    }

val nodes = ingredients ++ compounds
val foodNetwork = Graph(nodes, links)
/**
 * Returns whichever (vertexId, degree) pair has the larger degree; ties go to the smaller
 * vertex id.
 *
 * RDD.reduce requires a commutative and associative function. The tie-break makes this a true
 * maximum under a total order. Without it, ties returned whichever argument came second, so
 * the reported vertex could depend on partitioning and merge order.
 */
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2 || (a._2 == b._2 && a._1 < b._1)) a else b
// Vertex with the highest out-degree, as (vertexId, outDegree).
emailGraph.outDegrees.reduce((a, b) => max(a, b))
// Number of vertices whose out-degree is at most one.
emailGraph.outDegrees.filter { case (_, outDeg) => outDeg <= 1 }.count()
// Degree histogram of the ego network: (degree, number of vertices with that degree), sorted
// by degree. reduceByKey counts per degree with map-side combining, instead of shuffling every
// vertex id through groupByKey only to take each group's size.
// NOTE(review): egoNetwork is not defined in these notes; presumably a Graph loaded earlier.
egoNetwork.degrees.
  map { case (_, degree) => (degree, 1) }.
  reduceByKey(_ + _).
  sortBy(_._1).collect()
Visualization tools: GraphStream for drawing networks, and BreezeViz for plotting structural properties of graphs, such as the degree distribution.
Apache Zeppelin (a web-based notebook)
# Start the Spark shell with every jar found under the current directory on the classpath
# (presumably the GraphStream/BreezeViz jars). --jars expects a comma-separated list,
# hence the `tr ' ' ','`. The command substitution needs `$(...)`; a bare `(` is a syntax error.
./bin/spark-shell --jars $(find "." -name '*.jar' | xargs echo | tr ' ' ',')
import org.graphstream.graph.{Graph=>GraphStream}
import org.graphstream.graph.implementations._
// Draw the ego network with GraphStream.
// NOTE(review): egoNetwork is not defined in these notes; presumably a Graph loaded earlier.
val graph: SingleGraph = new SingleGraph("EgoSocial")
graph.addAttribute("ui.stylesheet","url(file:.//style/stylesheet)")
graph.addAttribute("ui.quality")
graph.addAttribute("ui.antialias")

// Node and edge classes are passed as explicit type arguments, replacing the previous unused
// locals and asInstanceOf casts. This presumes addNode/addEdge are generic in their return type,
// as in GraphStream 1.x; confirm against the GraphStream version in use.

// One GraphStream node per GraphX vertex, keyed by the vertex id.
for ((id, _) <- egoNetwork.vertices.collect()) {
  graph.addNode[SingleNode](id.toString)
}

// Edge ids need a separator. Plain concatenation maps (1, 23) and (12, 3) to the same id "123",
// and GraphStream rejects a second element with an id already in use.
for (Edge(x, y, _) <- egoNetwork.edges.collect()) {
  graph.addEdge[AbstractEdge](s"$x-$y", x.toString, y.toString, true) // true = directed
}

graph.display()
/**
 * Degree histogram of `net`: pairs (degree, number of vertices with that degree), sorted by
 * ascending degree and collected to the driver.
 *
 * Counting uses reduceByKey, which combines counts map-side, instead of groupByKey, which
 * shuffled every vertex id only to take each group's size. Per the GraphX docs, `degrees`
 * omits vertices with no edges, so degree 0 never appears.
 */
def degreeHistogram(net: Graph[Int, Int]): Array[(Int, Int)] =
  net.degrees.
    map { case (_, degree) => (degree, 1) }.
    reduceByKey(_ + _).
    sortBy(_._1).collect()
// Normalize the histogram into a degree distribution: (degree, fraction of all vertices).
val nn: Long = egoNetwork.numVertices
val egoDegreeDistribution = degreeHistogram(egoNetwork).map { case (degree, count) => (degree, count.toDouble / nn) }
In GraphX, a graph is represented by a property graph:
// Simplified excerpt of the GraphX Graph API: a graph is its vertex collection plus its edge
// collection, parameterized by the vertex attribute type VD and the edge attribute type ED.
// NOTE(review): the vals below have no definitions, so this only compiles as an abstract
// class or trait; it is shown for reference and is not meant to be pasted into the shell.
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED,VD]
}
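Its vertices and edges are stored in a `VertexRDD[VD]` and an `EdgeRDD[ED, VD]`, where `VD` and `ED` are the types of the vertex and edge attributes.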
The property graph in Spark is a directed multigraph. This means that the graph may have multiple edges between any pair of vertices. Moreover, each edge is directed and defines a unidirectional relationship.
Additional properties about the relationship can be stored as an attribute of the edge.
// Build the tiny social network from two CSV files, parsed below as:
//   people.csv: id,name,age        links.csv: srcId,dstId,relationship
val people = sc.textFile("./data/people.csv")
val links = sc.textFile("./data/links.csv")

/** Vertex attribute: a person's name and age. */
case class Person(name: String, age: Int)

// VertexId is an alias for Long, so ids are parsed with toLong; toInt would throw
// NumberFormatException for any id larger than Int.MaxValue.
val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
  val row = line split ','
  (row(0).toLong, Person(row(1), row(2).toInt))
}

/** Edge attribute: the kind of relationship between two people. */
type Connection = String

val linksRDD: RDD[Edge[Connection]] = links map { line =>
  val row = line split ','
  Edge(row(0).toLong, row(1).toLong, row(2))
}

val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
To paste or write multi-line code in the shell:
1. Type the command `:paste`.
2. Paste or write the code.
3. Press Ctrl + D to evaluate it. The Scala REPL uses Ctrl + D on every platform, including macOS; it prints "ctrl-D to finish" when entering paste mode.
`VertexId` is simply a type alias for `Long`, as defined in GraphX. The `Edge` class, `org.apache.spark.graphx.Edge`, is defined as follows (simplified):
// Simplified view of org.apache.spark.graphx.Edge. It is generic in the edge attribute type
// ED, so the declaration needs the [ED] type parameter. It is a case class, which is what makes
// patterns such as `case Edge(src, dst, attr)` work.
// Reference only: defining it in the shell would shadow GraphX's own Edge.
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
// Inspect the whole graph by bringing its vertices and edges back to the driver.
tinySocial.vertices.collect()
tinySocial.edges.collect()

// Relationship labels that count as "professional" links.
// NOTE(review): profLinks was referenced below but never defined in these notes. This list is
// presumably the intended one; confirm it against the labels actually present in links.csv.
val profLinks: List[Connection] = List("Coworker", "Boss", "Employee", "Client", "Supplier")

// Edges whose relationship label is professional.
val profNetwork =
  tinySocial.edges.filter { case Edge(_, _, link) => profLinks.contains(link) }

// Resolve vertex ids to names with a driver-side map built once. The per-edge lookup
// `peopleRDD.filter(...).first` launched two Spark jobs for every printed line.
val nameOf: scala.collection.Map[VertexId, String] =
  peopleRDD.mapValues(_.name).collectAsMap()

for (Edge(src, dst, link) <- profNetwork.collect())
  println(s"${nameOf(src)} is a $link of ${nameOf(dst)}")

// Same report via subgraph + triplets, which already carry both endpoint attributes.
// Collect first so println runs on the driver. RDD.foreach runs on the executors, whose stdout
// is not the driver console when running on a cluster.
tinySocial.subgraph(epred = t => profLinks.contains(t.attr)).
  triplets.collect().
  foreach(t => println(s"${t.srcAttr.name} is a ${t.attr} of ${t.dstAttr.name}"))
// Load the Enron email graph from an edge-list file.
val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
// Destination ids of every edge that leaves vertex 19021.
emailGraph.edges.filter(edge => edge.srcId == 19021L).map(edge => edge.dstId).collect()
// Flavor network. It has one vertex per ingredient and per compound, and an edge from an
// ingredient to each compound paired with it in ingr_comp.tsv. Lines starting with '#' are skipped.
//
// NOTE(review): FNNode, Ingredient and Compound were used here but never defined in these notes.
// The definitions below match how they are used: built from two String columns and stored as
// FNNode. The meaning of the second column (category / CAS number) is an assumption; confirm
// it against the TSV headers.
class FNNode(val name: String) extends Serializable
case class Ingredient(override val name: String, category: String) extends FNNode(name)
case class Compound(override val name: String, cas: String) extends FNNode(name)

// Ingredient and compound ids come from separate files, so compound ids are shifted by this
// offset to keep the two id ranges disjoint (presumably why it exists).
// Assumes every ingredient id is below 10000; otherwise vertices would collide.
val compoundIdOffset = 10000L

// VertexId is a Long, so ids are parsed with toLong.
val ingredients: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/ingr_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (row(0).toLong, Ingredient(row(1), row(2)))
    }

val compounds: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/comp_info.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      (compoundIdOffset + row(0).toLong, Compound(row(1), row(2)))
    }

// Each row pairs an ingredient id with a compound id. Edge attributes are unused (always 1).
val links: RDD[Edge[Int]] =
  sc.textFile("./data/ingr_comp.tsv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split '\t'
      Edge(row(0).toLong, compoundIdOffset + row(1).toLong, 1)
    }

val nodes = ingredients ++ compounds
val foodNetwork = Graph(nodes, links)
/**
 * Returns whichever (vertexId, degree) pair has the larger degree; ties go to the smaller
 * vertex id.
 *
 * RDD.reduce requires a commutative and associative function. The tie-break makes this a true
 * maximum under a total order. Without it, ties returned whichever argument came second, so
 * the reported vertex could depend on partitioning and merge order.
 */
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2 || (a._2 == b._2 && a._1 < b._1)) a else b
// Vertex with the highest out-degree, as (vertexId, outDegree).
emailGraph.outDegrees.reduce((a, b) => max(a, b))
// Number of vertices whose out-degree is at most one.
emailGraph.outDegrees.filter { case (_, outDeg) => outDeg <= 1 }.count()
// Degree histogram of the ego network: (degree, number of vertices with that degree), sorted
// by degree. reduceByKey counts per degree with map-side combining, instead of shuffling every
// vertex id through groupByKey only to take each group's size.
// NOTE(review): egoNetwork is not defined in these notes; presumably a Graph loaded earlier.
egoNetwork.degrees.
  map { case (_, degree) => (degree, 1) }.
  reduceByKey(_ + _).
  sortBy(_._1).collect()
Visualization tools: GraphStream for drawing networks, and BreezeViz for plotting structural properties of graphs, such as the degree distribution.
Apache Zeppelin (a web-based notebook)
# Start the Spark shell with every jar found under the current directory on the classpath
# (presumably the GraphStream/BreezeViz jars). --jars expects a comma-separated list,
# hence the `tr ' ' ','`. The command substitution needs `$(...)`; a bare `(` is a syntax error.
./bin/spark-shell --jars $(find "." -name '*.jar' | xargs echo | tr ' ' ',')
import org.graphstream.graph.{Graph=>GraphStream}
import org.graphstream.graph.implementations._
// Draw the ego network with GraphStream.
// NOTE(review): egoNetwork is not defined in these notes; presumably a Graph loaded earlier.
val graph: SingleGraph = new SingleGraph("EgoSocial")
graph.addAttribute("ui.stylesheet","url(file:.//style/stylesheet)")
graph.addAttribute("ui.quality")
graph.addAttribute("ui.antialias")

// Node and edge classes are passed as explicit type arguments, replacing the previous unused
// locals and asInstanceOf casts. This presumes addNode/addEdge are generic in their return type,
// as in GraphStream 1.x; confirm against the GraphStream version in use.

// One GraphStream node per GraphX vertex, keyed by the vertex id.
for ((id, _) <- egoNetwork.vertices.collect()) {
  graph.addNode[SingleNode](id.toString)
}

// Edge ids need a separator. Plain concatenation maps (1, 23) and (12, 3) to the same id "123",
// and GraphStream rejects a second element with an id already in use.
for (Edge(x, y, _) <- egoNetwork.edges.collect()) {
  graph.addEdge[AbstractEdge](s"$x-$y", x.toString, y.toString, true) // true = directed
}

graph.display()
/**
 * Degree histogram of `net`: pairs (degree, number of vertices with that degree), sorted by
 * ascending degree and collected to the driver.
 *
 * Counting uses reduceByKey, which combines counts map-side, instead of groupByKey, which
 * shuffled every vertex id only to take each group's size. Per the GraphX docs, `degrees`
 * omits vertices with no edges, so degree 0 never appears.
 */
def degreeHistogram(net: Graph[Int, Int]): Array[(Int, Int)] =
  net.degrees.
    map { case (_, degree) => (degree, 1) }.
    reduceByKey(_ + _).
    sortBy(_._1).collect()
// Normalize the histogram into a degree distribution: (degree, fraction of all vertices).
val nn: Long = egoNetwork.numVertices
val egoDegreeDistribution = degreeHistogram(egoNetwork).map { case (degree, count) => (degree, count.toDouble / nn) }