Sunday, December 27, 2015

Data Algorithms



https://github.com/mahmoudparsian/data-algorithms-book
The MapReduce framework automatically sorts the keys generated by mappers. This means that, before the reducers start, all intermediate key-value pairs generated by mappers are sorted by key (and not by value). The values passed to each reducer are not sorted at all; they can arrive in any order.

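Secondary Sort - sorting the values passed to a reducer
The first approach involves having the reducer read and buffer all of the values for a given key (in an array data structure, for example), then doing an in-reducer sort on the values. This approach will not scale: the reducer has to hold every value for a key in memory, so a key with a very large number of values can exhaust the reducer's memory.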
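A minimal sketch of the first approach, written as plain Java without Hadoop types so it can run on its own. The class and method names (InReducerSort, sortValues) are made up for illustration; in a real reducer the Iterable would be the values argument of reduce().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class InReducerSort {
    // Buffers every value for one key in memory, then sorts the buffer.
    // Memory use grows with the number of values for the key, which is
    // why this approach does not scale.
    static List<Integer> sortValues(Iterable<Integer> values) {
        List<Integer> buffer = new ArrayList<Integer>();
        for (Integer v : values) {
            buffer.add(v);
        }
        Collections.sort(buffer);
        return buffer;
    }

    public static void main(String[] args) {
        List<Integer> temperatures = Arrays.asList(20, -5, 35, 10);
        System.out.println(sortValues(temperatures)); // prints [-5, 10, 20, 35]
    }
}
```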

The second approach involves using the MapReduce framework for sorting the reducer values (this does not require in-reducer sorting of values passed to the reducer). This approach consists of “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.”
public class DateTemperaturePair
    implements Writable, WritableComparable<DateTemperaturePair> {

    private Text yearMonth = new Text();                 // natural key
    private Text day = new Text();
    private IntWritable temperature = new IntWritable(); // secondary key

    ...

    /**
     * This comparator controls the sort order of the keys.
     */
    @Override
    public int compareTo(DateTemperaturePair pair) {
        int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(pair.getTemperature());
        }
        //return compareValue;    // sort ascending
        return -1*compareValue;   // sort descending
    }
    ...
}
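
The ordering produced by this compareTo can be checked without a cluster. The sketch below uses a hypothetical plain-Java stand-in (PairKey, with String and int in place of Text and IntWritable) and the same comparison logic. Note that negating the whole result reverses the yearMonth order as well as the temperature order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DateTemperaturePair (no Hadoop types).
class PairKey implements Comparable<PairKey> {
    final String yearMonth;   // natural key
    final int temperature;    // secondary key

    PairKey(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }

    @Override
    public int compareTo(PairKey other) {
        int c = yearMonth.compareTo(other.yearMonth);
        if (c == 0) {
            c = Integer.compare(temperature, other.temperature);
        }
        return -c; // descending on yearMonth, then descending on temperature
    }

    @Override
    public String toString() {
        return yearMonth + ":" + temperature;
    }
}

class CompositeKeyOrderDemo {
    // Sorts a copy of the keys with PairKey.compareTo and renders them as strings.
    static List<String> sortedKeys(List<PairKey> keys) {
        List<PairKey> copy = new ArrayList<PairKey>(keys);
        Collections.sort(copy);
        List<String> out = new ArrayList<String>();
        for (PairKey k : copy) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<PairKey> keys = Arrays.asList(
            new PairKey("201301", 10),
            new PairKey("201301", 90),
            new PairKey("201212", 50),
            new PairKey("201301", 70));
        // prints [201301:90, 201301:70, 201301:10, 201212:50]
        System.out.println(sortedKeys(keys));
    }
}
```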

CUSTOM PARTITIONER
public class DateTemperaturePartitioner
    extends Partitioner<DateTemperaturePair, Text> {

    @Override
    public int getPartition(DateTemperaturePair pair,
                            Text text,
                            int numberOfPartitions) {
        // make sure that partitions are non-negative
        return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
    }
}
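
As a rough illustration of why the partitioner hashes only the natural key, here is a plain-Java sketch with a String in place of Text. The names NaturalKeyPartitionDemo and partitionFor are invented for this example, and String.hashCode() is not the same hash that Text uses, so the actual partition numbers on a cluster would differ.

```java
class NaturalKeyPartitionDemo {
    // Mirrors the getPartition logic above. The temperature is accepted
    // but deliberately ignored, so every composite key that shares a
    // yearMonth lands in the same partition.
    static int partitionFor(String yearMonth, int temperature, int numberOfPartitions) {
        return Math.abs(yearMonth.hashCode() % numberOfPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("201301", 10, 4);
        int p2 = partitionFor("201301", 90, 4);
        System.out.println(p1 == p2); // prints true
    }
}
```

Taking the absolute value after the modulo keeps the result in the range 0 to numberOfPartitions - 1 even when hashCode() is negative.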

GROUPING COMPARATOR
public class DateTemperatureGroupingComparator
    extends WritableComparator {

    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    /**
     * This comparator controls which keys are grouped
     * together into a single call to the reduce() method
     */
    @Override
    public int compare(WritableComparable wc1, WritableComparable wc2) {
        DateTemperaturePair pair = (DateTemperaturePair) wc1;
        DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
        return pair.getYearMonth().compareTo(pair2.getYearMonth());
    }
}
Register it in the driver:
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

public class SecondarySortMapper 
    extends Mapper<LongWritable, Text, DateTemperaturePair, Text> {

    private Text theTemperature = new Text();
    private DateTemperaturePair pair = new DateTemperaturePair();

    /**
     * @param key is generated by Hadoop (ignored here)
     * @param value has this format: "YYYY,MM,DD,temperature"
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokens = line.split(",");
        // YYYY = tokens[0]
        // MM = tokens[1]
        // DD = tokens[2]
        // temperature = tokens[3]
        String yearMonth = tokens[0] + tokens[1];
        String day = tokens[2];
        int temperature = Integer.parseInt(tokens[3]);

        pair.setYearMonth(yearMonth);
        pair.setDay(day);
        pair.setTemperature(temperature);
        theTemperature.set(tokens[3]);

        context.write(pair, theTemperature);
    }
}

public class SecondarySortReducer 
    extends Reducer<DateTemperaturePair, Text, Text, Text> {

    @Override
    protected void reduce(DateTemperaturePair key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
        StringBuilder builder = new StringBuilder();
        for (Text value : values) {
            builder.append(value.toString());
            builder.append(",");
        }
        context.write(key.getYearMonth(), new Text(builder.toString()));
    }
}

public class SecondarySortDriver  extends Configured implements Tool {

  private static Logger theLogger = Logger.getLogger(SecondarySortDriver.class);

  public int run(String[] args) throws Exception {

    Configuration conf = getConf();
    Job job = Job.getInstance(conf);   // new Job(conf) is deprecated
    job.setJarByClass(SecondarySortDriver.class);
    job.setJobName("SecondarySortDriver");

    // args[0] = input directory
    // args[1] = output directory
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // mapper emits (DateTemperaturePair, Text)
    job.setMapOutputKeyClass(DateTemperaturePair.class);
    job.setMapOutputValueClass(Text.class);
    // reducer emits (Text, Text)
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(SecondarySortMapper.class);
    job.setReducerClass(SecondarySortReducer.class);
    job.setPartitionerClass(DateTemperaturePartitioner.class);
    job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

    boolean status = job.waitForCompletion(true);
    theLogger.info("run(): status="+status);
    return status ? 0 : 1;
  }

  /**
  * The main driver for the secondary sort map/reduce program.
  * Invoke this method to submit the map/reduce job.
  * @throws Exception when there are communication problems with the job tracker.
  */
  public static void main(String[] args) throws Exception {
    // Make sure there are exactly 2 parameters
    if (args.length != 2) {
      theLogger.warn("SecondarySortDriver <input-dir> <output-dir>");
      throw new IllegalArgumentException("SecondarySortDriver <input-dir> <output-dir>");
    }

    //String inputDir = args[0];
    //String outputDir = args[1];
    int returnStatus = submitJob(args);
    theLogger.info("returnStatus="+returnStatus);
    
    System.exit(returnStatus);
  }


  /**
  * Submits the secondary sort map/reduce job through ToolRunner.
  * @return the exit status of the job (0 on success)
  * @throws Exception when there are communication problems with the job tracker.
  */
  public static int submitJob(String[] args) throws Exception {
    //String[] args = new String[2];
    //args[0] = inputDir;
    //args[1] = outputDir;
    int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
    return returnStatus;
  }
}
SortComparator: used to define how map output keys are sorted.
Excerpts from the book Hadoop: The Definitive Guide:
Sort order for keys is found as follows:
  1. If the property mapred.output.key.comparator.class is set, either explicitly or by calling setSortComparatorClass() on Job, then an instance of that class is used. (In the old API the equivalent method is setOutputKeyComparatorClass() on JobConf.)
  2. Otherwise, keys must be a subclass of WritableComparable, and the registered comparator for the key class is used.
  3. If there is no registered comparator, then a RawComparator is used that deserializes the byte streams being compared into objects and delegates to the WritableComparable’s compareTo() method.
SortComparator vs. GroupComparator in one line: SortComparator decides how map output keys are sorted, while GroupComparator decides which map output keys within the reducer go to the same reduce method call.
An example following on from this explanation:
Input:
symbol time price
a      1    10
a      2    20
b      3    30
Map output: create composite keys and values like so:
symbol-time time-price
a-1         1-10
a-2         2-20
b-3         3-30
Partitioner: routes the a-1 and a-2 keys to the same reducer even though the keys differ, because it partitions on the symbol only. The b-3 key may be routed to a different reducer.
GroupComparator: once the composite keys and values arrive at the reducer, the reducer would by default get
(a-1,{1-10})
(a-2,{2-20})
in two separate reduce calls, because the composite keys are distinct.
The group comparator instead ensures the reducer gets
(a,{1-10,2-20})
in a single reduce method call.
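
The difference can be simulated in plain Java. The sketch below is not Hadoop code: GroupingDemo and reduceCalls are made-up names, each record is a {symbol, time, value} array, and all three records are assumed to sit on one reducer, already sorted by composite key. A new reduce() call starts whenever the grouping comparator reports that the key has changed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingDemo {
    // Compares symbol and time: every composite key is its own group.
    static final Comparator<String[]> FULL_KEY = (a, b) -> {
        int c = a[0].compareTo(b[0]);
        return c != 0 ? c : a[1].compareTo(b[1]);
    };

    // Compares the symbol only: this plays the role of the grouping comparator.
    static final Comparator<String[]> SYMBOL_ONLY = (a, b) -> a[0].compareTo(b[0]);

    // Splits the sorted records into the value lists that each
    // reduce() call would receive under the given grouping comparator.
    static List<List<String>> reduceCalls(List<String[]> sorted, Comparator<String[]> grouping) {
        List<List<String>> calls = new ArrayList<List<String>>();
        String[] previous = null;
        for (String[] rec : sorted) {
            if (previous == null || grouping.compare(previous, rec) != 0) {
                calls.add(new ArrayList<String>()); // key changed: new reduce() call
            }
            calls.get(calls.size() - 1).add(rec[2]);
            previous = rec;
        }
        return calls;
    }

    static List<String[]> sample() {
        return Arrays.asList(
            new String[] {"a", "1", "1-10"},
            new String[] {"a", "2", "2-20"},
            new String[] {"b", "3", "3-30"});
    }

    public static void main(String[] args) {
        System.out.println(reduceCalls(sample(), FULL_KEY));    // prints [[1-10], [2-20], [3-30]]
        System.out.println(reduceCalls(sample(), SYMBOL_ONLY)); // prints [[1-10, 2-20], [3-30]]
    }
}
```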

The keys are sorted before being processed by a reducer, using a RawComparator. The default comparator uses the compareTo method provided by the key type, which implements WritableComparable.

To sort IntPair keys by the first element only in a Hadoop job, we can provide a RawComparator and set it using job.setSortComparatorClass:

public static class IntPair implements WritableComparable<IntPair> {
  ...
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}
...
job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
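
To see what the byte-level compare is doing, here is a standalone sketch that needs no Hadoop classes. It assumes that IntPair serializes its two ints back to back in big-endian order (what DataOutput.writeInt would produce); the write method is elided above, so that layout is an assumption. RawCompareDemo and its methods are illustrative names, with a local readInt standing in for WritableComparator.readInt.

```java
import java.nio.ByteBuffer;

class RawCompareDemo {
    // Assumed wire format: first int, then second int, both big-endian.
    static byte[] serialize(int first, int second) {
        return ByteBuffer.allocate(8).putInt(first).putInt(second).array();
    }

    // Reads a big-endian int starting at the given offset.
    static int readInt(byte[] bytes, int start) {
        return ByteBuffer.wrap(bytes, start, 4).getInt();
    }

    // Compares two serialized pairs by their first int only,
    // without deserializing them into objects.
    static int compareFirstOnly(byte[] b1, int s1, byte[] b2, int s2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    public static void main(String[] args) {
        byte[] x = serialize(3, 100);
        byte[] y = serialize(3, -7);
        byte[] z = serialize(5, 1);
        System.out.println(compareFirstOnly(x, 0, y, 0)); // prints 0
        System.out.println(compareFirstOnly(x, 0, z, 0)); // prints -1
    }
}
```

Because only the first four bytes of each record are read, the second element never influences the result.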
As an example, consider that the input consists of (IntWritable, IntWritable) pairs. We would like to run a Hadoop job on these pairs such that the values belonging to one key are sorted before being processed by a reducer.
  1. The mapper produces (IntPair, IntWritable) pairs. Notice that the key now consists of both numbers.
  2. These pairs are sorted by the IntPair keys, i.e., by both numbers.
  3. A custom grouping comparator is used, which groups the IntPair keys using the first element only (using the RawComparator from the previous section, set with job.setGroupingComparatorClass).
Spark modes:
Standalone mode
This is the default setup. You start the Spark master on a master node and a “worker” on every slave node, and submit your Spark application to the Spark master.
YARN client mode
In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver in the client Spark process that submits the application.
YARN cluster mode
In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver in the ApplicationMaster in YARN.
