Sunday, January 3, 2016

Common Friends - Data Algorithms



Data Algorithms: Recipes for Scaling Up with Hadoop and Spark
Input: <person><,><friend_1> <friend_2> ... <friend_N>
map(100, (200 300 400 500 600)) will generate:

([100, 200], [200 300 400 500 600])
([100, 300], [200 300 400 500 600])
([100, 400], [200 300 400 500 600])
([100, 500], [200 300 400 500 600])
([100, 600], [200 300 400 500 600])
Person 200's record, map(200, (100 300 400)), will likewise generate ([100, 200], [100 300 400]). Before these key-value pairs are sent to the reducers, they are grouped by key:

([100, 200], [200 300 400 500 600])
([100, 200], [100 300 400])
=> ([100, 200], ([200 300 400 500 600], [100 300 400]))
So, the reducers will receive the following set of key-value pairs:

([100, 200], ([200 300 400 500 600], [100 300 400]))
Finally, the reducers will intersect the grouped lists and generate:
([100, 200], [300, 400])
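The intersection step can be sketched in plain Java without Hadoop (the class and method names below are mine, not the book's): count how often each ID occurs across the lists and keep the IDs that appear in every list — the same counting approach the book's reducer uses, assuming each friend list is duplicate-free.

```java
import java.util.*;

// Minimal sketch of the reduce-side intersection: the reducer for key
// [100, 200] receives both users' friend lists and keeps only the IDs
// that appear in every list.
public class CommonFriendsSketch {

    static List<Long> intersect(List<List<Long>> lists) {
        // count occurrences of each friend ID across all lists
        Map<Long, Integer> counts = new HashMap<>();
        for (List<Long> list : lists) {
            for (Long f : list) {
                counts.merge(f, 1, Integer::sum);
            }
        }
        // a common friend appears in every list, i.e. count == lists.size()
        List<Long> common = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() == lists.size()) {
                common.add(e.getKey());
            }
        }
        Collections.sort(common);
        return common;
    }

    public static void main(String[] args) {
        List<List<Long>> lists = Arrays.asList(
            Arrays.asList(200L, 300L, 400L, 500L, 600L),  // friends of 100
            Arrays.asList(100L, 300L, 400L));             // friends of 200
        System.out.println(intersect(lists)); // prints [300, 400]
    }
}
```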

https://github.com/mahmoudparsian/data-algorithms-book
Hadoop Implementation Using Text
/**
 * map(key, value) {
 *   // key is the offset generated by MapReduce/Hadoop
 *   // value is one line of input
 *   let (<person> (<friend_1> <friend_2> ... <friend_N>)) = parse(line);
 *   reducerValue = (<friend_1> <friend_2> ... <friend_N>);
 *   foreach friend in (<friend_1> <friend_2> ... <friend_N>) {
 *     reducerKey = buildSortedKey(person, friend);
 *     emit(reducerKey, reducerValue);
 *   }
 * }
 */
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CommonFriendsMapper
  extends Mapper<LongWritable, Text, Text, Text> {

  private static Text reducerKey = new Text();
  private static Text reducerValue = new Text();

  static String getFriends(String[] tokens) {
    if (tokens.length == 2) {
      return "";
    }
    StringBuilder builder = new StringBuilder();
    for (int i=1; i < tokens.length; i++) {
      builder.append(tokens[i]);
      if (i < (tokens.length -1)) {
        builder.append(",");
      }
    }
    return builder.toString();
  }

  static String buildSortedKey(String person, String friend) {
    long p = Long.parseLong(person);
    long f = Long.parseLong(friend);
    if (p < f) {
      return person + "," + friend;
    }
    else {
      return friend + "," + person;
    }
  }
   

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    // parse input, delimiter is a single space
    String[] tokens = StringUtils.split(value.toString(), " ");

    // create reducer value
    String friends = getFriends(tokens);
    reducerValue.set(friends);

    String person = tokens[0];
    for (int i=1; i < tokens.length; i++) {
      String friend = tokens[i];
      String reducerKeyAsString = buildSortedKey(person, friend);
      reducerKey.set(reducerKeyAsString);
      context.write(reducerKey, reducerValue);
    }
  }
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap08/mapreduce/CommonFriendsReducer.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {

  /**
   * The goal is to find common friends by intersecting
   * all lists defined in the values parameter.
   *
   * @param key is a pair: <user_id_1><,><user_id_2>
   * @param values is a list of { <friend_1><,>...<,><friend_n> }
   */
  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    Map<String, Integer> map = new HashMap<String, Integer>();
    Iterator<Text> iterator = values.iterator();
    int numOfValues = 0;
    while (iterator.hasNext()) {
      String friends = iterator.next().toString();
      if (friends.equals("")) {
        context.write(key, new Text("[]"));
        return;
      }
      addFriends(map, friends);
      numOfValues++;
    }

    // a common friend appears in every list,
    // i.e., its count equals numOfValues
    List<String> commonFriends = new ArrayList<String>();
    for (Map.Entry<String, Integer> entry : map.entrySet()) {
      if (entry.getValue() == numOfValues) {
        commonFriends.add(entry.getKey());
      }
    }

    // send the result to the output
    context.write(key, new Text(commonFriends.toString()));
  }
 
  static void addFriends(Map<String, Integer> map, String friendsList) {
    String[] friends = StringUtils.split(friendsList, ",");
    for (String friend : friends) {
      Integer count = map.get(friend);
      if (count == null) {
        map.put(friend, 1);
      }
      else {
        map.put(friend, count + 1);
      }
    }
  }
}
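A minimal driver to wire this mapper and reducer into a job might look like the following (a sketch only; the class name and argument layout are mine and may differ from the driver in the book's repository):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: wires CommonFriendsMapper and CommonFriendsReducer
// into a single MapReduce job. Usage: CommonFriendsDriver <input> <output>
public class CommonFriendsDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "common-friends");
    job.setJarByClass(CommonFriendsDriver.class);
    job.setMapperClass(CommonFriendsMapper.class);
    job.setReducerClass(CommonFriendsReducer.class);
    job.setOutputKeyClass(Text.class);     // "user1,user2"
    job.setOutputValueClass(Text.class);   // "friend1,friend2,..."
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```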

Hadoop Implementation Using ArrayListOfLongsWritable
http://grepcode.com/file_/repo1.maven.org/maven2/edu.umd/cloud9/1.4.12/edu/umd/cloud9/io/array/ArrayListOfLongsWritable.java/?v=source
public class CommonFriendsMapperUsingList
    extends Mapper<LongWritable, Text, Text, ArrayListOfLongsWritable> {
 
    private static Text reducerKey = new Text();

    static ArrayListOfLongsWritable getFriends(String[] tokens) {
        if (tokens.length == 2) {
            return new ArrayListOfLongsWritable();
        }
     
        ArrayListOfLongsWritable list = new ArrayListOfLongsWritable();
        for (int i=1; i < tokens.length; i++) {
            list.add(Long.parseLong(tokens[i]));
        }
        return list;
    }
 
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        }
        else {
            return friend + "," + person;
        }
    }
         
 
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        // parse input, delimiter is a single space
        String[] tokens = StringUtils.split(value.toString(), " ");

        // create reducer value
        ArrayListOfLongsWritable friends = getFriends(tokens);

        String person = tokens[0];
        for (int i=1; i < tokens.length; i++) {
            String friend = tokens[i];
            String reducerKeyAsString = buildSortedKey(person, friend);
            reducerKey.set(reducerKeyAsString);
            context.write(reducerKey, friends);
        }
    }
}
Spark:
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap08/spark/FindCommonFriends.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.dataalgorithms.util.SparkUtil; // helper from the book's repository

public class FindCommonFriends {

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
       System.err.println("Usage: FindCommonFriends <input-file>");
       System.exit(1);
    }

    String hdfsInputFileName = args[0];
    System.out.println("hdfsInputFileName="+hdfsInputFileName);

    // create context object
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext("FindCommonFriends");

    // create the first RDD from input file
    JavaRDD<String> records = ctx.textFile(hdfsInputFileName, 1);

    // PairFlatMapFunction<T, K, V>
    // T => Iterable<Tuple2<K, V>>
    JavaPairRDD<Tuple2<Long,Long>,Iterable<Long>> pairs =
          //                                            T       K                  V
          records.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long,Long>, Iterable<Long>>() {
      public Iterable<Tuple2<Tuple2<Long,Long>,Iterable<Long>>> call(String s) {
         String[] tokens = s.split(",");
         long person = Long.parseLong(tokens[0]);
         String friendsAsString = tokens[1];
         String[] friendsTokenized = friendsAsString.split(" ");
         if (friendsTokenized.length == 1) {
            Tuple2<Long,Long> key = buildSortedTuple(person, Long.parseLong(friendsTokenized[0]));
            return Arrays.asList(new Tuple2<Tuple2<Long,Long>,Iterable<Long>>(key, new ArrayList<Long>()));
         }
         List<Long> friends = new ArrayList<Long>();
         for (String f : friendsTokenized) {
            friends.add(Long.parseLong(f));
         }
       
         List<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>> result =
             new ArrayList<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>>();
         for (Long f : friends) {
            Tuple2<Long,Long> key = buildSortedTuple(person, f);
            result.add(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, friends));
         }
         return result;
      }
    });  
    JavaPairRDD<Tuple2<Long, Long>, Iterable<Iterable<Long>>> grouped = pairs.groupByKey();

    // Find intersection of all List<List<Long>>
    // mapValues[U](f: (V) => U): JavaPairRDD[K, U]
    // Pass each value in the key-value pair RDD through a map function without changing the keys;
    // this also retains the original RDD's partitioning.
    JavaPairRDD<Tuple2<Long, Long>, Iterable<Long>> commonFriends =
        grouped.mapValues(new Function< Iterable<Iterable<Long>>, // input
                                        Iterable<Long>            // output
                                      >() {
      public Iterable<Long> call(Iterable<Iterable<Long>> s) {
         Map<Long, Integer> countCommon = new HashMap<Long, Integer>();
         int size = 0;
         for (Iterable<Long> iter : s) {
            size++;
            List<Long> list = iterableToList(iter);
            if ((list == null) || (list.isEmpty())) {
               continue;
            }
            for (Long f : list) {
               Integer count = countCommon.get(f);
               if (count == null) {
                  countCommon.put(f, 1);
               }
               else {
                  countCommon.put(f, ++count);
               }
            }
         }
       
         // if countCommon.Entry<f, count> ==  countCommon.Entry<f, s.size()>
         // then that is a common friend
         List<Long> finalCommonFriends = new ArrayList<Long>();
         for (Map.Entry<Long, Integer> entry : countCommon.entrySet()){
            if (entry.getValue() == size) {
               finalCommonFriends.add(entry.getKey());
            }
         }
         return finalCommonFriends;
      }
    });
 
    // debug3
    List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> debug3 = commonFriends.collect();
    for (Tuple2<Tuple2<Long, Long>, Iterable<Long>> t2 : debug3) {
      System.out.println("debug3 key="+t2._1+ "\t value="+t2._2);
    }
 
    System.exit(0);
  }

  static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
     if (a < b) {
        return new Tuple2<Long, Long>(a,b);
     }
     else {
        return new Tuple2<Long, Long>(b,a);
     }
  }

  static List<Long> iterableToList(Iterable<Long> iterable) {
    List<Long> list = new ArrayList<Long>();
    for (Long item : iterable) {    
       list.add(item);
    }
    return list;
  }

}
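The flatMapToPair expansion above can be sketched in plain Java without Spark (class and method names here are mine; String keys stand in for Tuple2&lt;Long,Long&gt;, and the single-friend special case is omitted for brevity):

```java
import java.util.*;

// Plain-Java sketch of the map-side expansion: one input record
// "100,200 300 400" expands into one (sortedPair -> friends) entry
// per friend in the record.
public class PairExpansionSketch {

    static String buildSortedKey(long a, long b) {
        return (a < b) ? a + "," + b : b + "," + a;
    }

    static Map<String, List<Long>> expand(String record) {
        String[] tokens = record.split(",");
        long person = Long.parseLong(tokens[0]);
        String[] friendTokens = tokens[1].split(" ");
        List<Long> friends = new ArrayList<>();
        for (String f : friendTokens) {
            friends.add(Long.parseLong(f));
        }
        // every emitted pair carries the full friend list as its value
        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (long f : friends) {
            result.put(buildSortedKey(person, f), friends);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(expand("100,200 300").keySet()); // [100,200, 100,300]
    }
}
```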

The map and flatMap methods serve a similar purpose, but map is 1-to-1 while flatMap is 1-to-0..N (emitting zero elements acts like a filter, except that flatMap can also change the element type).
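The same contrast can be illustrated with plain Java streams (a sketch, not Spark itself, though Spark's map and flatMap behave analogously on RDDs):

```java
import java.util.*;
import java.util.stream.*;

public class MapVsFlatMap {

    // map is 1-to-1: each input line yields exactly one output element
    static List<Integer> lengths(List<String> lines) {
        return lines.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap is 1-to-0..N: each line yields zero or more tokens
    static List<String> tokens(List<String> lines) {
        return lines.stream()
                    .flatMap(line -> Arrays.stream(line.split(" ")))
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b", "c");
        System.out.println(lengths(lines)); // [3, 1]
        System.out.println(tokens(lines));  // [a, b, c]
    }
}
```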
