Sunday, January 3, 2016

Moving Average - Data Algorithms



Each input record is a CSV line of the form:

<name-as-string><,><date-as-timestamp><,><value-as-double>
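For example, the input might look like the following (hypothetical symbols and values; the exact date format is whatever DateUtil.getDate() is written to parse, e.g. yyyy-MM-dd):

GOOG,2013-07-01,550.09
GOOG,2013-07-02,551.25
GOOG,2013-07-03,554.90
AAPL,2013-07-01,420.05
AAPL,2013-07-02,423.20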
Option #1: Sorting in Memory
https://github.com/mahmoudparsian/data-algorithms-book/blob/b41b1b009b5493895a44c9b82315a83d937402be/src/main/java/org/dataalgorithms/chap06/TimeSeriesData.java
public class TimeSeriesData
   implements Writable, Comparable<TimeSeriesData> {

  private long timestamp;
  private double value;

  public static TimeSeriesData copy(TimeSeriesData tsd) {
    return new TimeSeriesData(tsd.timestamp, tsd.value);
  }

  public TimeSeriesData(long timestamp, double value) {
    set(timestamp, value);
  }

  public TimeSeriesData() {
  }

  public void set(long timestamp, double value) {
    this.timestamp = timestamp;
    this.value = value;
  }

  public long getTimestamp() {
    return this.timestamp;
  }

  public double getValue() {
    return this.value;
  }

  /**
   * Deserializes the point from the underlying data.
   * @param in a DataInput object to read the point from.
   */
  public void readFields(DataInput in) throws IOException {
    this.timestamp  = in.readLong();
    this.value  = in.readDouble();
  }

  /**
   * Converts binary data into a TimeSeriesData object.
   *
   * @param in A DataInput object to read from.
   * @return A TimeSeriesData object
   * @throws IOException
   */
  public static TimeSeriesData read(DataInput in) throws IOException {
    TimeSeriesData tsData = new TimeSeriesData();
    tsData.readFields(in);
    return tsData;
  }

  public String getDate() {
    return DateUtil.getDateAsString(this.timestamp);
  }

  /**
   * Creates a clone of this object.
   */
  @Override
  public TimeSeriesData clone() {
    return new TimeSeriesData(timestamp, value);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(this.timestamp);
    out.writeDouble(this.value);
  }

  /**
   * Used in sorting the data in the reducer
   */
  @Override
  public int compareTo(TimeSeriesData data) {
    if (this.timestamp  < data.timestamp ) {
      return -1;
    }
    else if (this.timestamp  > data.timestamp ) {
      return 1;
    }
    else {
       return 0;
    }
  }

  @Override
  public String toString() {
    return "(" + timestamp + "," + value + ")";
  }
}

https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/memorysort/SortInMemory_MovingAverageMapper.java
public class SortInMemory_MovingAverageMapper  
    extends Mapper<LongWritable, Text, Text, TimeSeriesData> {

   // reuse Hadoop's Writable objects
   private final Text reducerKey = new Text();
   private final TimeSeriesData reducerValue = new TimeSeriesData();

   public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {
       String record = value.toString();
       if ((record == null) || (record.length() == 0)) {
          return;
       }
       String[] tokens = StringUtils.split(record.trim(), ",");
       if (tokens.length == 3) {
          // tokens[0] = name of timeseries as string
          // tokens[1] = timestamp
          // tokens[2] = value of timeseries as double
          Date date = DateUtil.getDate(tokens[1]);
          if (date == null) {
             return;
          }
          reducerKey.set(tokens[0]); // set the name as key
          reducerValue.set(date.getTime(), Double.parseDouble(tokens[2]));
          context.write(reducerKey, reducerValue);
       }
       else {
          // log as error, not enough tokens
       }
   }
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/memorysort/SortInMemory_MovingAverageReducer.java
public class SortInMemory_MovingAverageReducer
   extends Reducer<Text, TimeSeriesData, Text, Text> {

    int windowSize = 5; // default window size
 
  /**
   * Will be run only once;
   * gets parameters from Hadoop's configuration.
   */
  @Override
  public void setup(Context context)
      throws IOException, InterruptedException {
    this.windowSize = context.getConfiguration().getInt("moving.average.window.size", 5);
    System.out.println("setup(): windowSize=" + windowSize);
  }

  @Override
  public void reduce(Text key, Iterable<TimeSeriesData> values, Context context)
      throws IOException, InterruptedException {

    System.out.println("reduce(): key=" + key.toString());

    // build the unsorted list of timeseries
    List<TimeSeriesData> timeseries = new ArrayList<TimeSeriesData>();
    for (TimeSeriesData tsData : values) {
      // Hadoop reuses the value object, so keep a copy of each element
      TimeSeriesData copy = TimeSeriesData.copy(tsData);
      timeseries.add(copy);
    }

    // sort the timeseries data in memory, then
    // apply the moving average algorithm to the sorted timeseries
    Collections.sort(timeseries);
    System.out.println("reduce(): timeseries=" + timeseries.toString());

    // sum of the first (windowSize - 1) values
    double sum = 0.0;
    for (int i = 0; i < windowSize - 1; i++) {
      sum += timeseries.get(i).getValue();
    }

    // now we have enough timeseries data to calculate the moving average
    Text outputValue = new Text(); // reuse object
    for (int i = windowSize - 1; i < timeseries.size(); i++) {
      System.out.println("reduce(): key=" + key.toString() + "  i=" + i);
      sum += timeseries.get(i).getValue();
      double movingAverage = sum / windowSize;
      long timestamp = timeseries.get(i).getTimestamp();
      outputValue.set(DateUtil.getDateAsString(timestamp) + "," + movingAverage);
      // send output to HDFS
      context.write(key, outputValue);

      // slide the window: drop the oldest value before the next iteration
      sum -= timeseries.get(i - windowSize + 1).getValue();
    }
  } // reduce

}
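The driver for Option #1 is not listed above. As a reference, here is a minimal sketch of what it needs to do, wiring the mapper/reducer shown above and passing the window size via the "moving.average.window.size" property read in setup(). This assumes Hadoop's newer mapreduce API and is not the book's exact driver; imports are omitted, as in the excerpts above.

// A minimal driver sketch for Option #1 (assumption: not the book's exact driver).
public class SortInMemory_MovingAverageDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
      System.err.println("Usage: SortInMemory_MovingAverageDriver <window_size> <input> <output>");
      System.exit(1);
    }

    // window size read by the reducer's setup()
    conf.setInt("moving.average.window.size", Integer.parseInt(otherArgs[0]));

    Job job = Job.getInstance(conf, "SortInMemory_MovingAverage");
    job.setJarByClass(SortInMemory_MovingAverageDriver.class);
    job.setMapperClass(SortInMemory_MovingAverageMapper.class);
    job.setReducerClass(SortInMemory_MovingAverageReducer.class);

    // mapper's output key-value
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TimeSeriesData.class);

    // reducer's output key-value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // define I/O
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}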

Option #2: Sorting Using the MapReduce Framework

Let the MapReduce framework do the sorting of the time series data: one of the main features of a MapReduce framework is sorting and grouping by key values, and Hadoop does a pretty good job of this. In Hadoop, the data can be sorted during the shuffle phase; when sorting is done this way, it is called "secondary sorting".
To implement secondary sort for a moving average, we make the mapper's output key a composite of the natural key (name-as-string) and the natural value (timeseries-timestamp).
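For example (with an illustrative epoch-millisecond timestamp), the record GOOG,2013-07-01,550.09 would be emitted by the mapper as:

key   = (GOOG, 1372636800000)     // CompositeKey(name, timestamp)
value = (1372636800000, 550.09)   // TimeSeriesData(timestamp, value)

so that the shuffle phase can sort by timestamp within each name.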

How will the composite keys be sorted?
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/secondarysort/CompositeKey.java
/**
 * CompositeKey: represents a pair of
 * (String name, long timestamp).
 *
 * We do a primary grouping pass on the name field to get all of the data of
 * one type together, and then our "secondary sort" during the shuffle phase
 * uses the timestamp long member to sort the timeseries points so that they
 * arrive at the reducer partitioned and in sorted order.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
    // natural key is (name)
    // composite key is a pair (name, timestamp)
  private String name;
  private long timestamp;

  public CompositeKey(String name, long timestamp) {
    set(name, timestamp);
  }
  
  public CompositeKey() {
  }

  public void set(String name, long timestamp) {
    this.name = name;
    this.timestamp = timestamp;
  }

  public String getName() {
    return this.name;
  }

  public long getTimestamp() {
    return this.timestamp;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.name = in.readUTF();
    this.timestamp = in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(this.name);
    out.writeLong(this.timestamp);
  }

  @Override
  public int compareTo(CompositeKey other) {
    if (this.name.compareTo(other.name) != 0) {
      return this.name.compareTo(other.name);
    } 
    else if (this.timestamp != other.timestamp) {
      return timestamp < other.timestamp ? -1 : 1;
    } 
    else {
      return 0;
    }

  }

  public static class CompositeKeyComparator extends WritableComparator {
    public CompositeKeyComparator() {
      super(CompositeKey.class);
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return compareBytes(b1, s1, l1, b2, s2, l2);
    }
  }

  static { // register this comparator
    WritableComparator.define(CompositeKey.class,
        new CompositeKeyComparator());
  }

}

public class CompositeKeyComparator extends WritableComparator {

  protected CompositeKeyComparator() {
    super(CompositeKey.class, true);
  }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    CompositeKey key1 = (CompositeKey) w1;
    CompositeKey key2 = (CompositeKey) w2;

    int comparison = key1.getName().compareTo(key2.getName());
    if (comparison == 0) {
       // names are equal here
         if (key1.getTimestamp() == key2.getTimestamp()) {
          return 0;
         }
         else if (key1.getTimestamp() < key2.getTimestamp()) {
          return -1;
         }
         else {
          return 1;
         }
    }
    else {
      return comparison;
    }
  }
}


public class NaturalKeyGroupingComparator extends WritableComparator {

  protected NaturalKeyGroupingComparator() {
    super(CompositeKey.class, true);
  }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    CompositeKey key1 = (CompositeKey) w1;
    CompositeKey key2 = (CompositeKey) w2;
    return key1.getName().compareTo(key2.getName());
  }

}
How will data arrive at the reducers?
This is handled by another custom class, NaturalKeyPartitioner, which implements the Partitioner interface and partitions the key space generated by all mappers. Even though the keys are sorted by the whole CompositeKey (composed of name and timestamp), for partitioning we use only the name, because we want all of the sorted values with the same name to go to a single reducer.
/**
 * This custom partitioner allows us to control how outputs from the
 * map stage are distributed to the reducers. NaturalKeyPartitioner partitions
 * the data output from the map phase (SortByMRF_MovingAverageMapper)
 * before it is sent through the shuffle phase. Since we want a single
 * reducer to receive all time series data for a single "name", we partition
 * the output of the map phase by only the natural key component ("name").
 */
public class NaturalKeyPartitioner implements
   Partitioner<CompositeKey, TimeSeriesData> {

  @Override
  public int getPartition(CompositeKey key, 
                          TimeSeriesData value,
                      int numberOfPartitions) {
    return Math.abs((int) (hash(key.getName()) % numberOfPartitions));
  }

  @Override
  public void configure(JobConf jobconf) {
  }
  
    /**
     *  adapted from String.hashCode()
     */
    static long hash(String str) {
       long h = 1125899906842597L; // prime
       int length = str.length();
       for (int i = 0; i < length; i++) {
          h = 31*h + str.charAt(i);
       }
       return h;
    } 
}
public class SortByMRF_MovingAverageMapper extends MapReduceBase 
   implements Mapper<LongWritable, Text, CompositeKey, TimeSeriesData> {
   // reuse Hadoop's Writable objects
   private final CompositeKey reducerKey = new CompositeKey();
   private final TimeSeriesData reducerValue = new TimeSeriesData();
  @Override
  public void map(LongWritable inkey, Text value,
      OutputCollector<CompositeKey, TimeSeriesData> output,
      Reporter reporter) throws IOException {
     String record =  value.toString();
     if ( (record == null) || (record.length() ==0) ) {
        return;
     }         
       String[] tokens = StringUtils.split(record, ",");
       if (tokens.length == 3) {
          // tokens[0] = name of timeseries as string
          // tokens[1] = timestamp
          // tokens[2] = value of timeseries as double
          Date date = DateUtil.getDate(tokens[1]);
          if (date == null) {
             return;
          }
          long timestamp = date.getTime();
          reducerKey.set(tokens[0], timestamp); 
          reducerValue.set(timestamp, Double.parseDouble(tokens[2]));
          // emit key-value pair
          output.collect(reducerKey, reducerValue);
       }
       else {
          // log as error, not enough tokens
       }
   }
}
public class SortByMRF_MovingAverageReducer extends MapReduceBase 
   implements Reducer<CompositeKey, TimeSeriesData, Text, Text> {

    int windowSize = 5; // default window size

  /**
   *  will be run only once 
   *  get parameters from Hadoop's configuration
   */
  @Override
  public void configure(JobConf jobconf) {
        this.windowSize = jobconf.getInt("moving.average.window.size", 5);
  }  
   
  public void reduce(CompositeKey key, 
                     Iterator<TimeSeriesData> values,
                 OutputCollector<Text, Text> output, 
                 Reporter reporter)
    throws IOException {

    // note that the values arrive already sorted by timestamp;
    // apply the moving average algorithm to the sorted timeseries
    Text outputKey = new Text();
    Text outputValue = new Text();
    MovingAverage ma = new MovingAverage(this.windowSize);
    while (values.hasNext()) {
      TimeSeriesData data = values.next();
      ma.addNewNumber(data.getValue());
      double movingAverage = ma.getMovingAverage();
      long timestamp = data.getTimestamp();
      String dateAsString = DateUtil.getDateAsString(timestamp);
      outputValue.set(dateAsString + "," + movingAverage);
      outputKey.set(key.getName());
      output.collect(outputKey, outputValue);
    }

  } // reduce
}
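The MovingAverage helper used by this reducer is not listed here (it lives in the book's chap06 package). A minimal sketch of what it needs to do, assuming a fixed-size window kept in a queue together with a running sum, could look like this; the book's actual class may differ in details:

// A minimal sketch of the MovingAverage helper used above (assumption, not the book's exact code).
// It keeps a running sum over a window of the most recent 'period' values.
public class MovingAverage {

  private double sum = 0.0;
  private final int period;
  private final Queue<Double> window = new LinkedList<Double>();

  public MovingAverage(int period) {
    if (period < 1) {
      throw new IllegalArgumentException("period must be > 0");
    }
    this.period = period;
  }

  public void addNewNumber(double number) {
    sum += number;
    window.add(number);
    if (window.size() > period) {
      // drop the oldest value once the window is full
      sum -= window.remove();
    }
  }

  public double getMovingAverage() {
    if (window.isEmpty()) {
      throw new IllegalArgumentException("moving average is undefined: no data");
    }
    // before the window fills up, this returns the average of the values seen so far,
    // which matches the reducer above emitting output for every point
    return sum / window.size();
  }
}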
public class SortByMRF_MovingAverageDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
    JobConf jobconf = new JobConf(conf, SortByMRF_MovingAverageDriver.class);
    jobconf.setJobName("SortByMRF_MovingAverageDriver");
    
       String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
       if (otherArgs.length != 3) {
          System.err.println("Usage: SortByMRF_MovingAverageDriver <window_size> <input> <output>");
          System.exit(1);
       }

       // add jars to distributed cache
       HadoopUtil.addJarsToDistributedCache(conf, "/lib/");
       
       // set mapper/reducer
       jobconf.setMapperClass(SortByMRF_MovingAverageMapper.class);
       jobconf.setReducerClass(SortByMRF_MovingAverageReducer.class);
       
       // define mapper's output key-value
       jobconf.setMapOutputKeyClass(CompositeKey.class);
       jobconf.setMapOutputValueClass(TimeSeriesData.class);
              
       // define reducer's output key-value
       jobconf.setOutputKeyClass(Text.class);
       jobconf.setOutputValueClass(Text.class);

       // set window size for moving average calculation
       int windowSize = Integer.parseInt(otherArgs[0]);
       jobconf.setInt("moving.average.window.size", windowSize);      
       
       // define I/O
     FileInputFormat.setInputPaths(jobconf, new Path(otherArgs[1]));
     FileOutputFormat.setOutputPath(jobconf, new Path(otherArgs[2]));
       
       jobconf.setInputFormat(TextInputFormat.class); 
       jobconf.setOutputFormat(TextOutputFormat.class);
     jobconf.setCompressMapOutput(true);       
       
       // The following three settings are needed for "secondary sorting".
       // The Partitioner decides which mapper output goes to which reducer,
       // based on the mapper output key. In general, each distinct key forms
       // its own group (the Iterator seen on the reducer side). But sometimes
       // we want different keys in the same group; that is the job of the
       // Output Value Grouping Comparator, which groups the mapper output
       // (similar to a GROUP BY condition in SQL). The Output Key Comparator
       // is used during the sort stage for the mapper output key.
       jobconf.setPartitionerClass(NaturalKeyPartitioner.class);
       jobconf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
       jobconf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
       
       JobClient.runJob(jobconf);
    }
}

Spark:
https://gist.github.com/samklr/27411098f04fc46dcd05
http://stackoverflow.com/questions/23402303/apache-spark-moving-average
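For comparison, here is a minimal Spark (Java) sketch of the same computation, mirroring Option #1: group the points by name, sort each series in memory, then slide the window. It assumes the CSV input format above, reuses the DateUtil helper, and targets Spark 1.x's Java API (flatMapValues taking an Iterable); it is not taken from the linked gists, and the class name and arguments are illustrative.

// A minimal Spark sketch (assumption: not from the linked gists).
public class SparkMovingAverage {

  public static void main(String[] args) {
    final int windowSize = Integer.parseInt(args[0]);
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("MovingAverage"));

    // parse each CSV line into (name, (timestamp, value))
    JavaPairRDD<String, Tuple2<Long, Double>> parsed =
        sc.textFile(args[1]).mapToPair(line -> {
          String[] t = line.split(",");
          long ts = DateUtil.getDate(t[1]).getTime();
          double v = Double.parseDouble(t[2]);
          return new Tuple2<String, Tuple2<Long, Double>>(t[0], new Tuple2<Long, Double>(ts, v));
        });

    // group by name, sort each series by timestamp, apply a sliding-window sum
    JavaPairRDD<String, String> averaged = parsed.groupByKey().flatMapValues(points -> {
      List<Tuple2<Long, Double>> ts = new ArrayList<>();
      for (Tuple2<Long, Double> p : points) {
        ts.add(p);
      }
      ts.sort((a, b) -> Long.compare(a._1(), b._1()));

      List<String> out = new ArrayList<>();
      double sum = 0.0;
      for (int i = 0; i < ts.size(); i++) {
        sum += ts.get(i)._2();
        if (i >= windowSize) {
          sum -= ts.get(i - windowSize)._2(); // slide the window
        }
        if (i >= windowSize - 1) {
          out.add(DateUtil.getDateAsString(ts.get(i)._1()) + "," + (sum / windowSize));
        }
      }
      return out; // Iterable<String>, per the Spark 1.x flatMapValues signature
    });

    // write "name,date,movingAverage" lines
    averaged.map(t -> t._1() + "," + t._2()).saveAsTextFile(args[2]);
    sc.stop();
  }
}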

Hadoop Way
Composite Key – a key that is a combination of the natural key and the natural value we want to sort by.
http://blog.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/
https://github.com/mahmoudparsian/data-algorithms-book/tree/master/src/main/java/org/dataalgorithms/chap06/secondarysort
