https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/mapreduce/MBAMapper.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils; // or org.apache.commons.lang3, depending on the classpath
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import org.dataalgorithms.util.Combination; // the book's combination utility (sketched after this class)

public class MBAMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public static final Logger THE_LOGGER = Logger.getLogger(MBAMapper.class);
public static final int DEFAULT_NUMBER_OF_PAIRS = 2;
// output key (K2): the itemset, i.e., a sorted list of N items (N = 2, 3, ...)
private static final Text reducerKey = new Text();
// output value (V2): the constant count 1 for each itemset occurrence
private static final IntWritable NUMBER_ONE = new IntWritable(1);
private int numberOfPairs; // set by the driver via "number.of.pairs", read in setup()
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.numberOfPairs = context.getConfiguration().getInt("number.of.pairs", DEFAULT_NUMBER_OF_PAIRS);
THE_LOGGER.info("setup() numberOfPairs = " + numberOfPairs);
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// input line
String line = value.toString();
List<String> items = convertItemsToList(line);
if ((items == null) || (items.isEmpty())) {
// no mapper output will be generated
return;
}
generateMapperOutput(numberOfPairs, items, context);
}
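// example (hypothetical input): with numberOfPairs = 2, the line
//   "crackers, bread, banana"
// produces the sorted reducer keys
//   [banana, bread]   [banana, crackers]   [bread, crackers]
// each paired with the value 1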
private static List<String> convertItemsToList(String line) {
if ((line == null) || (line.length() == 0)) {
// no mapper output will be generated
return null;
}
String[] tokens = StringUtils.split(line, ",");
if ( (tokens == null) || (tokens.length == 0) ) {
return null;
}
List<String> items = new ArrayList<String>();
for (String token : tokens) {
if (token != null) {
items.add(token.trim());
}
}
return items;
}
/**
* Builds <key, value> pairs from sorted combinations of the input list.
* Each combination is sorted before it becomes a key; otherwise equivalent
* itemsets such as (a, b, c) and (a, c, b) would be emitted as different
* keys and counted separately.
* @param numberOfPairs size of the itemsets (combinations) to generate
* @param items list of items from one input line
* @param context Hadoop job context
* @throws IOException
* @throws InterruptedException
*/
private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
throws IOException, InterruptedException {
List<List<String>> sortedCombinations = Combination.findSortedCombinations(items, numberOfPairs);
for (List<String> itemList: sortedCombinations) {
THE_LOGGER.debug("itemlist=" + itemList); // use the task logger rather than println
reducerKey.set(itemList.toString());
context.write(reducerKey, NUMBER_ONE);
}
}
}
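Both MBAMapper above and the Spark program below call Combination.findSortedCombinations(), which lives in the book's util package (org.dataalgorithms.util) and is not reproduced on this page. The following is a minimal sketch of the behavior those call sites rely on; the signatures match the calls above and below, but the body is an illustration, not the book's exact implementation:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class Combination {

    // all sorted combinations of exactly n elements, without duplicates
    public static <T extends Comparable<? super T>> List<List<T>>
            findSortedCombinations(Collection<T> elements, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        if (n == 0) {
            result.add(new ArrayList<T>()); // the single empty combination
            return result;
        }
        // extend every (n-1)-combination by one unused element, keeping it sorted
        for (List<T> smaller : findSortedCombinations(elements, n - 1)) {
            for (T element : elements) {
                if (smaller.contains(element)) {
                    continue;
                }
                List<T> candidate = new ArrayList<T>(smaller);
                candidate.add(element);
                Collections.sort(candidate);
                if (!result.contains(candidate)) {
                    result.add(candidate);
                }
            }
        }
        return result;
    }

    // all sorted combinations of every size 0..n
    // (the Spark job below filters out the empty combination)
    public static <T extends Comparable<? super T>> List<List<T>>
            findSortedCombinations(Collection<T> elements) {
        List<List<T>> result = new ArrayList<List<T>>();
        for (int i = 0; i <= elements.size(); i++) {
            result.addAll(findSortedCombinations(elements, i));
        }
        return result;
    }
}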
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/mapreduce/MBAReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MBAReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0; // total number of times this itemset was seen
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
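MBAMapper reads "number.of.pairs" from the job configuration, so the driver has to set it before submitting the job. The book's actual driver is not shown on this page; a minimal sketch of the wiring (the class name MBADriver and the argument order are assumptions) could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MBADriver {
    public static void main(String[] args) throws Exception {
        // args: <input-path> <output-path> <numberOfPairs>
        Configuration conf = new Configuration();
        conf.setInt("number.of.pairs", Integer.parseInt(args[2])); // read by MBAMapper.setup()
        Job job = Job.getInstance(conf, "market-basket-analysis");
        job.setJarByClass(MBADriver.class);
        job.setMapperClass(MBAMapper.class);
        job.setCombinerClass(MBAReducer.class); // summing is associative, so the reducer doubles as a combiner
        job.setReducerClass(MBAReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reusing MBAReducer as a combiner is safe here because the reduce function only sums counts; it cuts shuffle volume without changing the result.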
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/spark/FindAssociationRules.java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;
import scala.Tuple3;

import org.dataalgorithms.util.Combination; // the book's combination utility (sketched above)

// note: this listing uses the Spark 1.x Java API, in which
// PairFlatMapFunction.call() returns an Iterable (Spark 2.x returns an Iterator)
public class FindAssociationRules {
static List<String> toList(String transaction) {
String[] items = transaction.trim().split(",");
List<String> list = new ArrayList<String>();
for (String item : items) {
list.add(item.trim()); // trim each item so "a, b" and "a,b" tokenize identically
}
return list;
}
static List<String> removeOneItem(List<String> list, int i) {
if ( (list == null) || (list.isEmpty()) ) {
return list;
}
if ( (i < 0) || (i > (list.size()-1)) ) {
return list;
}
List<String> cloned = new ArrayList<String>(list);
cloned.remove(i);
return cloned;
}
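// for example, removeOneItem(Arrays.asList("a", "b", "c"), 1) returns [a, c];
// the input list itself is never modified (a copy is changed and returned)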
public static void main(String[] args) throws Exception {
// STEP-1: handle input parameters
if (args.length < 1) {
System.err.println("Usage: FindAssociationRules <transactions>");
System.exit(1);
}
String transactionsFileName = args[0];
// STEP-2: create a Spark context object
JavaSparkContext ctx = new JavaSparkContext(); // master URL and app name are supplied by spark-submit
// STEP-3: read all transactions from HDFS and create the first RDD
JavaRDD<String> transactions = ctx.textFile(transactionsFileName, 1);
transactions.saveAsTextFile("/rules/output/1"); // for debugging: persist the input RDD
// STEP-4: generate frequent patterns
// PairFlatMapFunction<T, K, V>
// T => Iterable<Tuple2<K, V>>
JavaPairRDD<List<String>,Integer> patterns =
transactions.flatMapToPair(new PairFlatMapFunction<
String, // T
List<String>, // K
Integer // V
>() {
@Override
public Iterable<Tuple2<List<String>,Integer>> call(String transaction) {
List<String> list = toList(transaction);
List<List<String>> combinations = Combination.findSortedCombinations(list);
List<Tuple2<List<String>,Integer>> result = new ArrayList<Tuple2<List<String>,Integer>>();
for (List<String> combList : combinations) {
if (combList.size() > 0) {
result.add(new Tuple2<List<String>,Integer>(combList, 1));
}
}
return result;
}
});
// STEP-5: combine/reduce frequent patterns
JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
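// as an aside, with Java 8 lambdas this whole reduction collapses to:
//   patterns.reduceByKey((i1, i2) -> i1 + i2);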
// now we have: combined(K, V), where
// K = pattern as List<String>
// V = frequency of the pattern
// given a (K, V) such as (List<a,b,c>, 2), we will
// generate the following (K2, V2) pairs:
//
// (List<a,b,c>, T2(null, 2))
// (List<a,b>, T2(List<a,b,c>, 2))
// (List<a,c>, T2(List<a,b,c>, 2))
// (List<b,c>, T2(List<a,b,c>, 2))
// STEP-6: generate all sub-patterns
// PairFlatMapFunction<T, K, V>
// T => Iterable<Tuple2<K, V>>
JavaPairRDD<List<String>,Tuple2<List<String>,Integer>> subpatterns =
combined.flatMapToPair(new PairFlatMapFunction<
Tuple2<List<String>, Integer>, // T
List<String>, // K
Tuple2<List<String>,Integer> // V
>() {
@Override
public Iterable<Tuple2<List<String>,Tuple2<List<String>,Integer>>>
call(Tuple2<List<String>, Integer> pattern) {
List<Tuple2<List<String>,Tuple2<List<String>,Integer>>> result =
new ArrayList<Tuple2<List<String>,Tuple2<List<String>,Integer>>>();
List<String> list = pattern._1;
Integer frequency = pattern._2;
// (pattern, (null, frequency)): the null marker carries the pattern's own count
result.add(new Tuple2<List<String>,Tuple2<List<String>,Integer>>(list, new Tuple2<List<String>,Integer>(null, frequency)));
if (list.size() == 1) {
return result;
}
// the pattern has more than one item: emit every (size-1) sub-pattern
for (int i=0; i < list.size(); i++) {
List<String> sublist = removeOneItem(list, i);
result.add(new Tuple2<List<String>,Tuple2<List<String>,Integer>>(sublist, new Tuple2<List<String>,Integer>(list, frequency)));
}
return result;
}
});
// STEP-7: combine sub-patterns
// groupByKey() (rather than reduceByKey()) is needed here because every
// (pattern, frequency) pair for a key is required to build its rules
JavaPairRDD<List<String>,Iterable<Tuple2<List<String>,Integer>>> rules = subpatterns.groupByKey();
// STEP-8: generate association rules
// Now, use (K=List<String>, V=Iterable<Tuple2<List<String>,Integer>>)
// to generate association rules
// JavaRDD<R> map(Function<T,R> f)
// Return a new RDD by applying a function to all elements of this RDD.
JavaRDD<List<Tuple3<List<String>,List<String>,Double>>> assocRules = rules.map(new Function<
Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>>, // T: input
List<Tuple3<List<String>,List<String>,Double>> // R: output; e.g., the rule (a,c => b) with
                                               // confidence 1/3 becomes T3(List(a,c), List(b), 0.33)
>() {
@Override
public List<Tuple3<List<String>,List<String>,Double>> call(Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>> in) {
List<Tuple3<List<String>,List<String>,Double>> result =
new ArrayList<Tuple3<List<String>,List<String>,Double>>();
List<String> fromList = in._1;
Iterable<Tuple2<List<String>,Integer>> to = in._2;
List<Tuple2<List<String>,Integer>> toList = new ArrayList<Tuple2<List<String>,Integer>>();
Tuple2<List<String>,Integer> fromCount = null;
for (Tuple2<List<String>,Integer> t2 : to) {
// find the "count" object
if (t2._1 == null) {
fromCount = t2;
}
else {
toList.add(t2);
}
}
// Now, we have the required objects for generating association rules:
// "fromList", "fromCount", and "toList"
if (toList.isEmpty()) {
// no rules can be generated for this key; return the empty
// list (Spark's map() must not return null)
return result;
}
// now, using "fromList", "fromCount", and "toList", create the association rules;
// assuming findSortedCombinations() emits combinations of every size, each key
// carries its own (null, count) entry, so fromCount is never null here
for (Tuple2<List<String>,Integer> t2 : toList) {
double confidence = (double) t2._2 / (double) fromCount._2;
List<String> t2List = new ArrayList<String>(t2._1);
t2List.removeAll(fromList);
result.add(new Tuple3<List<String>,List<String>,Double>(fromList, t2List, confidence));
}
return result;
}
});
assocRules.saveAsTextFile("/rules/output/6"); // save the generated rules
// done
ctx.close();
System.exit(0);
}
}
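A small worked example ties the Spark pipeline together. Given an input file of two transactions, one per line:

    a,b,c
    a,b

STEP-4 and STEP-5 produce the pattern frequencies [a]:2, [b]:2, [c]:1, [a,b]:2, [a,c]:1, [b,c]:1, [a,b,c]:1. STEP-6 and STEP-7 then attach each pattern's frequency to its size-minus-one sub-patterns, and STEP-8 emits (antecedent, consequent, confidence) triples that include:

    ([a], [b], 1.0)      a => b,    confidence 2/2
    ([a], [c], 0.5)      a => c,    confidence 1/2
    ([a,b], [c], 0.5)    a,b => c,  confidence 1/2

Note that the output directories (/rules/output/1 and /rules/output/6) must not already exist, and that the program expects its master URL and application name to be supplied externally, e.g., via spark-submit.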