Tuesday, March 8, 2016

Distributed BFS algorithms



https://medium.com/@KerrySheldon/breadth-first-search-in-apache-spark-d274403494ca
Let’s pretend that we have access to a TON of cell phone records for people involved in a criminal operation. And let’s assume that we are trying to figure out if, and how, a new potential target is connected to known people in the criminal network. With these records, we can answer the following questions (a single-machine BFS sketch that answers them follows the list):
  • Is this new potential target (target B) connected to a known target of the investigation (target A)?
  • How close is the connection — 1st degree (they are in direct contact with each other), 2nd (they share a connection), 3rd, etc.?
  • How strong is the connection? That is, how many people are connecting these two people at that level? 1, 10, or 50, etc.?
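On one machine these questions map directly onto BFS: the degree of the connection is the BFS level at which target B is first reached from target A, and one reasonable reading of "strength" is the number of distinct shortest paths between them. A minimal sketch of that idea in plain Java (the toy call graph and names are made up; this is not the Spark version from the article above):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectionDegree {

    /** Returns {degree, number of shortest paths} from a to b, or {-1, 0} if they are not connected. */
    static long[] degreeAndStrength(Map<String, List<String>> callGraph, String a, String b) {
        Map<String, Integer> level = new HashMap<>();   // BFS level = degree of connection
        Map<String, Long> paths = new HashMap<>();      // number of shortest paths found so far
        Deque<String> queue = new ArrayDeque<>();
        level.put(a, 0);
        paths.put(a, 1L);
        queue.add(a);
        while (!queue.isEmpty()) {
            String u = queue.poll();
            for (String v : callGraph.getOrDefault(u, List.of())) {
                if (!level.containsKey(v)) {                      // first time v is reached
                    level.put(v, level.get(u) + 1);
                    paths.put(v, paths.get(u));
                    queue.add(v);
                } else if (level.get(v) == level.get(u) + 1) {    // another shortest path into v
                    paths.put(v, paths.get(v) + paths.get(u));
                }
            }
        }
        if (!level.containsKey(b)) return new long[]{-1, 0};
        return new long[]{level.get(b), paths.get(b)};
    }

    public static void main(String[] args) {
        // toy call graph: A talks to X and Y, both of whom talk to B
        Map<String, List<String>> calls = Map.of(
                "A", List.of("X", "Y"),
                "X", List.of("A", "B"),
                "Y", List.of("A", "B"),
                "B", List.of("X", "Y"));
        long[] r = degreeAndStrength(calls, "A", "B");
        System.out.println("degree = " + r[0] + ", shortest paths = " + r[1]);  // degree = 2, shortest paths = 2
    }
}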

http://www.cs.yale.edu/homes/aspnes/pinewiki/DistributedBreadthFirstSearch.html
http://www.cs.yale.edu/homes/aspnes/pinewiki/Flooding.html
http://www.1point3acres.com/bbs/thread-148865-1-1.html
4. Pirate. Design a Wikipedia crawler.
                  Follow-up 1: no global state.
                  Follow-up 2: deal with machine failure.
                  Follow-up 3: keep the wiki site unaware of this crawler.
Answer notes:
1. Distributed BFS
2. Consistent hashing (see the sketch after this list)
3. Assign tasks with a random delay
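A common building block for the last two notes is a consistent-hash ring that maps each URL to a crawler worker, so adding or removing a machine only remaps a small fraction of URLs. A minimal sketch (class and worker names are made up, not from the linked thread):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.SortedMap;
import java.util.TreeMap;

/** Consistent-hash ring assigning crawl URLs to worker machines. */
public class CrawlerRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    public CrawlerRing(int virtualNodes) { this.virtualNodes = virtualNodes; }

    public void addWorker(String worker) {
        for (int i = 0; i < virtualNodes; i++) ring.put(hash(worker + "#" + i), worker);
    }

    public void removeWorker(String worker) {
        for (int i = 0; i < virtualNodes; i++) ring.remove(hash(worker + "#" + i));
    }

    /** The worker responsible for crawling this URL. */
    public String workerFor(String url) {
        if (ring.isEmpty()) throw new IllegalStateException("no workers");
        SortedMap<Long, String> tail = ring.tailMap(hash(url));   // first worker clockwise from the URL's hash
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xff);  // fold first 8 digest bytes into a long
            return h;
        } catch (Exception e) { throw new RuntimeException(e); }
    }
}

Each worker can then fetch its assigned URLs with a small random delay (note 3), which spreads the load and keeps the wiki site from seeing bursts from a single coordinator.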
http://shirleyisnotageek.blogspot.com/2015/03/breadth-first-search-using-distributed.html
How would you design the data structures for a very large social network (Facebook, LinkedIn, etc)? Describe how you would design an algorithm to show the connection, or path, between two people (e.g., Me -> Bob -> Susan -> Jason -> You).

This problem is from Cracking the Coding Interview. However, I saw a video on YouTube about doing breadth-first search on a distributed system with MapReduce, so I think that would be a good answer.


How to store the graph: each node stores a list of adjacent nodes (assuming all edge weights are 1).

At each iteration, we start from the original node and grow the frontier by one level. The distance to the start node is 0 (DistanceTo(startNode) = 0). For all nodes n directly reachable from startNode, DistanceTo(n) = 1.

In the example graph from the linked post, if startNode = 1, then DistanceTo(2) = 1, DistanceTo(11) = 1, DistanceTo(5) = 1, etc.
For all nodes n reachable from some set of nodes S, DistanceTo(n) = 1 + min{DistanceTo(m) : m in S}.
So if 4 and 7 are reachable from 2, then DistanceTo(4) = 2 and DistanceTo(7) = 2.

We do not send the entire adjacency matrix to each mapper (the graph is sparse, so we use adjacency lists). Each mapper receives a single row, describing which nodes can be reached from a node we already know about.
Key: the node n being processed
Value: DistanceTo(n), plus the list of adjacent nodes (the nodes n points to)

So for 1:
Key:1
Value: 0,  (2, 3, 5, 11)

Then, for each node n can reach, we emit that node as the key with DistanceTo = D + 1 (these get grouped in the shuffle & sort phase).
So output from mapper:
Key 2, Value 1
Key 3, Value 1
Key 5, Value 1
Key 11, Value 1

The reducer then receives all these values and selects the minimum as the new distance. So if 3 can be reached by:
1 -> 3 (distance 1)
1 -> 11 -> 12 -> 3 (distance 3)
the reducer will select 1.

We then take those output keys and move to the next iteration: a non-MapReduce driver feeds the output of this step back into the MapReduce job for another iteration.
The mapper also emits the node itself along with its points-to list, so node 1 is sent back through the next iteration and its shortest DistanceTo is preserved.

Eventually all DistanceTo values converge to the shortest distances, so the algorithm stops once an iteration finds no shorter distance.

To handle weighted graphs, store the edge weight with each adjacent node; then DistanceTo(n) = DistanceTo(m) + weight(m, n).
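A sketch of how the mapper emission changes for the weighted variant. The row format "n d v1:w1,v2:w2,..." and the class name are assumptions (the unit-weight ParallelDijkstra code later in this post uses "n d v1:v2:" instead); the reducer can stay exactly as in the unit-weight version, since it still just takes the minimum of the candidate VALUE records.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Sketch of the mapper for the weighted variant: DistanceTo(n) = DistanceTo(m) + weight(m, n). */
public class WeightedBfsMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // Sentinel for "not reached yet"; kept well below Integer.MAX_VALUE so adding a weight cannot overflow.
    private static final int INFINITY = Integer.MAX_VALUE / 2;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed row format: "n d v1:w1,v2:w2,..."
        String[] sp = value.toString().split("\\s+");
        long node = Long.parseLong(sp[0]);
        int d = Integer.parseInt(sp[1]);

        if (d < INFINITY && sp.length > 2 && !sp[2].isEmpty()) {
            for (String edge : sp[2].split(",")) {
                String[] vw = edge.split(":");
                long neighbor = Long.parseLong(vw[0]);
                int weight = Integer.parseInt(vw[1]);
                // candidate distance for the neighbor: weight(m, n) instead of a fixed +1
                context.write(new LongWritable(neighbor), new Text("VALUE " + (d + weight)));
            }
        }
        // pass along the node's own distance and its adjacency list, as in the unit-weight version
        context.write(new LongWritable(node), new Text("VALUE " + d));
        if (sp.length > 2) {
            context.write(new LongWritable(node), new Text("NODES " + sp[2]));
        }
    }
}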

http://www.cs.ucsb.edu/~gilbert/cs140/old/cs140Win2011/bfsproject/bfs.html

Parallel Breadth-First Search

The idea of doing BFS in parallel is that, in principle, you can process all the vertices on a single level at the same time. That is, once you've found all the level-1 vertices, you can do a parallel loop that explores from each of them to find level-2 vertices. Thus, the parallel code will have an important sequential loop over levels, starting at 0.
In the parallel code, it's possible that when you're processing level i, two vertices v and w will both find the same level-i+1 vertex x as a neighbor. This will cause a data race when they both try to set level[x]=i+1, and also when they each try to set parent[x] to themselves. But if you're careful this is a "benign data race" -- it doesn't actually cause any problem, because there's no disagreement about what level[x] should be, and it doesn't matter in the end whether parent[x] turns out to be v or w.
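A minimal shared-memory rendering of this level-synchronous idea in Java (the linked cs140 project is not in Java; this sketch resolves the race with a compare-and-set on level[x], so whichever thread wins sets both level and parent, which is exactly the "any winner is fine" situation described above):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.Collectors;

/** Level-synchronous parallel BFS over an adjacency-list graph. */
public class ParallelBfs {

    /** graph[v] lists the neighbors of v; returns level[] with -1 for unreachable vertices. */
    public static int[] bfs(int[][] graph, int source) {
        int n = graph.length;
        AtomicIntegerArray level = new AtomicIntegerArray(n);
        for (int i = 0; i < n; i++) level.set(i, -1);
        int[] parent = new int[n];
        level.set(source, 0);
        parent[source] = source;

        List<Integer> frontier = List.of(source);
        int depth = 0;
        while (!frontier.isEmpty()) {                     // sequential loop over levels
            int d = depth;
            // explore every vertex of the current level in parallel
            List<Integer> next = frontier.parallelStream()
                .flatMap(v -> {
                    List<Integer> found = new ArrayList<>();
                    for (int w : graph[v]) {
                        // only the thread that wins this CAS claims w for level d+1
                        if (level.compareAndSet(w, -1, d + 1)) {
                            parent[w] = v;                // race on parent is benign: any winner is a valid parent
                            found.add(w);
                        }
                    }
                    return found.stream();
                })
                .collect(Collectors.toList());
            frontier = next;
            depth++;
        }
        int[] out = new int[n];
        for (int i = 0; i < n; i++) out[i] = level.get(i);
        return out;
    }
}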

https://lintool.github.io/Cloud9/docs/content/bfs.html
The key to Dijkstra’s algorithm is the priority queue that maintains a globally sorted list of nodes by current distance. This is not possible in MapReduce, as the programming model does not provide a mechanism for exchanging global data. Instead, we adopt a brute force approach known as parallel breadth-first search. First, as a simplification let us assume that all edges have unit distance.

Distance to each node is directly stored alongside the adjacency list of that node, and initialized to ∞ for all nodes except for the source node. In the pseudo-code, we use n to denote the node id (an integer) and N to denote the node’s corresponding data structure (adjacency list and current distance). The algorithm works by mapping over all nodes and emitting a key-value pair for each neighbor on the node’s adjacency list. The key contains the node id of the neighbor, and the value is the current distance to the node plus one. This says: if we can reach node n with a distance d, then we must be able to reach all the nodes that are connected to n with distance d + 1. After shuffle and sort, reducers will receive keys corresponding to the destination node ids and distances corresponding to all paths leading to that node. The reducer will select the shortest of these distances and then update the distance in the node data structure.

It is apparent that parallel breadth-first search is an iterative algorithm, where each iteration corresponds to a MapReduce job. The first time we run the algorithm, we “discover” all nodes that are connected to the source. The second iteration, we discover all nodes connected to those, and so on. Each iteration of the algorithm expands the “search frontier” by one hop, and, eventually, all nodes will be discovered with their shortest distances (assuming a fully-connected graph). Before we discuss termination of the algorithm, there is one more detail required to make the parallel breadth-first search algorithm work. We need to “pass along” the graph structure from one iteration to the next. This is accomplished by emitting the node data structure itself, with the node id as a key.

In the reducer, we must distinguish the node data structure from distance values and update the minimum distance in the node data structure before emitting it as the final value. The final output is now ready to serve as input to the next iteration.

Typically, execution of an iterative MapReduce algorithm requires a non-MapReduce “driver” program, which submits a MapReduce job to iterate the algorithm, checks to see if a termination condition has been met, and if not, repeats. Hadoop provides a lightweight API for constructs called “counters”, which the reducers can use to record, for example, how many node distances were updated in an iteration; the driver reads the counter after each job to decide whether to stop.
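A sketch of such a driver, under some assumptions: the class and enum names here are made up, the mapper and reducer are the ParallelDijkstra ones reproduced later in this post, and the reducer would need one extra line, context.getCounter(BfsCounter.UPDATED_NODES).increment(1) whenever it lowers a distance, which the code below this does not yet have.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BfsDriver {
    /** Counter the reducer increments whenever it lowers a node's distance in this round. */
    public enum BfsCounter { UPDATED_NODES }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String input = args[0];                 // initial graph file: one node per line
        int iteration = 0;
        long updated;
        do {
            String output = args[1] + "/iter" + iteration;
            Job job = Job.getInstance(conf, "parallel-bfs-" + iteration);
            job.setJarByClass(BfsDriver.class);
            job.setMapperClass(ParallelDijkstra.DijkstraMapper.class);   // from later in this post
            job.setReducerClass(ParallelDijkstra.DijkstraReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            job.waitForCompletion(true);
            // termination check: stop once no reducer reported a shorter distance
            updated = job.getCounters().findCounter(BfsCounter.UPDATED_NODES).getValue();
            input = output;                     // this iteration's output feeds the next
            iteration++;
        } while (updated > 0);
    }
}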



public class ParallelDijkstra extends Configured implements Tool {

    public static String OUT = "output";
    public static String IN = "inputlarger";

    public static class DijkstraMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ @ Maryland)
            // Key is node n
            // Value is D, Points-To
            // For every node (key), look at everything it points to
            // and emit that neighbor with the current distance + 1.
            Text word = new Text();
            String line = value.toString();     // looks like: 1 0 2:3:
            String[] sp = line.split(" ");      // splits on space
            int distanceAdded = Integer.parseInt(sp[1]) + 1;
            String[] pointsTo = sp[2].split(":");
            for (String neighbor : pointsTo) {
                word.set("VALUE " + distanceAdded);   // a candidate distance for this neighbor
                context.write(new LongWritable(Integer.parseInt(neighbor)), word);
                word.clear();
            }
            // pass along the current node's own distance (in case it is already the lowest)
            word.set("VALUE " + sp[1]);
            context.write(new LongWritable(Integer.parseInt(sp[0])), word);
            word.clear();

            // pass along the adjacency list so the next iteration still has the graph structure
            word.set("NODES " + sp[2]);
            context.write(new LongWritable(Integer.parseInt(sp[0])), word);
            word.clear();
        }
    }

    public static class DijkstraReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        public void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ @ Maryland)
            // The key is the current node.
            // The values are all the candidate distances to this node,
            // plus one NODES record carrying its adjacency list.
            // We emit the node with the minimum distance and its adjacency list.

            String nodes = "UNMODED";
            Text word = new Text();
            int lowest = 10009;   // stands in for "infinity" (larger than any real distance in this data)

            for (Text val : values) {   // looks like: "VALUE 1" or "NODES 2:3:"
                String[] sp = val.toString().split(" ");   // splits on space
                // the first token tells the two record types apart
                if (sp[0].equalsIgnoreCase("NODES")) {
                    nodes = sp[1];
                } else if (sp[0].equalsIgnoreCase("VALUE")) {
                    int distance = Integer.parseInt(sp[1]);
                    lowest = Math.min(distance, lowest);
                }
            }
            word.set(lowest + " " + nodes);
            context.write(key, word);
            word.clear();
        }
    }
    // (the run()/main() driver that chains the iterations is omitted in the original excerpt)
}
http://www.johnandcailin.com/blog/cailin/breadth-first-graph-search-using-iterative-map-reduce-algorithm
every Map iteration "makes a mess" and every Reduce iteration "cleans up the mess". let's say we start by representing a node in the following text format
ID    EDGES|DISTANCE_FROM_SOURCE|COLOR|
where EDGES is a comma delimited list of the ids of the nodes that are connected to this node. in the beginning, we do not know the distance and will use Integer.MAX_VALUE for marking "unknown". the color tells us whether or not we've seen the node before, so this starts off as white.
suppose we start with the following input graph, in which we've stated that node #1 is the source (starting point) for the search, and as such have marked this one special node with distance 0 and color GRAY.
1       2,5|0|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|
3       2,4|Integer.MAX_VALUE|WHITE|
4       2,3,5|Integer.MAX_VALUE|WHITE|
5       1,2,4|Integer.MAX_VALUE|WHITE|
the mappers are responsible for "exploding" all gray nodes - i.e. all nodes that live at our current depth in the tree. for each gray node, the mappers emit a new gray node, with distance = distance + 1. they also then emit the input gray node, but colored black. (once a node has been exploded, we're done with it.) mappers also emit all non-gray nodes, with no change. so, the output of the first map iteration would be
1       2,5|0|BLACK|
2       NULL|1|GRAY|
5       NULL|1|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|
3       2,4|Integer.MAX_VALUE|WHITE|
4       2,3,5|Integer.MAX_VALUE|WHITE|
5       1,2,4|Integer.MAX_VALUE|WHITE|
note that when the mappers "explode" the gray nodes and create a new node for each edge, they do not know what to write for the edges of this new node - so they leave it blank.
the reducers, of course, receive all data for a given key - in this case it means that they receive the data for all "copies" of each node. for example, the reducer that receives the data for key = 2 gets the following list of values :
2       NULL|1|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|
the reducer's job is to take all this data and construct a new node using
  • the non-null list of edges
  • the minimum distance
  • the darkest color
using this logic the output from our first iteration will be :
1       2,5,|0|BLACK
2       1,3,4,5,|1|GRAY
3       2,4,|Integer.MAX_VALUE|WHITE
4       2,3,5,|Integer.MAX_VALUE|WHITE
5       1,2,4,|1|GRAY
the second iteration uses this as the input and outputs :
1       2,5,|0|BLACK
2       1,3,4,5,|1|BLACK
3       2,4,|2|GRAY
4       2,3,5,|2|GRAY
5       1,2,4,|1|BLACK
and the third iteration outputs:
1       2,5,|0|BLACK
2       1,3,4,5,|1|BLACK
3       2,4,|2|BLACK
4       2,3,5,|2|BLACK
5       1,2,4,|1|BLACK
subsequent iterations will continue to print out the same output.
how do you know when you're done? you are done when there are no output nodes that are colored gray. note - if not all nodes in your input are actually connected to your source, you may have final output nodes that are still white.
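The map and reduce snippets below (from the linked post) rely on a Node helper class that parses and re-serializes the ID / EDGES|DISTANCE|COLOR| line format; the post's getLine() method, reproduced after this sketch, belongs to it. A minimal reconstruction of the rest of that class, with field and constructor details inferred from the format above rather than taken from the original:

import java.util.ArrayList;
import java.util.List;

public class Node {
    public enum Color { WHITE, GRAY, BLACK }   // ordinal order doubles as "darkness"

    private int id;
    private List<Integer> edges = new ArrayList<>();
    private int distance = Integer.MAX_VALUE;
    private Color color = Color.WHITE;

    /** A freshly discovered node with no known edges yet. */
    public Node(int id) { this.id = id; }

    /** Parses a line of the form "ID<TAB>EDGES|DISTANCE|COLOR|". */
    public Node(String line) {
        String[] parts = line.split("\t");
        id = Integer.parseInt(parts[0]);
        String[] fields = parts[1].split("\\|");
        if (!fields[0].isEmpty() && !fields[0].equals("NULL")) {
            for (String e : fields[0].split(",")) {
                if (!e.isEmpty()) edges.add(Integer.parseInt(e));
            }
        }
        distance = fields[1].equals("Integer.MAX_VALUE")
                ? Integer.MAX_VALUE : Integer.parseInt(fields[1]);
        color = Color.valueOf(fields[2]);
    }

    public int getId() { return id; }
    public List<Integer> getEdges() { return edges; }
    public int getDistance() { return distance; }
    public Color getColor() { return color; }
    public void setEdges(List<Integer> edges) { this.edges = edges == null ? new ArrayList<>() : edges; }
    public void setDistance(int distance) { this.distance = distance; }
    public void setColor(Color color) { this.color = color; }
    // getLine() below (from the original post) serializes back into the same line format
}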

  public Text getLine() {
    StringBuffer s = new StringBuffer();
    
    for (int v : edges) {
      s.append(v).append(",");
    }
    s.append("|");

    if (this.distance < Integer.MAX_VALUE) {
      s.append(this.distance).append("|");
    } else {
      s.append("Integer.MAX_VALUE").append("|");
    }

    s.append(color.toString());

    return new Text(s.toString());
  }
  public static class MapClass extends MapReduceBase implements
      Mapper<LongWritable, Text, IntWritable, Text> {

    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output,
        Reporter reporter) throws IOException {

      Node node = new Node(value.toString());

      // For each GRAY node, emit each of the edges as a new node (also GRAY)
      if (node.getColor() == Node.Color.GRAY) {
        for (int v : node.getEdges()) {
          Node vnode = new Node(v);
          vnode.setDistance(node.getDistance() + 1);
          vnode.setColor(Node.Color.GRAY);
          output.collect(new IntWritable(vnode.getId()), vnode.getLine());
        }
        // We're done with this node now, color it BLACK
        node.setColor(Node.Color.BLACK);
      }

      // No matter what, we emit the input node
      // If the node came into this method GRAY, it will be output as BLACK
      output.collect(new IntWritable(node.getId()), node.getLine());

    }
  }

  public static class Reduce extends MapReduceBase implements
      Reducer<IntWritable, Text, IntWritable, Text> {

    public void reduce(IntWritable key, Iterator<Text> values,
        OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {

      List<Integer> edges = null;
      int distance = Integer.MAX_VALUE;
      Node.Color color = Node.Color.WHITE;

      while (values.hasNext()) {
        Text value = values.next();

        Node u = new Node(key.get() + "\t" + value.toString());

        // One (and only one) copy of the node will be the fully expanded
        // version, which includes the edges
        if (u.getEdges().size() > 0) {
          edges = u.getEdges();
        }

        // Save the minimum distance
        if (u.getDistance() < distance) {
          distance = u.getDistance();
        }

        // Save the darkest color
        if (u.getColor().ordinal() > color.ordinal()) {
          color = u.getColor();
        }

      }

      Node n = new Node(key.get());
      n.setDistance(distance);
      n.setEdges(edges);
      n.setColor(color);
      output.collect(key, new Text(n.getLine()));
     
    }
  }


https://catalystcode.github.io/case-studies/hadoop/hdinsight/big-data/batch-processing/2015/07/21/Parallel-breadth-first-aggregation-algorithm.html

https://github.com/DNyaika/BFS-with-MapReduce/blob/master/src/main/java/it/unitn/bd/bfs/BfsSpark.java

http://www.theanalyticsuniverse.com/breadth-first-search-algorithm-bfs-in-apace-spark
