Wednesday, December 30, 2015

Spark MLlib



http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations
You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) which can simultaneously learn from the streaming data and apply the model to the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can train a model offline (i.e. using historical data) and then apply the model online to streaming data.
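As a rough illustration of the first case, a minimal streaming k-means sketch, assuming a StreamingContext is already running and that trainingStream and testStream are DStreams you have built (those names are illustrative):

import org.apache.spark.mllib.clustering.StreamingKMeans

// Assumed to exist: trainingStream: DStream[Vector], testStream: DStream[(Double, Vector)]
val skm = new StreamingKMeans()
  .setK(3)                      // number of clusters
  .setDecayFactor(1.0)          // 1.0 = old batches are never forgotten
  .setRandomCenters(2, 0.0)     // feature dimension 2, initial cluster weight 0.0

skm.trainOn(trainingStream)               // the model is updated as each micro-batch arrives
skm.predictOnValues(testStream).print()   // cluster assignments for the keyed test stream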

https://issues.apache.org/jira/browse/SPARK-6407
On-line collaborative filtering (CF) has been widely used and studied. Re-training a CF model from scratch every time new data comes in is very inefficient (http://stackoverflow.com/questions/27734329/apache-spark-incremental-training-of-als-model). However, in the Spark community there has been little discussion of collaborative filtering on streaming data. Given streaming k-means, streaming logistic regression, and the ongoing incremental model training of the Naive Bayes classifier (SPARK-4144), it seems worthwhile to consider streaming collaborative filtering support in MLlib.
We have been thinking about this issue over the past week. We plan to follow this paper (https://www.cs.utexas.edu/~cjohnson/ParallelCollabFilt.pdf). It is based on SGD instead of ALS, which is easier to adapt to streaming data.


http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS
ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yᵀ ≈ R. Typically these approximations are called 'factor' matrices. The general approach is iterative: during each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly solved factor matrix is then held constant while solving for the other factor matrix.
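In equation form (my own summary of the standard regularized least-squares objective, not a quote from the Scaladoc), with rank-k user factors x_u, item factors y_i, and regularization parameter λ (regParam), ALS alternates between the two factor matrices while minimizing:

\min_{X,Y} \; \sum_{(u,i)\ \mathrm{observed}} \left( r_{ui} - x_u^{\top} y_i \right)^2 \;+\; \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)

Fixing Y makes this a set of independent least-squares (ridge-regression) problems in the x_u, and vice versa, which is why each half-step reduces to ordinary least squares.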
https://issues.apache.org/jira/browse/SPARK-9073
https://github.com/apache/spark/pull/7447/files



.config("spark.logConf","true") .config("spark.logLevel","ERROR")
Recommendation algorithms fall into roughly five general categories: knowledge-based, demographic-based, content-based, collaborative filtering (item-based or user-based), and latent factor-based.

import org.apache.log4j.{Level, Logger}
// Silence everything below ERROR from log4j
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

import org.apache.spark.ml.recommendation.ALS

val algALS = new ALS()
algALS.setItemCol("product") // default itemCol is "item"; without this we get: Field "item" does not exist
algALS.setRank(12)
algALS.setRegParam(0.1) // regularization parameter; was called lambda in spark.mllib
algALS.setMaxIter(20)
val mdlReco = algALS.fit(train)            // train: DataFrame of (user, product, rating)
val predictions = mdlReco.transform(test)  // adds a "prediction" column

import org.apache.spark.ml.evaluation.RegressionEvaluator

val pred = predictions.na.drop() // drop NaN predictions for users/products unseen in training
val evaluator = new RegressionEvaluator()
evaluator.setLabelCol("rating")
var rmse = evaluator.evaluate(pred) // default metric is "rmse"
evaluator.setMetricName("mse")
var mse = evaluator.evaluate(pred)
We set rank=12, regParam=0.1, and maxIter=20. In reality, the rank could be 8 or 12; the regularization parameter 0.1, 1.0, or 10; and the number of iterations 10 or 20. So we need to try 12 runs with these different values, compute the error for each, and then select the combination with the best value, as sketched below.
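A hedged sketch of that grid search with spark.ml's tuning utilities, reusing the algALS and evaluator objects and the train DataFrame defined above (the fold count and the exact grid are illustrative):

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val paramGrid = new ParamGridBuilder()
  .addGrid(algALS.rank, Array(8, 12))
  .addGrid(algALS.regParam, Array(0.1, 1.0, 10.0))
  .addGrid(algALS.maxIter, Array(10, 20))
  .build()                                  // 2 x 3 x 2 = 12 candidate settings

val cv = new CrossValidator()
  .setEstimator(algALS)
  .setEvaluator(evaluator)                  // uses whichever metric was set above
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// Note: cold-start users/products inside a fold can yield NaN predictions and a NaN metric,
// so the rating data may need the same kind of NaN handling as above.
val cvModel = cv.fit(train)                 // trains and evaluates all 12 settings, keeps the best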

A pipeline is simply a linear sequence of transformers and estimators.
val treePipeline = new Pipeline().setStages(Array(indexer, assembler, algTree))
val mdlTree = treePipeline.fit(trainData)
val predictions = mdlTree.transform(testData)

MATRIX FACTORIZATION
val predictedRating = model.predict(789, 123) // predicted rating of product 123 for user 789 (spark.mllib MatrixFactorizationModel)

When we split the data randomly into training and test sets, the test set may contain users that are not in the training data. This is the cold-start problem: how can we recommend something to a user who hasn't rated anything before, i.e. a brand-new user? One simple workaround is sketched below.
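A minimal sketch of that workaround, assuming the train and test DataFrames from the ALS example above with columns named "user" and "product" (dropping NaN predictions with na.drop(), as done earlier, is the other common option):

val knownUsers = train.select("user").distinct()
val knownProducts = train.select("product").distinct()

// Keep only test rows whose user and product were both seen during training
val warmTest = test
  .join(knownUsers, "user")
  .join(knownProducts, "product")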

RMSE and MSE
https://cloud.google.com/solutions/recommendations-using-machine-learning-on-compute-engine

http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://aws.amazon.com/blogs/big-data/building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin/

http://stackoverflow.com/questions/26326703/how-to-score-all-user-product-combinations-in-spark-matrixfactorizationmodel

https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html


http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
spark.mllib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors.

The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to movies.
It is common in many real-world use cases to only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares etc.). The approach used in spark.mllib to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets. Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks, or the cumulative duration someone spent viewing a movie). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.
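A minimal sketch of the implicit-feedback variant in spark.mllib, assuming ratings is an RDD[Rating] whose rating field holds interaction strengths (e.g. click counts) rather than explicit scores; the rank, iteration count, lambda, and alpha values are illustrative:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val rank = 10
val numIterations = 10
val lambda = 0.01
val alpha = 40.0 // confidence weight placed on observed interactions
val implicitModel = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)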

Advanced Analytics with Spark: Patterns for Learning from Data at Scale

Learning Spark
HashingTF builds term frequency feature vectors from text data, and LogisticRegressionWithSGD implements the logistic regression procedure using stochastic gradient descent (SGD). We assume that we start with two files, spam.txt and normal.txt, containing examples of spam and non-spam emails respectively, one per line. We then turn the text in each file into a feature vector with TF, and train a logistic regression model to separate the two types of messages.

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val spam = sc.textFile("spam.txt")
val normal = sc.textFile("normal.txt")

// Create a HashingTF instance to map email text to vectors of 10,000 features.
val tf = new HashingTF(numFeatures = 10000)
// Each email is split into words, and each word is mapped to one feature.
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val normalFeatures = normal.map(email => tf.transform(email.split(" ")))

// Create LabeledPoint datasets for positive (spam) and negative (normal) examples.
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
val trainingData = positiveExamples.union(negativeExamples)
trainingData.cache() // Cache since Logistic Regression is an iterative algorithm.

// Run Logistic Regression using the SGD algorithm.
val model = new LogisticRegressionWithSGD().run(trainingData)

// Test on a positive example (spam) and a negative one (normal).
val posTest = tf.transform(
  "O M G GET cheap stuff by sending money to ...".split(" "))
val negTest = tf.transform(
  "Hi Dad, I started studying Spark the other ...".split(" "))
println("Prediction for positive test example: " + model.predict(posTest))
println("Prediction for negative test example: " + model.predict(negTest))

There are multiple types of learning problems, including classification, regression, and clustering, which have different objectives.

All learning algorithms require defining a set of features for each item, which will be fed into the learning function. In many cases, defining the right features is the most challenging part of using machine learning.

Most algorithms are defined only for numerical features (specifically, a vector of numbers representing the value for each feature), so often an important step is feature extraction and transformation to produce these feature vectors.

Most learning algorithms have multiple parameters that can affect results, so real-world pipelines will train multiple versions of a model and evaluate each one.

Vector
A mathematical vector. MLlib supports both dense vectors, where every entry is stored, and sparse vectors, where only the nonzero entries are stored to save space.

LabeledPoint
A labeled data point for supervised learning algorithms such as classification and regression. Includes a feature vector and a label (which is a floating-point value).

Various Model classes
Each Model is the result of a training algorithm, and typically has a predict() method for applying the model to a new data point or to an RDD of new data points.

import org.apache.spark.mllib.linalg.Vectors

// Create the dense vector <1.0, 2.0, 3.0>; Vectors.dense takes values or an array
val denseVec1 = Vectors.dense(1.0, 2.0, 3.0)
val denseVec2 = Vectors.dense(Array(1.0, 2.0, 3.0))

// Create the sparse vector <1.0, 0.0, 2.0, 0.0>; Vectors.sparse takes the size of
// the vector (here 4) and the positions and values of nonzero entries
val sparseVec1 = Vectors.sparse(4, Array(0, 2), Array(1.0, 2.0))

Feature Extraction
TF-IDF
from pyspark.mllib.feature import HashingTF, IDF

# Read a directory of text files and split each document into words
rdd = sc.wholeTextFiles("data").map(lambda (name, text): text.split())
# Build term-frequency vectors with the hashing trick
tf = HashingTF()
tfVectors = tf.transform(rdd).cache()

# Compute the IDF, then the TF-IDF vectors
idf = IDF()
idfModel = idf.fit(tfVectors)
tfIdfVectors = idfModel.transform(tfVectors)

SCALING

Most machine learning algorithms consider the magnitude of each element in the feature vector, and thus work best when the features are scaled so they weigh equally (e.g., all features have a mean of 0 and standard deviation of 1).
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors

vectors = [Vectors.dense([-2.0, 5.0, 1.0]), Vectors.dense([2.0, 0.0, 1.0])]
dataset = sc.parallelize(vectors)
scaler = StandardScaler(withMean=True, withStd=True)
model = scaler.fit(dataset)
result = model.transform(dataset)  # each column now has mean 0 and standard deviation 1

NORMALIZATION
In some situations, normalizing vectors to length 1 is also useful to prepare input data. The Normalizer class allows this. Simply use Normalizer().transform(rdd). By default Normalizer uses the L2 norm (i.e., Euclidean length), but you can also pass a power p to Normalizer to use the Lp norm.
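The Scala (spark.mllib) equivalent, as a small sketch assuming an RDD[Vector] named vectorRDD:

import org.apache.spark.mllib.feature.Normalizer

val l2Normalized = new Normalizer().transform(vectorRDD)    // default: L2 (Euclidean) norm
val l1Normalized = new Normalizer(1.0).transform(vectorRDD) // p = 1.0 for the L1 norm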

WORD2VEC

STATISTICS
Statistics.colStats(rdd)
Computes a statistical summary of an RDD of vectors, which stores the min, max, mean, and variance for each column in the set of vectors. This can be used to obtain a wide variety of statistics in one pass.

Statistics.corr(rdd, method)
Computes the correlation matrix between columns in an RDD of vectors, using either the Pearson or Spearman correlation.
Statistics.corr(rdd1, rdd2, method)
Computes the correlation between two RDDs of floating-point values, using either the Pearson or Spearman correlation.
Statistics.chiSqTest(rdd)
Computes Pearson’s independence test for every feature with the label on an RDD of LabeledPoint objects. Returns an array of ChiSqTestResult objects that capture the p-value, test statistic, and degrees of freedom for each feature. Label and feature values must be categorical (i.e., discrete values).
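A brief usage sketch of these calls, assuming an RDD[Vector] named vectorRDD and an RDD[LabeledPoint] named labeledRDD:

import org.apache.spark.mllib.stat.Statistics

val summary = Statistics.colStats(vectorRDD)
println(summary.mean)      // per-column means
println(summary.variance)  // per-column variances

val corrMatrix = Statistics.corr(vectorRDD, "pearson")  // or "spearman"
val chiSqResults = Statistics.chiSqTest(labeledRDD)     // one ChiSqTestResult per feature
chiSqResults.foreach(r => println(r.pValue))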

Classification and regression are two common forms of supervised learning, where algorithms attempt to predict a variable from features of objects using labeled training data (i.e., examples where we know the answer). The difference between them is the type of variable predicted: in classification, the variable is discrete. In regression, the variable predicted is continuous.

A LabeledPoint consists simply of a label (which is always a Double value, but can be set to discrete integers for classification) and a features vector.

For binary classification, MLlib expects the labels 0 and 1. In some texts, –1 and 1 are used instead, but this will lead to incorrect results. For multiclass classification, MLlib expects labels from 0 to C–1, where C is the number of classes.

LINEAR REGRESSION
SGD stands for Stochastic Gradient Descent.
stepSize
Step size for gradient descent
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

val points: RDD[LabeledPoint] = // ...
val lr = new LinearRegressionWithSGD().setIntercept(true)
lr.optimizer.setNumIterations(200) // numIterations is set on the optimizer, not on the algorithm itself
val model = lr.run(points)
println("weights: %s, intercept: %s".format(model.weights, model.intercept))

LOGISTIC REGRESSION
Logistic regression is a binary classification method that identifies a linear separating plane between positive and negative examples. In MLlib, it takes LabeledPoints with label 0 or 1 and returns a LogisticRegressionModel that can predict new points.

The LogisticRegressionModel from these algorithms computes a score between 0 and 1 for each point, as returned by the logistic function. It then returns either 0 or 1 based on a threshold that can be set by the user: by default, if the score is at least 0.5, it will return 1. You can change this threshold via setThreshold(). You can also disable it altogether via clearThreshold(), in which case predict() will return the raw scores. For balanced datasets with about the same number of positive and negative examples, we recommend leaving the threshold at 0.5. For imbalanced datasets, you can increase the threshold to drive down the number of false positives (i.e., increase precision but decrease recall), or you can decrease the threshold to drive down the number of false negatives.
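A short sketch of those threshold knobs, assuming model is the spark.mllib LogisticRegressionModel and posTest is the feature vector from the spam example above:

model.setThreshold(0.7)   // stricter: fewer false positives, lower recall
println(model.predict(posTest))

model.clearThreshold()    // predict() now returns the raw score in [0, 1]
println(model.predict(posTest))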

SUPPORT VECTOR MACHINES
Support Vector Machines, or SVMs, are another binary classification method with linear separating planes, again expecting labels of 0 or 1.
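A minimal SVM sketch, assuming the trainingData RDD[LabeledPoint] and posTest vector built in the spam example above; the iteration count is illustrative:

import org.apache.spark.mllib.classification.SVMWithSGD

val numIterations = 100
val svmModel = SVMWithSGD.train(trainingData, numIterations)
println(svmModel.predict(posTest)) // 0.0 or 1.0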

http://stackoverflow.com/questions/35998935/how-to-classify-new-training-example-after-model-training-in-apache-spark

https://www.vernier.com/til/1014/
The Mean Squared Error (MSE) is a measure of how close a fitted line is to the data points. For every data point, you take the vertical distance from the point to the corresponding y value on the curve fit (the error) and square the value. Then you add up all of those values for all data points and divide by the number of points minus two. The squaring is done so negative values do not cancel positive values. The smaller the Mean Squared Error, the closer the fit is to the data. The MSE has units equal to the square of whatever is plotted on the vertical axis.
Another quantity that we calculate is the Root Mean Squared Error (RMSE). It is just the square root of the mean squared error. It is probably the most easily interpreted statistic, since it has the same units as the quantity plotted on the vertical axis.
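A hedged sketch of computing both metrics in Spark with spark.mllib's RegressionMetrics, assuming an RDD of (prediction, observation) Double pairs named predictionAndLabel:

import org.apache.spark.mllib.evaluation.RegressionMetrics

val metrics = new RegressionMetrics(predictionAndLabel)
println("MSE  = " + metrics.meanSquaredError)
println("RMSE = " + metrics.rootMeanSquaredError)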

In statistics, the mean squared error of an estimator T of an unobservable parameter θ is defined as MSE(T) = E[(T − θ)²].
That is, it is the expected value of the squared "error", where the error is the difference between the estimate and the quantity being estimated.

TODO
Use Spark MLlib to recommend movies to Douban users
