Data Algorithms: Recipes for Scaling Up with Hadoop and Spark
For example, if the key (a string) is a URL and the value (an integer) is the number of times that URL was visited, you might ask: what were the top 10 URLs last week? The equivalent query in SQL, for a table of cats ranked by weight, would be:
SELECT cat_id, cat_name, cat_weight
FROM cats
ORDER BY cat_weight DESC LIMIT 10;
Spark:
We assume that all input keys are unique. That is, for a given input set {(K, V)}, all Ks are unique.
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/spark/Top10.java
* Assumption: for all input (K, V), K's are unique.
* This means that there will not be entries like (A, 5) and (A, 8).
public static void main(String[] args) throws Exception {
// STEP-1: handle input parameters
if (args.length < 1) {
System.err.println("Usage: Top10 <input-file>");
System.exit(1);
}
String inputPath = args[0];
System.out.println("args[0]: <input-path>="+inputPath);
// STEP-2: create an instance of JavaSparkContext
JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
// STEP-3: create an RDD for input
// input record format:
// <string-key><,><integer-value>
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
// STEP-4: create (K, V) pairs
// Note: the assumption is that all K's are unique
// PairFunction<T, K, V>
// T => Tuple2<K, V>
// input K V
JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String,Integer> call(String s) {
String[] tokens = s.split(","); // cat7,234
return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
// STEP-5: create a local top-10
JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
@Override
public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String,Integer> tuple = iter.next();
top10.put(tuple._2, tuple._1);
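// note: the TreeMap is keyed by count, so if two keys happen to have
// the same count, the later one silently replaces the earlier one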
// keep only top N
if (top10.size() > 10) {
top10.remove(top10.firstKey());
}
}
return Collections.singletonList(top10);
}
});
// STEP-6: find a final top-10
SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
List<SortedMap<Integer, String>> alltop10 = partitions.collect();
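// collect() is cheap here: each partition contributes at most 10 entries,
// so the driver receives at most 10 * numPartitions pairs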
for (SortedMap<Integer, String> localtop10 : alltop10) {
// each local top-10 maps weight/count -> cat name/URL
for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
// System.out.println(entry.getKey() + "--" + entry.getValue());
finaltop10.put(entry.getKey(), entry.getValue());
// keep only top 10
if (finaltop10.size() > 10) {
finaltop10.remove(finaltop10.firstKey());
}
}
}
// STEP-7: emit final top-10
for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
System.out.println(entry.getKey() + "--" + entry.getValue());
}
System.exit(0);
}
Now assume that the input keys are not necessarily unique; the same key may appear in multiple records.
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/spark/Top10NonUnique.java
* Assumption: for all input (K, V), K's are non-unique.
* This class implements Top-N design pattern for N > 0.
* The main assumption is that for all input (K, V)'s, K's
* are non-unique. It means that you will find entries like
* (A, 2), ..., (A, 5),...
*
* This is a general top-N algorithm which will work for both
* unique and non-unique keys.
*
* This class may be used to find bottom-N as well (by
* keeping the N smallest elements in the set).
*
* Top-10 Design Pattern: “Top Ten” Structure
*
* 1. map(input) => (K, V)
*
* 2. reduce(K, List<V1, V2, ..., Vn>) => (K, V),
* where V = V1+V2+...+Vn
* now all K's are unique
*
* 3. partition (K,V)'s into P partitions
*
* 4. Find top-N for each partition (we call this a local Top-N)
*
* 5. Find Top-N from all local Top-N's
public static void main(String[] args) throws Exception {
System.out.println("args[0]: <input-path>="+args[0]);
System.out.println("args[1]: <topN>="+args[1]);
final int N = Integer.parseInt(args[1]);
// STEP-2: create a Java Spark Context object
JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
// STEP-3: broadcast the topN to all cluster nodes
final Broadcast<Integer> topN = ctx.broadcast(N);
// now topN is available to be read from all cluster nodes
// STEP-4: create an RDD from input
// input record format:
// <string-key><,><integer-value-count>
JavaRDD<String> lines = ctx.textFile(args[0], 1);
// STEP-5: partition RDD
// public JavaRDD<T> coalesce(int numPartitions)
// Return a new RDD that is reduced into numPartitions partitions.
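// note: 9 partitions below is just an example value; see the
// partitioning rule of thumb discussed later in this section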
JavaRDD<String> rdd = lines.coalesce(9);
// STEP-6: map input(T) into (K,V) pair
// PairFunction<T, K, V>
// T => Tuple2<K, V>
JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
public Tuple2<String,Integer> call(String s) {
String[] tokens = s.split(","); // url,789
return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
// STEP-7: reduce duplicate K's (sum the frequencies per key)
JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// STEP-8: create a local top-N
JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
@Override
public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
final int N = topN.value();
SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String,Integer> tuple = iter.next();
localTopN.put(tuple._2, tuple._1);
// keep only top N
if (localTopN.size() > N) {
localTopN.remove(localTopN.firstKey());
}
}
return Collections.singletonList(localTopN);
}
});
// STEP-9: find a final top-N
SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
List<SortedMap<Integer, String>> allTopN = partitions.collect();
for (SortedMap<Integer, String> localTopN : allTopN) {
for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
// count = entry.getKey()
// url = entry.getValue()
finalTopN.put(entry.getKey(), entry.getValue());
// keep only top N
if (finalTopN.size() > N) {
finalTopN.remove(finalTopN.firstKey());
}
}
}
// STEP-10: emit final top-N
for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
System.out.println(entry.getKey() + "--" + entry.getValue());
}
System.exit(0);
}
As an alternative, we can use Spark's takeOrdered() function, which returns the first N elements of an RDD according to a supplied comparator, instead of building local and final top-N lists by hand.
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
// STEP-4: partition RDD
// public JavaRDD<T> coalesce(int numPartitions)
// Return a new RDD that is reduced into numPartitions partitions.
JavaRDD<String> rdd = lines.coalesce(9);
// STEP-5: map input(T) into (K,V) pair
// PairFunction<T, K, V>
// T => Tuple2<K, V>
JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
public Tuple2<String,Integer> call(String s) {
String[] tokens = s.split(","); // url,789
return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
// STEP-6: reduce frequent K's
JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// STEP-7: find final top-N by calling takeOrdered()
List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
final static MyTupleComparator INSTANCE = new MyTupleComparator();
public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
return -t1._2.compareTo(t2._2); // sorts RDD elements descending (use for Top-N)
// return t1._2.compareTo(t2._2); // sorts RDD elements ascending (use for Bottom-N)
}
}
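takeOrdered() returns its result to the driver as a java.util.List, and the comparator must be Serializable because it is shipped to the executors. A minimal follow-up sketch, assuming the topNResult list from STEP-7 above, that prints the result in the same count--key format as the earlier versions:
// topNResult holds the N (url, frequency) pairs with the highest frequencies
for (Tuple2<String, Integer> entry : topNResult) {
    System.out.println(entry._2 + "--" + entry._1);
}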
A Spark RDD (Resilient Distributed Dataset) represents an immutable, partitioned collection of elements that can be operated on in parallel.
Parameterizing Top N
We parameterize N by using a Broadcast object. Broadcasting makes N available as globally shared, read-only data, so it can be accessed from any cluster node.
final Broadcast<Integer> broadcastTopN = context.broadcast(topN);
final int topN = broadcastTopN.value();
After the data structure broadcastTopN is broadcast, its value may be read from any cluster node within mappers, reducers, and transformations.
final Broadcast<String> broadcastDirection = context.broadcast(direction);
if (finalN.size() > N) {
    if (direction.equals("top")) {
        // remove element with the smallest frequency
        finalN.remove(finalN.firstKey());
    }
    else {
        // direction.equals("bottom")
        // remove element with the largest frequency
        finalN.remove(finalN.lastKey());
    }
}
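The fragment above begins partway through the code, where N, direction, and finalN are already in scope. A minimal, purely illustrative sketch of how those values would be obtained from the broadcast variables declared earlier:
// illustrative only: read the broadcast parameters and initialize the map
final int N = broadcastTopN.value();
final String direction = broadcastDirection.value();
SortedMap<Integer, String> finalN = new TreeMap<Integer, String>();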
For choosing the number of partitions (for example, the argument to coalesce() above), the general rule of thumb is 2 × num_executors × cores_per_executor.
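For example, a minimal sketch of applying that rule; the executor and core counts below are hypothetical placeholders, not values from the book:
// hypothetical cluster figures, for illustration only
int numExecutors = 8;
int coresPerExecutor = 4;
int numPartitions = 2 * numExecutors * coresPerExecutor;   // 64 partitions
// note: coalesce() only reduces the number of partitions; this assumes
// lines currently has more partitions than numPartitions
JavaRDD<String> repartitioned = lines.coalesce(numPartitions);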
Hadoop:
All Unique:
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/TopNMapper.java
Each mapper receives one partition of the input (key, frequency) records. The map() method builds a local top-N list in a SortedMap<Integer, String>, and when the mapper finishes, the cleanup() method emits that list. Note that we use a single key, retrieved by calling NullWritable.get(), which guarantees that all the mappers' output will be consumed by a single reducer.
public class TopNMapper extends
Mapper<Text, IntWritable, NullWritable, Text> {
private int N = 10; // default
private SortedMap<Integer, String> top = new TreeMap<Integer, String>();
@Override
public void map(Text key, IntWritable value, Context context)
throws IOException, InterruptedException {
String keyAsString = key.toString();
int frequency = value.get();
String compositeValue = keyAsString + "," + frequency;
top.put(frequency, compositeValue);
// keep only top N
if (top.size() > N) {
top.remove(top.firstKey());
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
for (String str : top.values()) {
context.write(NullWritable.get(), new Text(str));
}
}
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/TopNReducer.java
public class TopNReducer extends
Reducer<NullWritable, Text, IntWritable, Text> {
private int N = 10; // default
private SortedMap<Integer, String> top = new TreeMap<Integer, String>();
@Override
public void reduce(NullWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
String valueAsString = value.toString().trim();
String[] tokens = valueAsString.split(",");
String url = tokens[0];
int frequency = Integer.parseInt(tokens[1]);
top.put(frequency, url);
// keep only top N
if (top.size() > N) {
top.remove(top.firstKey());
}
}
// emit final top N
List<Integer> keys = new ArrayList<Integer>(top.keySet());
for(int i=keys.size()-1; i>=0; i--){
context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
}
}
protected void setup(Context context)
throws IOException, InterruptedException {
this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}
}
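Both TopNMapper and TopNReducer read N from the job configuration in setup(), and the single-key design only works when there is exactly one reducer. The following is an illustrative driver sketch showing how those pieces could be wired together; it is not the book's TopNDriver (see the run script and repository below for the real one), and it assumes the aggregated input is a SequenceFile of (Text, IntWritable) pairs:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// illustrative sketch only, not the book's TopNDriver
public class TopNDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("N", Integer.parseInt(args[0]));   // read by setup() in mapper and reducer
        Job job = Job.getInstance(conf, "TopN");
        job.setJarByClass(TopNDriverSketch.class);
        job.setInputFormatClass(SequenceFileInputFormat.class); // assumption: (Text, IntWritable) input
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setNumReduceTasks(1);                      // all local top-N lists go to one reducer
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}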
Nonunique Keys
#
# Generate unique (K,V) pairs
#
INPUT=/kv/input
OUTPUT=/kv/output
hadoop fs -rmr $OUTPUT
AGGREGATOR=org.dataalgorithms.chap03.mapreduce.AggregateByKeyDriver
hadoop jar $APP_JAR $AGGREGATOR $INPUT $OUTPUT
#
# Find Top N
#
N=5
TopN_INPUT=/kv/output
TopN_OUTPUT=/kv/final
hadoop fs -rmr $TopN_OUTPUT
TopN=org.dataalgorithms.chap03.mapreduce.TopNDriver
hadoop jar $APP_JAR $TopN $N $TopN_INPUT $TopN_OUTPUT
Phase 1: Convert nonunique keys into unique keys
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap03/mapreduce/AggregateByKeyMapper.java
public class AggregateByKeyMapper extends
Mapper<Object, Text, Text, IntWritable> {
// reuse objects
private Text K2 = new Text();
private IntWritable V2 = new IntWritable();
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String valueAsString = value.toString().trim();
String[] tokens = valueAsString.split(",");
if (tokens.length != 2) {
return;
}
String url = tokens[0];
int frequency = Integer.parseInt(tokens[1]);
K2.set(url);
V2.set(frequency);
context.write(K2, V2);
}
}
public class AggregateByKeyReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
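Because integer addition is associative and commutative, the same AggregateByKeyReducer can also be registered as a combiner to cut down the data shuffled from mappers to reducers. A minimal sketch of the relevant driver lines, assuming a standard org.apache.hadoop.mapreduce.Job object named job (an assumption for illustration, not the book's AggregateByKeyDriver):
job.setMapperClass(AggregateByKeyMapper.class);
job.setCombinerClass(AggregateByKeyReducer.class);   // optional map-side pre-aggregation
job.setReducerClass(AggregateByKeyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);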