Sunday, January 3, 2016

Top 10 List - Data Algorithms



Data Algorithms: Recipes for Scaling Up with Hadoop and Spark
For example, if the key is a URL (key-as-string) and the value is the number of times that URL was visited (value-as-integer), you might ask: what are the top 10 URLs for last week? If the data lived in a relational table, this would be a one-line SQL query; using the book's cats table (find the 10 heaviest cats):

SELECT cat_id, cat_name, cat_weight
FROM cats
ORDER BY cat_weight DESC
LIMIT 10;
Spark:
We assume that all input keys are unique. That is, for a given input set {(K, V)}, all Ks are unique.
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/spark/Top10.java
 * Assumption: for all input (K, V), K's are unique.
 * This means that there will not be entries like (A, 5) and (A, 8).
   public static void main(String[] args) throws Exception {

      // STEP-1: handle input parameters
      if (args.length < 1) {
         System.err.println("Usage: Top10 <input-file>");
         System.exit(1);
      }
      String inputPath = args[0];
      System.out.println("args[0]: <input-path>="+inputPath);

      // STEP-2: create an instance of JavaSparkContext
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // STEP-3: create an RDD for input
      // input record format:
      //  <string-key><,><integer-value>
      JavaRDD<String> lines = ctx.textFile(inputPath, 1);

 
      // STEP-4: create (K, V) pairs
      // Note: the assumption is that all K's are unique
      // PairFunction<T, K, V>
      // T => Tuple2<K, V>
      //                                                                    input   K       V
      JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // cat7,234
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });
      // STEP-5: create a local top-10
      JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
         new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
         @Override
         public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
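                // note: the TreeMap is keyed by count, so two items with the same count collapse to one entry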
                top10.put(tuple._2, tuple._1);
                // keep only top N
                if (top10.size() > 10) {
                   top10.remove(top10.firstKey());
                }
             }
             return Collections.singletonList(top10);
         }
      });

      // STEP-6: find a final top-10
      SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
      List<SortedMap<Integer, String>> alltop10 = partitions.collect();
      for (SortedMap<Integer, String> localtop10 : alltop10) {
          // each localtop10 maps count -> cat name (or URL)
          for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
              finaltop10.put(entry.getKey(), entry.getValue());
              // keep only top 10
              if (finaltop10.size() > 10) {
                 finaltop10.remove(finaltop10.firstKey());
              }
          }
      }
 
      // STEP-7: emit final top-10
      for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }
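
For a quick sanity check, here is a hypothetical run (the input values are invented for illustration, and the spark-submit jar/path are placeholders):

$ cat cats.txt
cat1,12
cat2,9
cat3,290
cat4,4
cat5,17

$ spark-submit --class org.dataalgorithms.chap03.spark.Top10 $APP_JAR cats.txt
4--cat4
9--cat2
12--cat1
17--cat5
290--cat3

Since there are fewer than 10 cats, all five survive; the TreeMap emits them in ascending count order.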

Now we drop that restriction and assume that input keys are not necessarily unique.
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/spark/Top10NonUnique.java
 * Assumption: for all input (K, V), K's are non-unique.
 * This class implements Top-N design pattern for N > 0.
 * The main assumption is that for all input (K, V)'s, K's
 * are non-unique. It means that you will find entries like
 * (A, 2), ..., (A, 5),...
 *
 * This is a general top-N algorithm which works for both
 * unique and non-unique keys.
 *
 * This class may be used to find bottom-N as well (by
 * just keeping the N smallest elements in the set).
 *
 *  Top-10 Design Pattern: “Top Ten” Structure
 *
 *  1. map(input) => (K, V)
 *      
 *  2. reduce(K, List<V1, V2, ..., Vn>) => (K, V),
 *                where V = V1+V2+...+Vn
 *     now all K's are unique
 *
 *  3. partition (K,V)'s into P partitions
 *
 *  4. Find top-N for each partition (we call this a local Top-N)
 *
 *  5. Find Top-N from all local Top-N's
   public static void main(String[] args) throws Exception {
      // STEP-1: handle input parameters
      if (args.length < 2) {
         System.err.println("Usage: Top10NonUnique <input-path> <topN>");
         System.exit(1);
      }
      System.out.println("args[0]: <input-path>="+args[0]);
      System.out.println("args[1]: <topN>="+args[1]);
      final int N = Integer.parseInt(args[1]);

      // STEP-2: create a Java Spark Context object
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // STEP-3: broadcast the topN to all cluster nodes
      final Broadcast<Integer> topN = ctx.broadcast(N);
      // now topN is available to be read from all cluster nodes

      // STEP-4: create an RDD from input
      //    input record format:
      //        <string-key><,><integer-value-count>
      JavaRDD<String> lines = ctx.textFile(args[0], 1);  
      // STEP-5: partition RDD
      // public JavaRDD<T> coalesce(int numPartitions)
      // Return a new RDD that is reduced into numPartitions partitions.
      JavaRDD<String> rdd = lines.coalesce(9);
     
      // STEP-6: map input(T) into (K,V) pair
      // PairFunction<T, K, V>
      // T => Tuple2<K, V>
      JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });

      // STEP-7: reduce frequent K's
      JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });
 
      // STEP-8: create a local top-N
      JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
          new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
          @Override
          public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             final int N = topN.value();
             SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                localTopN.put(tuple._2, tuple._1);
                // keep only top N
                if (localTopN.size() > N) {
                   localTopN.remove(localTopN.firstKey());
                }
             }
             return Collections.singletonList(localTopN);
          }
      });

      // STEP-9: find a final top-N
      SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
      List<SortedMap<Integer, String>> allTopN = partitions.collect();
      for (SortedMap<Integer, String> localTopN : allTopN) {
         for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
             // count = entry.getKey()
             // url = entry.getValue()
             finalTopN.put(entry.getKey(), entry.getValue());
             // keep only top N
             if (finalTopN.size() > N) {
                finalTopN.remove(finalTopN.firstKey());
             }
         }
      }
 
      // STEP-10: emit final top-N
      for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }
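
A tiny worked trace of the five-step pattern (hypothetical data, N = 2, two partitions):

   input lines:        A,2  B,7  A,3  C,4
   after reduceByKey:  (A,5) (B,7) (C,4)
   partitions:         P1 = {(A,5)}     P2 = {(B,7), (C,4)}
   local top-2:        {5=A}            {4=C, 7=B}
   final top-2 (emitted ascending): 5--A then 7--B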

The simplest solution uses Spark’s powerful sorting function takeOrdered(). Parameter handling and context creation are the same as before; the interesting steps are:
      JavaRDD<String> lines = ctx.textFile(inputPath, 1);
      // STEP-4: partition RDD
      // public JavaRDD<T> coalesce(int numPartitions)
      // Return a new RDD that is reduced into numPartitions partitions.
      JavaRDD<String> rdd = lines.coalesce(9);

      // STEP-5: map input(T) into (K,V) pair
      // PairFunction<T, K, V>
      // T => Tuple2<K, V>
      JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });
      // STEP-6: reduce frequent K's
      JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });

      // STEP-7: find final top-N by calling takeOrdered()
      List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);

      // STEP-8: emit final top-N (count--url, largest first)
      for (Tuple2<String, Integer> entry : topNResult) {
         System.out.println(entry._2 + "--" + entry._1);
      }

   static class MyTupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
       final static MyTupleComparator INSTANCE = new MyTupleComparator();
       public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
          return -t1._2.compareTo(t2._2);     // sorts RDD elements descending (use for Top-N)
          // return t1._2.compareTo(t2._2);   // sorts RDD elements ascending (use for Bottom-N)
       }
   }
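
With Java 8 lambdas (the book targets Java 7-style anonymous classes), the same takeOrdered pipeline shrinks to a few lines. A minimal sketch, reusing MyTupleComparator and assuming the same SparkUtil helper and input format as above:

      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
      // parse "url,count" lines into (K, V) pairs and sum counts per key
      JavaPairRDD<String, Integer> uniqueKeys = ctx.textFile(inputPath, 1)
         .mapToPair(s -> {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<>(tokens[0], Integer.parseInt(tokens[1]));
         })
         .reduceByKey((i1, i2) -> i1 + i2);
      // takeOrdered does the local + final top-N merge for us
      List<Tuple2<String, Integer>> topN = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);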

Spark RDD: a Resilient Distributed Dataset represents an immutable, partitioned collection of elements that can be operated on in parallel.

Parameterizing Top N
We parameterize N by using a Broadcast object, which makes N available as globally shared, read-only data accessible from any cluster node:
final Broadcast<Integer> broadcastTopN = context.broadcast(topN);
final int topN = broadcastTopN.value();

After the data structure broadcastTopN is broadcast, it may be read from any cluster node, inside mappers, reducers, and transformers.

The same mechanism can parameterize the direction (top vs. bottom):

final Broadcast<String> broadcastDirection = context.broadcast(direction);
...
final String direction = broadcastDirection.value();
if (finalN.size() > N) {
   if (direction.equals("top")) {
      // remove the element with the smallest frequency
      finalN.remove(finalN.firstKey());
   }
   else { // direction.equals("bottom")
      // remove the element with the largest frequency
      finalN.remove(finalN.lastKey());
   }
}

The general rule of thumb is to use (2 × num_executors × cores_per_executor) partitions in total.
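For example, a minimal sketch with a hypothetical cluster size:

      // hypothetical cluster: 10 executors with 4 cores each
      // => 2 * 10 * 4 = 80 partitions
      JavaRDD<String> rdd = lines.coalesce(80);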
Hadoop:
Unique Keys:
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/TopNMapper.java
Each mapper accepts a partition of cats. As it consumes its partition, the mapper maintains a top-10 list in a SortedMap<Integer, String>; when the mapper finishes, its cleanup() method emits that list. Note that we use a single key, retrieved by calling NullWritable.get(), which guarantees that all the mappers’ output will be consumed by a single reducer.
public class TopNMapper extends
   Mapper<Text, IntWritable, NullWritable, Text> {

   private int N = 10; // default
   private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

   @Override
   public void map(Text key, IntWritable value, Context context)
         throws IOException, InterruptedException {

      String keyAsString = key.toString();
      int frequency =  value.get();
      String compositeValue = keyAsString + "," + frequency;
      top.put(frequency, compositeValue);
      // keep only top N
      if (top.size() > N) {
         top.remove(top.firstKey());
      }
   }
 
   @Override
   protected void setup(Context context) throws IOException,
         InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }
 

   @Override
   protected void cleanup(Context context) throws IOException,
         InterruptedException {
      for (String str : top.values()) {
         context.write(NullWritable.get(), new Text(str));
      }
   }
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/TopNReducer.java
public class TopNReducer  extends
   Reducer<NullWritable, Text, IntWritable, Text> {

   private int N = 10; // default
   private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

   @Override
   public void reduce(NullWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
      for (Text value : values) {
         String valueAsString = value.toString().trim();
         String[] tokens = valueAsString.split(",");
         String url = tokens[0];
         int frequency =  Integer.parseInt(tokens[1]);
         top.put(frequency, url);
         // keep only top N
         if (top.size() > N) {
            top.remove(top.firstKey());
         }
      }    
      // emit final top N, largest count first
      List<Integer> keys = new ArrayList<Integer>(top.keySet());
      for (int i = keys.size() - 1; i >= 0; i--) {
         context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
      }
   }
   @Override
   protected void setup(Context context)
      throws IOException, InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }
}
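
For context, here is a minimal driver sketch (an assumption on our part; the repository's actual TopNDriver may differ) showing how N travels through the Configuration to setup(), and why a single reducer is forced. It assumes the input is a SequenceFile of (Text, IntWritable) records, matching TopNMapper's signature:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {
   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.setInt("N", Integer.parseInt(args[0])); // read back by setup() via getInt("N", 10)
      Job job = Job.getInstance(conf, "TopN");
      job.setJarByClass(TopNDriver.class);
      job.setInputFormatClass(SequenceFileInputFormat.class); // (Text, IntWritable) records
      job.setMapperClass(TopNMapper.class);
      job.setReducerClass(TopNReducer.class);
      job.setNumReduceTasks(1); // all local top-N lists must meet in one reducer
      job.setMapOutputKeyClass(NullWritable.class);
      job.setMapOutputValueClass(Text.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args[1]));
      FileOutputFormat.setOutputPath(job, new Path(args[2]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Invocation then mirrors the script shown below for the nonunique case, e.g. hadoop jar $APP_JAR org.dataalgorithms.chap03.mapreduce.TopNDriver 10 /cats/input /cats/output.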

Nonunique Keys
#
# Generate unique (K,V) pairs
#
INPUT=/kv/input
OUTPUT=/kv/output
hadoop fs -rmr $OUTPUT
AGGREGATOR=org.dataalgorithms.chap03.mapreduce.AggregateByKeyDriver
hadoop jar $APP_JAR  $AGGREGATOR $INPUT $OUTPUT
#
# Find Top N
#
N=5
TopN_INPUT=/kv/output
TopN_OUTPUT=/kv/final
hadoop fs -rmr $TopN_OUTPUT
TopN=org.dataalgorithms.chap03.mapreduce.TopNDriver
hadoop jar $APP_JAR $TopN $N $TopN_INPUT $TopN_OUTPUT

Phase 1: Convert nonunique keys into unique keys
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/AggregateByKeyMapper.java
public class AggregateByKeyMapper extends
         Mapper<Object, Text, Text, IntWritable> {

   // reuse objects
   private Text K2 = new Text();
   private IntWritable V2 = new IntWritable();

   @Override
   public void map(Object key, Text value, Context context)
         throws IOException, InterruptedException {

      String valueAsString = value.toString().trim();
      String[] tokens = valueAsString.split(",");
      if (tokens.length != 2) {
         return;
      }

      String url = tokens[0];
      int frequency =  Integer.parseInt(tokens[1]);
      K2.set(url);
      V2.set(frequency);
      context.write(K2, V2);
   }
 
}

public class AggregateByKeyReducer  extends
    Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
         throws IOException, InterruptedException {
       
         int sum = 0;
         for (IntWritable value : values) {
               sum += value.get();
         }
         context.write(key, new IntWritable(sum));
      }
}
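
Because the aggregation is a plain sum (associative and commutative), the same reducer class can double as a combiner to cut shuffle volume. A one-line addition in whichever driver wires this phase up (a suggestion of ours, not necessarily in the book's AggregateByKeyDriver):

      job.setCombinerClass(AggregateByKeyReducer.class); // pre-sum counts on the map side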
