Data Algorithms: Recipes for Scaling Up with Hadoop and Spark
Input: <person><,><friend1> <friend2> ... <friendN>
map(100, (200 300 400 500 600)) will generate:
([100, 200], [200 300 400 500 600])
([100, 300], [200 300 400 500 600])
([100, 400], [200 300 400 500 600])
([100, 500], [200 300 400 500 600])
([100, 600], [200 300 400 500 600])
Before these key-value pairs are sent to the reducers, they are grouped by key. For example, map(200, (100 300 400)) also emits a pair with the key [100, 200], so the two friend lists end up grouped together:
([100, 200], [200 300 400 500 600])
([100, 200], [100 300 400])
=> ([100, 200], ([200 300 400 500 600], [100 300 400]))
So, the reducers will receive the following set of key-value pairs:
([100, 200], ([200 300 400 500 600], [100 300 400]))
Finally, the reducers intersect the grouped lists and generate:
([100, 200], [300, 400])
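To make the intersection step concrete, here is a minimal single-machine sketch in plain Java (illustrative only; the class and variable names are not from the book's code) that computes the common friends of users 100 and 200 directly:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CommonFriendsSketch {
   public static void main(String[] args) {
      // the friend lists from the example above
      List<Long> friendsOf100 = Arrays.asList(200L, 300L, 400L, 500L, 600L);
      List<Long> friendsOf200 = Arrays.asList(100L, 300L, 400L);
      // common friends = intersection of the two lists
      Set<Long> common = new HashSet<Long>(friendsOf100);
      common.retainAll(friendsOf200);
      System.out.println(common); // prints [300, 400] or [400, 300]; set order is unspecified
   }
}

The MapReduce and Spark implementations below scale this same intersection out by routing both friend lists to the same reducer key.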
https://github.com/mahmoudparsian/data-algorithms-book
Hadoop Implementation Using Text
/**
 * map(key, value) {
 *    // key is the offset generated by MapReduce/Hadoop
 *    // value is one line of input
 *    let (<person>, (<friend_1> <friend_2> ... <friend_N>)) = parse(value);
 *    reducerValue = (<friend_1> <friend_2> ... <friend_N>);
 *    foreach friend in (<friend_1> <friend_2> ... <friend_N>) {
 *       reducerKey = buildSortedKey(person, friend);
 *       emit(reducerKey, reducerValue);
 *    }
 * }
 */
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CommonFriendsMapper
   extends Mapper<LongWritable, Text, Text, Text> {

   private static Text reducerKey = new Text();
   private static Text reducerValue = new Text();

   // builds the person's friend list as a comma-separated string;
   // tokens[0] is the person, tokens[1..N] are the friends
   static String getFriends(String[] tokens) {
      if (tokens.length == 2) {
         // the person has a single friend, who cannot be a common
         // friend of the pair, so emit an empty list
         return "";
      }
      StringBuilder builder = new StringBuilder();
      for (int i = 1; i < tokens.length; i++) {
         builder.append(tokens[i]);
         if (i < (tokens.length - 1)) {
            builder.append(",");
         }
      }
      return builder.toString();
   }

   // orders the two user IDs numerically so that (person, friend)
   // and (friend, person) produce the same reducer key
   static String buildSortedKey(String person, String friend) {
      long p = Long.parseLong(person);
      long f = Long.parseLong(friend);
      if (p < f) {
         return person + "," + friend;
      }
      else {
         return friend + "," + person;
      }
   }

   public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      // parse the input; here the whole record is space-delimited:
      // <person> <friend_1> <friend_2> ... <friend_N>
      String[] tokens = StringUtils.split(value.toString(), " ");
      // create the reducer value: the person's complete friend list
      String friends = getFriends(tokens);
      reducerValue.set(friends);
      String person = tokens[0];
      // emit one (sorted pair, friend list) record per friend
      for (int i = 1; i < tokens.length; i++) {
         String friend = tokens[i];
         String reducerKeyAsString = buildSortedKey(person, friend);
         reducerKey.set(reducerKeyAsString);
         context.write(reducerKey, reducerValue);
      }
   }
}
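For example, given the space-delimited input line
100 200 300 400 500 600
this mapper emits the following (key, value) pairs, matching the worked example above:
(100,200   200,300,400,500,600)
(100,300   200,300,400,500,600)
(100,400   200,300,400,500,600)
(100,500   200,300,400,500,600)
(100,600   200,300,400,500,600)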
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap08/mapreduce/CommonFriendsReducer.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {

   /**
    * The goal is to find the common friends by intersecting
    * all of the lists in the values parameter.
    *
    * @param key is a pair: <user_id_1><,><user_id_2>
    * @param values is a list of { <friend_1><,>...<,><friend_n> }
    */
   public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
      Map<String, Integer> map = new HashMap<String, Integer>();
      Iterator<Text> iterator = values.iterator();
      int numOfValues = 0;
      while (iterator.hasNext()) {
         String friends = iterator.next().toString();
         if (friends.equals("")) {
            // one side of the pair has no candidate friends,
            // so the pair cannot have any common friends
            context.write(key, new Text("[]"));
            return;
         }
         addFriends(map, friends);
         numOfValues++;
      }
      // a friend whose count equals numOfValues appears in
      // every list and is therefore a common friend
      List<String> commonFriends = new ArrayList<String>();
      for (Map.Entry<String, Integer> entry : map.entrySet()) {
         if (entry.getValue() == numOfValues) {
            commonFriends.add(entry.getKey());
         }
      }
      // send it to the output
      context.write(key, new Text(commonFriends.toString()));
   }

   // increments the occurrence count of each friend in the given list
   static void addFriends(Map<String, Integer> map, String friendsList) {
      String[] friends = StringUtils.split(friendsList, ",");
      for (String friend : friends) {
         Integer count = map.get(friend);
         if (count == null) {
            map.put(friend, 1);
         }
         else {
            map.put(friend, count + 1);
         }
      }
   }
}
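To run these two classes as a job, a driver along the following lines is needed. This is a minimal sketch; the book's repository has its own driver class, and the class name CommonFriendsDriver and the argument layout here are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// hypothetical driver sketch; wires the mapper and reducer above
public class CommonFriendsDriver {
   public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "CommonFriends");
      job.setJarByClass(CommonFriendsDriver.class);
      job.setMapperClass(CommonFriendsMapper.class);
      job.setReducerClass(CommonFriendsReducer.class);
      job.setOutputKeyClass(Text.class);    // reducer key: <user_1>,<user_2>
      job.setOutputValueClass(Text.class);  // reducer value: [common friends]
      FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
      FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}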
Hadoop Implementation Using ArrayListOfLongsWritable
This variant emits the friend list as a Writable list of longs (ArrayListOfLongsWritable, from the Cloud9 library) instead of a comma-separated Text value, so the reducer does not have to re-parse strings:
http://grepcode.com/file_/repo1.maven.org/maven2/edu.umd/cloud9/1.4.12/edu/umd/cloud9/io/array/ArrayListOfLongsWritable.java/?v=source
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsMapperUsingList
   extends Mapper<LongWritable, Text, Text, ArrayListOfLongsWritable> {

   private static Text reducerKey = new Text();

   // builds the person's friend list as a Writable list of longs
   static ArrayListOfLongsWritable getFriends(String[] tokens) {
      if (tokens.length == 2) {
         // a single friend cannot be a common friend of the pair
         return new ArrayListOfLongsWritable();
      }
      ArrayListOfLongsWritable list = new ArrayListOfLongsWritable();
      for (int i = 1; i < tokens.length; i++) {
         list.add(Long.parseLong(tokens[i]));
      }
      return list;
   }

   // orders the two user IDs numerically so that (person, friend)
   // and (friend, person) produce the same reducer key
   static String buildSortedKey(String person, String friend) {
      long p = Long.parseLong(person);
      long f = Long.parseLong(friend);
      if (p < f) {
         return person + "," + friend;
      }
      else {
         return friend + "," + person;
      }
   }

   public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      // parse the input; the delimiter is a single space
      String[] tokens = StringUtils.split(value.toString(), " ");
      // create the reducer value: the person's complete friend list
      ArrayListOfLongsWritable friends = getFriends(tokens);
      String person = tokens[0];
      for (int i = 1; i < tokens.length; i++) {
         String friend = tokens[i];
         String reducerKeyAsString = buildSortedKey(person, friend);
         reducerKey.set(reducerKeyAsString);
         context.write(reducerKey, friends);
      }
   }
}
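The matching reducer would mirror CommonFriendsReducer, counting longs instead of parsing comma-separated strings. Here is a sketch, assuming the Cloud9 list API (size(), get(int)); the book's repository contains its own version of this class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsReducerUsingList
   extends Reducer<Text, ArrayListOfLongsWritable, Text, Text> {

   public void reduce(Text key, Iterable<ArrayListOfLongsWritable> values, Context context)
      throws IOException, InterruptedException {
      Map<Long, Integer> map = new HashMap<Long, Integer>();
      int numOfValues = 0;
      for (ArrayListOfLongsWritable friends : values) {
         if (friends.size() == 0) {
            // one side of the pair has no candidate friends
            context.write(key, new Text("[]"));
            return;
         }
         for (int i = 0; i < friends.size(); i++) {
            Long friend = friends.get(i);
            Integer count = map.get(friend);
            map.put(friend, (count == null) ? 1 : count + 1);
         }
         numOfValues++;
      }
      // a friend that appears in every list is a common friend
      List<Long> commonFriends = new ArrayList<Long>();
      for (Map.Entry<Long, Integer> entry : map.entrySet()) {
         if (entry.getValue() == numOfValues) {
            commonFriends.add(entry.getKey());
         }
      }
      context.write(key, new Text(commonFriends.toString()));
   }
}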
Spark Implementation
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap08/spark/FindCommonFriends.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;

public class FindCommonFriends {

   public static void main(String[] args) throws Exception {
      if (args.length < 1) {
         System.err.println("Usage: FindCommonFriends <input-file>");
         System.exit(1);
      }
      String hdfsInputFileName = args[0];
      System.out.println("hdfsInputFileName=" + hdfsInputFileName);

      // create a context object
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext("FindCommonFriends");

      // create the first RDD from the input file
      JavaRDD<String> records = ctx.textFile(hdfsInputFileName, 1);

      // PairFlatMapFunction<T, K, V>
      // T => Iterable<Tuple2<K, V>>
      // emit one (sorted pair, friend list) record per friend
      JavaPairRDD<Tuple2<Long,Long>, Iterable<Long>> pairs =
         //                                            T       K                  V
         records.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long,Long>, Iterable<Long>>() {
         public Iterable<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> call(String s) {
            // input record: <person><,><friend_1> <friend_2> ... <friend_N>
            String[] tokens = s.split(",");
            long person = Long.parseLong(tokens[0]);
            String friendsAsString = tokens[1];
            String[] friendsTokenized = friendsAsString.split(" ");
            if (friendsTokenized.length == 1) {
               // a single friend cannot be a common friend of the pair
               Tuple2<Long,Long> key = buildSortedTuple(person, Long.parseLong(friendsTokenized[0]));
               return Arrays.asList(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, new ArrayList<Long>()));
            }
            List<Long> friends = new ArrayList<Long>();
            for (String f : friendsTokenized) {
               friends.add(Long.parseLong(f));
            }
            List<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> result =
               new ArrayList<Tuple2<Tuple2<Long,Long>, Iterable<Long>>>();
            for (Long f : friends) {
               Tuple2<Long,Long> key = buildSortedTuple(person, f);
               result.add(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, friends));
            }
            return result;
         }
      });

      // group the friend lists of each pair together
      JavaPairRDD<Tuple2<Long,Long>, Iterable<Iterable<Long>>> grouped = pairs.groupByKey();

      // find the intersection of all of the lists grouped under each pair
      // mapValues[U](f: (V) => U): JavaPairRDD[K, U]
      // Pass each value in the key-value pair RDD through a map function
      // without changing the keys; this also retains the original RDD's
      // partitioning.
      JavaPairRDD<Tuple2<Long,Long>, Iterable<Long>> commonFriends =
         grouped.mapValues(new Function<Iterable<Iterable<Long>>, // input
                                        Iterable<Long>            // output
                                       >() {
         public Iterable<Long> call(Iterable<Iterable<Long>> s) {
            // count how many of the grouped lists each friend appears in
            Map<Long, Integer> countCommon = new HashMap<Long, Integer>();
            int size = 0;
            for (Iterable<Long> iter : s) {
               size++;
               List<Long> list = iterableToList(iter);
               if ((list == null) || (list.isEmpty())) {
                  continue;
               }
               for (Long f : list) {
                  Integer count = countCommon.get(f);
                  if (count == null) {
                     countCommon.put(f, 1);
                  }
                  else {
                     countCommon.put(f, count + 1);
                  }
               }
            }
            // a friend whose count equals size appears in every
            // list and is therefore a common friend
            List<Long> finalCommonFriends = new ArrayList<Long>();
            for (Map.Entry<Long, Integer> entry : countCommon.entrySet()) {
               if (entry.getValue() == size) {
                  finalCommonFriends.add(entry.getKey());
               }
            }
            return finalCommonFriends;
         }
      });

      // debug: print the final (pair, common friends) records
      List<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> debug3 = commonFriends.collect();
      for (Tuple2<Tuple2<Long,Long>, Iterable<Long>> t2 : debug3) {
         System.out.println("debug3 key=" + t2._1 + "\t value=" + t2._2);
      }
      System.exit(0);
   }

   // orders the two user IDs numerically so that (a, b) and (b, a)
   // map to the same key
   static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
      if (a < b) {
         return new Tuple2<Long,Long>(a, b);
      }
      else {
         return new Tuple2<Long,Long>(b, a);
      }
   }

   static List<Long> iterableToList(Iterable<Long> iterable) {
      List<Long> list = new ArrayList<Long>();
      for (Long item : iterable) {
         list.add(item);
      }
      return list;
   }
}
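For example, if the input file contains the records 100,200 300 400 500 600 and 200,100 300 400 (along with a record for every other user, so that each pair key receives two friend lists), the debug output for the pair (100, 200) would be:
debug3 key=(100,200)	 value=[300, 400]
The order of the common friends may vary, since they are collected from a HashMap.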
The map and flatMap methods have a similar purpose, but map is one-to-one, while flatMap is one-to-many: each input element produces zero or more output elements (producing zero elements acts like a filter, except of course flatMap can also output a different type).
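As a minimal illustration of that difference, using the same Spark 1.x Java API as the program above (the class name MapVersusFlatMap and the sample data are made up; SparkUtil is the book's helper reused from above):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class MapVersusFlatMap {
   public static void main(String[] args) {
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext("MapVersusFlatMap");
      JavaRDD<String> lines = ctx.parallelize(Arrays.asList("a b", "c"));

      // map is one-to-one: each line produces exactly one element (its length)
      JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
         public Integer call(String s) { return s.length(); }
      });
      System.out.println(lengths.collect());   // [3, 1]

      // flatMap is one-to-(0..N): each line produces zero or more words
      JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
         public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
      });
      System.out.println(words.collect());     // [a, b, c]
   }
}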