Each input record is a comma-separated triple: <name-as-string>,<date-as-timestamp>,<value-as-double>
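For example, a few hypothetical records (any name/date/value triples in this shape will work):
GOOG,2013-07-01,534.47
GOOG,2013-07-02,531.35
GOOG,2013-07-03,532.54
AAPL,2013-07-01,409.22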
Option #1: Sorting in Memory
In this approach the mapper emits (name, TimeSeriesData) pairs, and the reducer collects all of the time series points for a given name, sorts them by timestamp in memory, and then applies the moving-average calculation to the sorted list. The three classes below come from the book's repository.
https://github.com/mahmoudparsian/data-algorithms-book/blob/b41b1b009b5493895a44c9b82315a83d937402be/src/main/java/org/dataalgorithms/chap06/TimeSeriesData.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * TimeSeriesData represents a single (timestamp, value) point of a time series.
 */
public class TimeSeriesData
    implements Writable, Comparable<TimeSeriesData> {

    private long timestamp;
    private double value;

    public static TimeSeriesData copy(TimeSeriesData tsd) {
        return new TimeSeriesData(tsd.timestamp, tsd.value);
    }

    public TimeSeriesData(long timestamp, double value) {
        set(timestamp, value);
    }

    public TimeSeriesData() {
    }

    public void set(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public double getValue() {
        return this.value;
    }

    /**
     * Deserializes the point from the underlying data.
     * @param in a DataInput object to read the point from.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.timestamp = in.readLong();
        this.value = in.readDouble();
    }

    /**
     * Converts binary data into a TimeSeriesData object.
     *
     * @param in a DataInput object to read from.
     * @return a TimeSeriesData object
     * @throws IOException
     */
    public static TimeSeriesData read(DataInput in) throws IOException {
        TimeSeriesData tsData = new TimeSeriesData();
        tsData.readFields(in);
        return tsData;
    }

    // DateUtil is a date-formatting helper from the book's source tree (see the sketch below)
    public String getDate() {
        return DateUtil.getDateAsString(this.timestamp);
    }

    /**
     * Creates a clone of this object.
     */
    public TimeSeriesData clone() {
        return new TimeSeriesData(timestamp, value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.timestamp);
        out.writeDouble(this.value);
    }

    /**
     * Used for sorting the data in the reducer.
     */
    @Override
    public int compareTo(TimeSeriesData data) {
        if (this.timestamp < data.timestamp) {
            return -1;
        }
        else if (this.timestamp > data.timestamp) {
            return 1;
        }
        else {
            return 0;
        }
    }

    @Override
    public String toString() {
        return "(" + timestamp + "," + value + ")";
    }
}
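TimeSeriesData and the mappers below call a DateUtil helper that is not reproduced here. A minimal sketch of what it needs to provide, assuming the input timestamps are plain yyyy-MM-dd date strings (the date pattern is an assumption; adjust it to match your data):
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical sketch of the DateUtil helper used by the mappers/reducers above.
public class DateUtil {

    // Assumed date pattern; change it if your input uses a different format.
    private static final String DATE_FORMAT = "yyyy-MM-dd";

    // Parse a date string into a Date; return null if it cannot be parsed.
    public static Date getDate(String dateAsString) {
        try {
            return new SimpleDateFormat(DATE_FORMAT).parse(dateAsString);
        }
        catch (ParseException e) {
            return null;
        }
    }

    // Format an epoch-millisecond timestamp back into a date string.
    public static String getDateAsString(long timestamp) {
        return new SimpleDateFormat(DATE_FORMAT).format(new Date(timestamp));
    }
}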
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/memorysort/SortInMemory_MovingAverageMapper.java
import java.io.IOException;
import java.util.Date;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortInMemory_MovingAverageMapper
    extends Mapper<LongWritable, Text, Text, TimeSeriesData> {

    // reuse Hadoop's Writable objects
    private final Text reducerKey = new Text();
    private final TimeSeriesData reducerValue = new TimeSeriesData();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        String record = value.toString();
        if ((record == null) || (record.length() == 0)) {
            return;
        }
        String[] tokens = StringUtils.split(record.trim(), ",");
        if (tokens.length == 3) {
            // tokens[0] = name of the time series (string)
            // tokens[1] = timestamp
            // tokens[2] = value of the time series (double)
            Date date = DateUtil.getDate(tokens[1]);
            if (date == null) {
                return;
            }
            reducerKey.set(tokens[0]); // set the name as key
            reducerValue.set(date.getTime(), Double.parseDouble(tokens[2]));
            context.write(reducerKey, reducerValue);
        }
        else {
            // log an error: not enough tokens
        }
    }
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/memorysort/SortInMemory_MovingAverageReducer.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortInMemory_MovingAverageReducer
    extends Reducer<Text, TimeSeriesData, Text, Text> {

    int windowSize = 5; // default window size

    /**
     * Runs only once, before reduce();
     * reads parameters from Hadoop's configuration.
     */
    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
        this.windowSize = context.getConfiguration().getInt("moving.average.window.size", 5);
        System.out.println("setup(): windowSize=" + windowSize);
    }

    @Override
    public void reduce(Text key, Iterable<TimeSeriesData> values, Context context)
        throws IOException, InterruptedException {
        System.out.println("reduce(): key=" + key.toString());

        // build the unsorted list of time series points;
        // each value is copied because Hadoop reuses the same Writable object on every iteration
        List<TimeSeriesData> timeseries = new ArrayList<TimeSeriesData>();
        for (TimeSeriesData tsData : values) {
            TimeSeriesData copy = TimeSeriesData.copy(tsData);
            timeseries.add(copy);
        }

        // sort the time series data in memory, then
        // apply the moving-average algorithm to the sorted list
        Collections.sort(timeseries);
        System.out.println("reduce(): timeseries=" + timeseries.toString());

        // calculate the sum of the first (windowSize - 1) values;
        // note: this assumes each key has at least windowSize points
        double sum = 0.0;
        for (int i = 0; i < windowSize - 1; i++) {
            sum += timeseries.get(i).getValue();
        }

        // now there is enough time series data to calculate the moving average
        Text outputValue = new Text(); // reuse object
        for (int i = windowSize - 1; i < timeseries.size(); i++) {
            System.out.println("reduce(): key=" + key.toString() + " i=" + i);
            sum += timeseries.get(i).getValue();
            double movingAverage = sum / windowSize;
            long timestamp = timeseries.get(i).getTimestamp();
            outputValue.set(DateUtil.getDateAsString(timestamp) + "," + movingAverage);
            // send output to HDFS
            context.write(key, outputValue);
            // drop the oldest value in the window before the next iteration
            sum -= timeseries.get(i - windowSize + 1).getValue();
        }
    } // reduce
}
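To see the sliding-window arithmetic, a small worked example with hypothetical numbers: suppose windowSize = 3 and the sorted values for a key are 10, 20, 30, 40. The first loop sets sum = 10 + 20 = 30. At i = 2, sum becomes 60 and the average 60 / 3 = 20 is emitted for the third timestamp; then the oldest value 10 is subtracted, leaving sum = 50. At i = 3, sum becomes 90 and the average 90 / 3 = 30 is emitted for the fourth timestamp.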
Alternatively, we can let the MapReduce framework itself sort the time series data: sorting and grouping by key is one of the core features of a MapReduce framework, and Hadoop does it well.
Option #2: Sorting Using the MapReduce Framework
In Hadoop we can sort the data during the shuffle phase of the MapReduce framework; when the framework sorts the values this way, it is called "secondary sorting". To implement a secondary sort for the moving average, we make the mapper's output key a composite of the natural key (name-as-string) and the natural value (the time series timestamp). Composite keys then sort first by name and then by timestamp within the same name, e.g. (GOOG, t1) < (GOOG, t2) < (IBM, t0) whenever t1 < t2. Note that this example uses the older org.apache.hadoop.mapred API (JobConf, OutputCollector), unlike Option #1, which uses org.apache.hadoop.mapreduce.
How will the composite keys be sorted?
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap06/secondarysort/CompositeKey.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * CompositeKey: represents a pair of (String name, long timestamp).
 *
 * We do a primary grouping pass on the name field to get all of the data of
 * one type together, and then our "secondary sort" during the shuffle phase
 * uses the timestamp member to sort the time series points so that they
 * arrive at the reducer partitioned and in sorted order.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
    // natural key is (name)
    // composite key is a pair (name, timestamp)
    private String name;
    private long timestamp;

    public CompositeKey(String name, long timestamp) {
        set(name, timestamp);
    }

    public CompositeKey() {
    }

    public void set(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }

    public String getName() {
        return this.name;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.timestamp = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.name);
        out.writeLong(this.timestamp);
    }

    @Override
    public int compareTo(CompositeKey other) {
        if (this.name.compareTo(other.name) != 0) {
            return this.name.compareTo(other.name);
        }
        else if (this.timestamp != other.timestamp) {
            return timestamp < other.timestamp ? -1 : 1;
        }
        else {
            return 0;
        }
    }

    // raw-bytes comparator used when comparing serialized keys
    public static class CompositeKeyComparator extends WritableComparator {
        public CompositeKeyComparator() {
            super(CompositeKey.class);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
        }
    }

    static { // register this comparator
        WritableComparator.define(CompositeKey.class, new CompositeKeyComparator());
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: orders composite keys by name, then by timestamp within the same name.
public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        int comparison = key1.getName().compareTo(key2.getName());
        if (comparison == 0) {
            // names are equal: compare timestamps
            if (key1.getTimestamp() == key2.getTimestamp()) {
                return 0;
            }
            else if (key1.getTimestamp() < key2.getTimestamp()) {
                return -1;
            }
            else {
                return 1;
            }
        }
        else {
            return comparison;
        }
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: groups reducer input by the natural key (name) only,
// so all timestamps for one name arrive in a single reduce() call.
public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        return key1.getName().compareTo(key2.getName());
    }
}
How will the data arrive at the reducers?
This is handled by another custom class, NaturalKeyPartitioner, which implements the Partitioner interface and partitions the key space produced by all mappers. Even though the keys are sorted by the whole CompositeKey (name plus timestamp), partitioning uses only the name, because we want all of the sorted values with the same name to go to a single reducer.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/**
 * This custom partitioner allows us to control how the outputs of the
 * map stage are distributed to the reducers. NaturalKeyPartitioner partitions
 * the output of the map phase (SortByMRF_MovingAverageMapper)
 * before it is sent through the shuffle phase. Since we want a single
 * reducer to receive all time series data for a single "name", we partition
 * the map output by the natural key component ("name") only.
 */
public class NaturalKeyPartitioner implements
    Partitioner<CompositeKey, TimeSeriesData> {

    @Override
    public int getPartition(CompositeKey key,
                            TimeSeriesData value,
                            int numberOfPartitions) {
        return Math.abs((int) (hash(key.getName()) % numberOfPartitions));
    }

    @Override
    public void configure(JobConf jobconf) {
    }

    /**
     * Adapted from String.hashCode().
     */
    static long hash(String str) {
        long h = 1125899906842597L; // prime
        int length = str.length();
        for (int i = 0; i < length; i++) {
            h = 31 * h + str.charAt(i);
        }
        return h;
    }
}
import java.io.IOException;
import java.util.Date;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SortByMRF_MovingAverageMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, CompositeKey, TimeSeriesData> {

    // reuse Hadoop's Writable objects
    private final CompositeKey reducerKey = new CompositeKey();
    private final TimeSeriesData reducerValue = new TimeSeriesData();

    @Override
    public void map(LongWritable inkey, Text value,
                    OutputCollector<CompositeKey, TimeSeriesData> output,
                    Reporter reporter) throws IOException {
        String record = value.toString();
        if ((record == null) || (record.length() == 0)) {
            return;
        }
        String[] tokens = StringUtils.split(record, ",");
        if (tokens.length == 3) {
            // tokens[0] = name of the time series (string)
            // tokens[1] = timestamp
            // tokens[2] = value of the time series (double)
            Date date = DateUtil.getDate(tokens[1]);
            if (date == null) {
                return;
            }
            long timestamp = date.getTime();
            reducerKey.set(tokens[0], timestamp);
            reducerValue.set(timestamp, Double.parseDouble(tokens[2]));
            // emit key-value pair
            output.collect(reducerKey, reducerValue);
        }
        else {
            // log an error: not enough tokens
        }
    }
}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SortByMRF_MovingAverageReducer extends MapReduceBase
    implements Reducer<CompositeKey, TimeSeriesData, Text, Text> {

    int windowSize = 5; // default window size

    /**
     * Runs only once;
     * reads parameters from Hadoop's configuration.
     */
    @Override
    public void configure(JobConf jobconf) {
        this.windowSize = jobconf.getInt("moving.average.window.size", 5);
    }

    @Override
    public void reduce(CompositeKey key,
                       Iterator<TimeSeriesData> values,
                       OutputCollector<Text, Text> output,
                       Reporter reporter)
        throws IOException {
        // note that the values arrive already sorted by timestamp;
        // apply the moving-average algorithm to the sorted time series
        Text outputKey = new Text();
        Text outputValue = new Text();
        MovingAverage ma = new MovingAverage(this.windowSize);
        while (values.hasNext()) {
            TimeSeriesData data = values.next();
            ma.addNewNumber(data.getValue());
            double movingAverage = ma.getMovingAverage();
            long timestamp = data.getTimestamp();
            String dateAsString = DateUtil.getDateAsString(timestamp);
            outputValue.set(dateAsString + "," + movingAverage);
            outputKey.set(key.getName());
            output.collect(outputKey, outputValue);
        }
    } // reduce
}
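The reducer above depends on a MovingAverage helper that is not reproduced here. A minimal sketch of the interface it needs (addNewNumber and getMovingAverage), assuming a fixed-size window backed by a queue; the real class in the book's repository may differ in details:
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical sketch of the MovingAverage helper used by the reducer above.
public class MovingAverage {

    private final int period;  // window size
    private double sum = 0.0;  // running sum of the values currently in the window
    private final Queue<Double> window = new LinkedList<Double>();

    public MovingAverage(int period) {
        if (period < 1) {
            throw new IllegalArgumentException("period must be > 0");
        }
        this.period = period;
    }

    // add the next value; evict the oldest value once the window is full
    public void addNewNumber(double number) {
        sum += number;
        window.add(number);
        if (window.size() > period) {
            sum -= window.remove();
        }
    }

    // average of the values currently in the window
    // (fewer than 'period' values until the window fills up)
    public double getMovingAverage() {
        if (window.isEmpty()) {
            throw new IllegalStateException("no numbers have been added yet");
        }
        return sum / window.size();
    }
}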
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
// HadoopUtil is a small helper from the book's source tree

public class SortByMRF_MovingAverageDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf jobconf = new JobConf(conf, SortByMRF_MovingAverageDriver.class);
        jobconf.setJobName("SortByMRF_MovingAverageDriver");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: SortByMRF_MovingAverageDriver <window_size> <input> <output>");
            System.exit(1);
        }

        // add jars to the distributed cache
        HadoopUtil.addJarsToDistributedCache(conf, "/lib/");

        // set mapper/reducer
        jobconf.setMapperClass(SortByMRF_MovingAverageMapper.class);
        jobconf.setReducerClass(SortByMRF_MovingAverageReducer.class);

        // define the mapper's output key-value types
        jobconf.setMapOutputKeyClass(CompositeKey.class);
        jobconf.setMapOutputValueClass(TimeSeriesData.class);

        // define the reducer's output key-value types
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(Text.class);

        // set the window size for the moving-average calculation
        int windowSize = Integer.parseInt(otherArgs[0]);
        jobconf.setInt("moving.average.window.size", windowSize);

        // define I/O
        FileInputFormat.setInputPaths(jobconf, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(jobconf, new Path(otherArgs[2]));
        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);
        jobconf.setCompressMapOutput(true);

        // The following three settings are needed for secondary sorting.
        // The partitioner decides which mapper output goes to which reducer,
        // based on the mapper output key. In general, a different key goes to a
        // different group (the Iterator on the reducer side). But sometimes we
        // want different keys in the same group; that is the job of the output
        // value grouping comparator, which groups mapper output (similar to a
        // GROUP BY condition in SQL). The output key comparator is used during
        // the sort stage for the mapper output keys.
        jobconf.setPartitionerClass(NaturalKeyPartitioner.class);
        jobconf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
        jobconf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);

        JobClient.runJob(jobconf);
    }
}
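A typical invocation, assuming the classes above are packaged into a job jar (the jar name and HDFS paths are placeholders; the driver class may also need to be fully qualified with its package name):
hadoop jar data-algorithms.jar SortByMRF_MovingAverageDriver 5 /input/timeseries /output/moving-average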
Spark:
https://gist.github.com/samklr/27411098f04fc46dcd05
http://stackoverflow.com/questions/23402303/apache-spark-moving-average
Hadoop Way:
Composite Key: a key that is a combination of the natural key and the natural value we want to sort by.
http://blog.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/
https://github.com/mahmoudparsian/data-algorithms-book/tree/master/src/main/java/org/dataalgorithms/chap06/secondarysort