Showing posts with label Apache Spark. Show all posts

Saturday, August 15, 2015

Latent Dirichlet Allocation (LDA) with Apache Spark MLlib

Latent Dirichlet allocation (LDA) is a scalable machine learning algorithm for topic annotation or topic modelling. It is available in Apache Spark MLlib. I will not explain the internals of the algorithm in detail here.

Please visit the following link for more information about the LDA algorithm:
http://jayaniwithanawasam.blogspot.com/2013/12/infer-topics-for-documents-using-latent.html

Here’s the code for LDA algorithm in Spark MLlib.
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class lda {

  public static void main(String[] args) {

    // Spark configuration details
    SparkConf conf = new SparkConf().setAppName("LDA");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data (sample_lda_data.txt is available with the Spark installation)
    // Word count vectors: columns = terms (vocabulary), rows = documents
    String path = "data/mllib/sample_lda_data.txt";

    // Read data: creates an RDD with each line as an element
    // E.g., 1 2 6 0 2 3 1 1 0 0 3
    JavaRDD<String> data = sc.textFile(path);

    // map is a transformation that passes each data set element through a function
    // and returns a new RDD representing the results.
    // Here it prepares the input as a numerical representation.
    JavaRDD<Vector> parsedData = data.map(
        new Function<String, Vector>() {
          public Vector call(String s) {
            String[] sarray = s.trim().split(" ");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++)
              values[i] = Double.parseDouble(sarray[i]);
            return Vectors.dense(values);
          }
        }
    );

    // Index documents with unique IDs
    // The transformation 'zipWithIndex' provides a stable indexing, numbering each element in its original order.
    // swap() turns each (vector, ID) pair into (ID, vector).
    JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
        new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
          public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
            return doc_id.swap();
          }
        }
    ));
    corpus.cache();

    // Cluster the documents into three topics using LDA
    // number of topics = 3
    DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);

    // Topic and its term distribution
    // columns = 3 topics, rows = terms (vocabulary)
    System.out.println("Topic-Term distribution: \n" + ldaModel.topicsMatrix());

    // Document and its topic distribution
    // [(doc ID: [topic 1, topic 2, topic 3]), (doc ID: ...]
    // The Scala Long document ID appears as Object in the Java signature.
    JavaRDD<Tuple2<Object, Vector>> topicDist = ldaModel.topicDistributions().toJavaRDD();
    System.out.println("Document-Topic distribution: \n" + topicDist.collect());

    sc.close();
  }
}

Output:

[Figure: Topic-Term distribution]

[Figure: Document-Topic distribution]

Market Basket Analysis with Apache Spark MLlib FP-Growth

Market Basket Analysis 

source: http://www.noahdatatech.com/solutions/predictive-analytics/

Market basket analysis identifies items in a supermarket that customers are likely to buy together.
e.g., Customers who bought pampers also bought beer

This helps supermarkets arrange their items in a way that is convenient for consumers and design promotions that take item affinity into consideration.

Frequent Item set Mining and Association Rule Learning  


Frequent item set mining is a sub-area of data mining that focuses on identifying frequently co-occurring items. Once the frequent item sets are ready, we can derive rules that describe associations between items.
e.g., Frequent item set = {pampers, beer, milk}, association rule = {pampers, milk ---> beer}

There are two popular approaches for frequent item set mining and association rule learning:

Apriori algorithm 
FP-Growth algorithm

To explain the above algorithms, let us consider an example with 4 customers making 4 transactions in a supermarket, covering 7 items in total:

    Transaction 1: Jana’s purchase: egg, beer, pampers, milk
    Transaction 2: Abi’s purchase: carrot, milk, pampers, beer
    Transaction 3: Mahesha’s purchase: perfume, tissues, carrot
    Transaction 4: Jayani’s purchase: perfume, pampers, beer

    Item index
    1: egg, 2: beer, 3: pampers, 4: carrot, 5: milk, 6: perfume, 7: tissues

Using Apriori algorithm


The Apriori algorithm identifies frequent item sets by starting from individual items and extending the candidate item sets by one item at a time. This is known as the candidate generation step.
The algorithm relies on the property that every subset of a frequent item set is also frequent, so a candidate that contains an infrequent subset can be discarded.

Transaction: Items
1: 1, 2, 3, 5
2: 4, 5, 3, 2
3: 6, 7, 4
4: 6, 3, 2

Minimum Support 


Minimum support is a threshold used to prune item sets that occur too rarely.

Support = number of transactions that contain the item set / total number of transactions

For example, let's say we define the minimum support as 0.5.
The support for egg is 1/4 = 0.25 (0.25 < 0.5), so it is eliminated. The support for beer is 3/4 = 0.75 (0.75 >= 0.5), so it is considered for further processing.
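
The support calculation can be sketched in plain Java, without Spark. This is only an illustration under assumptions: the class name SupportExample and the helpers set and support are made up for this sketch, and the transactions use the item index from the example (1: egg, 2: beer, and so on).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SupportExample {

    // Hypothetical helper: builds an item set from item indices.
    static Set<Integer> set(Integer... items) {
        return new HashSet<>(Arrays.asList(items));
    }

    // The four example transactions, written with the item index.
    static final List<Set<Integer>> TRANSACTIONS = Arrays.asList(
        set(1, 2, 3, 5), set(4, 5, 3, 2), set(6, 7, 4), set(6, 3, 2));

    // Support = transactions containing every item of the set / all transactions.
    static double support(Set<Integer> itemset) {
        int hits = 0;
        for (Set<Integer> transaction : TRANSACTIONS) {
            if (transaction.containsAll(itemset)) {
                hits++;
            }
        }
        return (double) hits / TRANSACTIONS.size();
    }

    public static void main(String[] args) {
        double minSupport = 0.5;
        for (int item = 1; item <= 7; item++) {
            double s = support(set(item));
            System.out.println(item + ": " + s + (s >= minSupport ? "" : ": eliminated"));
        }
    }
}
```

An item set is kept when its support is greater than or equal to the minimum support, which is why items with a support of exactly 0.5 survive the pruning.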

Calculation of support for all items

size of the candidate itemset = 1

itemset: support
1: 0.25: eliminated
2: 0.75
3: 0.75
4: 0.5
5: 0.5
6: 0.5
7: 0.25: eliminated

remaining items: 2, 3, 4, 5, 6

extend candidate itemset by 1
size of the candidate itemset = 2

itemset: support
2, 3: 0.75
2, 4: 0.25: eliminated
2, 5: 0.5
2, 6: 0.25: eliminated
3, 4: 0.25: eliminated
3, 5: 0.5
3, 6: 0.25: eliminated
4, 5: 0.25: eliminated
4, 6: 0.25: eliminated
5, 6: 0: eliminated

remaining itemsets: {2, 3}, {2, 5}, {3, 5}

extend candidate itemset by 1
size of the candidate itemset = 3

2, 3, 5: 0.5
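
The level-by-level walk-through above can be reproduced with a small plain Java sketch. It is a simplified illustration, not the implementation of any library: the class AprioriSketch and its helpers are hypothetical names, and candidates are generated by simply joining pairs of frequent item sets that differ in one item, without the extra subset check a full Apriori implementation would apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class AprioriSketch {

    // Hypothetical helper: builds a sorted item set from item indices.
    static Set<Integer> set(Integer... items) {
        return new TreeSet<>(Arrays.asList(items));
    }

    static final List<Set<Integer>> TRANSACTIONS = Arrays.asList(
        set(1, 2, 3, 5), set(4, 5, 3, 2), set(6, 7, 4), set(6, 3, 2));

    static double support(Set<Integer> itemset) {
        int hits = 0;
        for (Set<Integer> transaction : TRANSACTIONS) {
            if (transaction.containsAll(itemset)) {
                hits++;
            }
        }
        return (double) hits / TRANSACTIONS.size();
    }

    // Returns every frequent item set with its support, level by level.
    static Map<Set<Integer>, Double> frequentItemsets(double minSupport) {
        Map<Set<Integer>, Double> result = new LinkedHashMap<>();
        // Level 1 candidates: all single items.
        List<Set<Integer>> candidates = new ArrayList<>();
        for (int item = 1; item <= 7; item++) {
            candidates.add(set(item));
        }
        while (!candidates.isEmpty()) {
            // Prune the candidates that fall below the minimum support.
            List<Set<Integer>> frequent = new ArrayList<>();
            for (Set<Integer> candidate : candidates) {
                double s = support(candidate);
                if (s >= minSupport) {
                    frequent.add(candidate);
                    result.put(candidate, s);
                }
            }
            // Candidate generation: join frequent sets that differ in exactly one item.
            Set<Set<Integer>> next = new LinkedHashSet<>();
            for (int i = 0; i < frequent.size(); i++) {
                for (int j = i + 1; j < frequent.size(); j++) {
                    Set<Integer> union = new TreeSet<>(frequent.get(i));
                    union.addAll(frequent.get(j));
                    if (union.size() == frequent.get(i).size() + 1) {
                        next.add(union);
                    }
                }
            }
            candidates = new ArrayList<>(next);
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<Set<Integer>, Double> e : frequentItemsets(0.5).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue());
        }
    }
}
```

With a minimum support of 0.5 the sketch keeps five single items, three pairs and the triple {2, 3, 5}.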

Using FP-Growth algorithm


In the FP-Growth algorithm, frequent patterns are mined using a tree approach (construction of a Frequent Pattern Tree, or FP-tree).
FP-Growth typically runs much faster than the Apriori algorithm because it avoids the explicit candidate generation step.

Calculate the frequency of each item, eliminate the infrequent items and sort the rest in decreasing order of frequency as given below:

item: frequency
1: 1 - eliminated
2: 3
3: 3
4: 2
5: 2
6: 2
7: 1 - eliminated

Decreasing order of the frequency
2 (3), 3 (3), 4 (2), 5 (2), 6 (2)

Construction of FP-Tree

A) Transaction 1
 1, 2, 3, 5 > 2 (1), 3 (1), 5 (1)

B) Transaction 2
4, 5, 3, 2 > 2 (2), 3 (2), 4 (1), 5 (1)

C) Transaction 3
6, 7, 4 > 4 (1), 6 (1)

D) Transaction 4
6, 3, 2 > 2 (3), 3 (3), 6 (1)

Once the FP-tree is constructed, frequent item sets are calculated using a depth-first strategy along with a divide-and-conquer mechanism.
This makes the algorithm computationally more efficient and parallelizable (using map-reduce).
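
The tree construction in steps A to D can be sketched in plain Java as follows. The sketch only builds the FP-tree and does not mine it; the class FpTreeSketch, the Node type and the minimum count of 2 (a minimum support of 0.5 over 4 transactions) are assumptions made for this illustration, and ties in frequency are broken by item index as in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FpTreeSketch {

    // One node of the tree: an item, how many transactions pass through it, and its children.
    static class Node {
        final int item;
        int count;
        final Map<Integer, Node> children = new LinkedHashMap<>();

        Node(int item) {
            this.item = item;
        }
    }

    static final List<List<Integer>> TRANSACTIONS = Arrays.asList(
        Arrays.asList(1, 2, 3, 5),
        Arrays.asList(4, 5, 3, 2),
        Arrays.asList(6, 7, 4),
        Arrays.asList(6, 3, 2));

    static Node build(List<List<Integer>> transactions, int minCount) {
        // First pass: count how often each item occurs.
        Map<Integer, Integer> freq = new HashMap<>();
        for (List<Integer> transaction : transactions) {
            for (int item : transaction) {
                freq.merge(item, 1, Integer::sum);
            }
        }
        Node root = new Node(-1); // artificial root, holds no item
        // Second pass: drop infrequent items, sort the rest, insert as a path.
        for (List<Integer> transaction : transactions) {
            List<Integer> kept = new ArrayList<>();
            for (int item : transaction) {
                if (freq.get(item) >= minCount) {
                    kept.add(item);
                }
            }
            // Decreasing frequency; ties are broken by item index.
            kept.sort((a, b) -> {
                int byFreq = Integer.compare(freq.get(b), freq.get(a));
                return byFreq != 0 ? byFreq : Integer.compare(a, b);
            });
            Node node = root;
            for (int item : kept) {
                node = node.children.computeIfAbsent(item, Node::new);
                node.count++;
            }
        }
        return root;
    }

    // Prints the tree with one level of indentation per depth.
    static void print(Node node, String indent) {
        for (Node child : node.children.values()) {
            System.out.println(indent + child.item + " (" + child.count + ")");
            print(child, indent + "  ");
        }
    }

    public static void main(String[] args) {
        print(build(TRANSACTIONS, 2), "");
    }
}
```

Transactions that share a prefix, such as 2, 3 in transactions 1, 2 and 4, share the same path in the tree, which is what keeps the structure compact.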

Code Example with Apache Spark MLlib

    import java.util.Arrays;
    import java.util.List;

    import com.google.common.base.Joiner;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.fpm.FPGrowth;
    import org.apache.spark.mllib.fpm.FPGrowthModel;

    // The class name is illustrative; it only has to match the --class argument of spark-submit
    public class fpgrowth {

      public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Market Basket Analysis");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Items
        String item1 = "egg";
        String item2 = "beer";
        String item3 = "pampers";
        String item4 = "carrot";
        String item5 = "milk";
        String item6 = "perfume";
        String item7 = "tissues";

        // Transactions
        List<String> transaction1 = Arrays.asList(item1, item2, item3, item5);
        List<String> transaction2 = Arrays.asList(item4, item5, item3, item2);
        List<String> transaction3 = Arrays.asList(item6, item7, item4);
        List<String> transaction4 = Arrays.asList(item6, item3, item2);

        List<List<String>> transactions =
            Arrays.asList(transaction1, transaction2, transaction3, transaction4);

        // Make transaction collection parallel with Spark
        JavaRDD<List<String>> transactionsRDD = sc.parallelize(transactions);

        // Set configurations for FP-Growth
        FPGrowth fpg = new FPGrowth()
          .setMinSupport(0.5)
          .setNumPartitions(10);

        // Generate model
        FPGrowthModel<String> model = fpg.run(transactionsRDD);

        // Display frequently co-occurring items
        for (FPGrowth.FreqItemset<String> itemset : model.freqItemsets().toJavaRDD().collect()) {
           System.out.println("[" + Joiner.on(",").join(itemset.javaItems()) + "], " + itemset.freq());
        }
        sc.close();
      }
    }

Saturday, July 25, 2015

Content Recommendations with PredictionIO

I’m currently working on content recommendations using PredictionIO, mainly with its Similar Product engine and Recommendation engine (given below):
  • https://docs.prediction.io/templates/similarproduct/quickstart/
  • https://docs.prediction.io/templates/recommendation/quickstart/
Before going into details about content recommendations, let's get an understanding of the underlying frameworks, technologies and languages used.

PredictionIO   

PredictionIO (https://prediction.io/) is an open source machine learning server that mainly provides infrastructure support for diverse machine learning algorithms. It offers built-in REST APIs and SDKs to access machine learning algorithms in an operational machine learning environment.

PredictionIO has abstracted away the key steps in machine learning, such as:
  • reading data from the data source (datasource)
  • preparing data according to the required input format (datapreparator)
  • training the model on the data set (train)
  • testing the model and querying (test)
   
It provides a framework approach to use, implement or customise algorithms according to application needs.

PredictionIO is mostly based on the Spark cluster computing framework.

Spark RDDs


Most of the algorithms are implemented in Scala, which is a JVM-based functional programming language.
The algorithms run on top of Spark, which distributes data as RDDs across a cluster of computers to parallelize the computations performed on them.

An RDD (Resilient Distributed Dataset) is the core abstraction of Spark. It is partitioned across the cluster so that in-memory operations can be performed on it. Unlike the Hadoop MapReduce programming paradigm, Spark RDDs can cache intermediate results in memory rather than on disk, which makes them significantly faster for workloads that reuse the same data.

Consequently, this approach is ideal for iterative machine learning algorithms such as K-means, gradient descent and ALS (Alternating Least Squares). Further, RDDs optimise fault tolerance in a distributed setting by recomputing lost partitions from the instructions recorded as a DAG (Directed Acyclic Graph) of transformations and actions. This eliminates the need for data replication.

Content Recommendations using ALS in MLlib


Content recommendation is a key area that PredictionIO focuses on. Currently, ALS (Alternating Least Squares) is supported in the Spark MLlib scalable machine learning library.
MLlib is built on Apache Spark for large-scale data processing and it is a standard component of Spark.

Why MLlib? - http://stanford.edu/~rezab/sparkworkshop/slides/xiangrui.pdf

A key goal of using machine learning in any application is to find hidden patterns or insights in large datasets.

When it comes to recommendations, take movie recommendation as an example: there can be latent (hidden) factors such as actors, genre and theme that affect user ratings and user preferences for movies. The idea of the ALS algorithm is to uncover these latent factors using matrix factorization from linear algebra.

A set of user features (e.g., John likes Keanu Reeves movies) and product features (e.g., Keanu Reeves acts in The Matrix), which can be the implicit reasons for user preferences, is estimated mathematically from the known user preferences for given items. These features are then used to predict the unseen or unknown preferences of a given user.
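
To make the alternating idea concrete, here is a minimal plain Java sketch of ALS with a single latent factor (rank 1) and plain L2 regularization. It is an illustration under assumptions, not the Spark MLlib implementation: the class name AlsRankOneSketch, the small ratings matrix and the starting values are made up, and 0 marks an unknown rating. The names LAMBDA and numIterations are chosen here only to hint at the corresponding training parameters.

```java
import java.util.Arrays;

class AlsRankOneSketch {

    // Rows = users, columns = items; 0 marks an unknown rating (made-up data).
    static final double[][] R = {
        {5, 3, 0, 1},
        {4, 0, 0, 1},
        {1, 1, 0, 5},
        {1, 0, 0, 4},
        {0, 1, 5, 4}};

    static final double LAMBDA = 0.01; // regularization parameter

    // Squared error on the known ratings plus the regularization term.
    static double loss(double[] u, double[] v) {
        double sum = 0;
        for (int i = 0; i < u.length; i++) {
            for (int j = 0; j < v.length; j++) {
                if (R[i][j] > 0) {
                    double error = R[i][j] - u[i] * v[j];
                    sum += error * error;
                }
            }
        }
        for (double x : u) sum += LAMBDA * x * x;
        for (double x : v) sum += LAMBDA * x * x;
        return sum;
    }

    // One ALS sweep: fix the item factors and solve for each user factor,
    // then fix the user factors and solve for each item factor.
    static void sweep(double[] u, double[] v) {
        for (int i = 0; i < u.length; i++) {
            double num = 0, den = LAMBDA;
            for (int j = 0; j < v.length; j++) {
                if (R[i][j] > 0) {
                    num += R[i][j] * v[j];
                    den += v[j] * v[j];
                }
            }
            u[i] = num / den;
        }
        for (int j = 0; j < v.length; j++) {
            double num = 0, den = LAMBDA;
            for (int i = 0; i < u.length; i++) {
                if (R[i][j] > 0) {
                    num += R[i][j] * u[i];
                    den += u[i] * u[i];
                }
            }
            v[j] = num / den;
        }
    }

    // Returns {userFactors, itemFactors} after the given number of sweeps.
    static double[][] train(int numIterations) {
        double[] u = new double[R.length];
        double[] v = new double[R[0].length];
        Arrays.fill(u, 1.0);
        Arrays.fill(v, 1.0);
        for (int it = 0; it < numIterations; it++) {
            sweep(u, v);
        }
        return new double[][] {u, v};
    }

    public static void main(String[] args) {
        double[][] factors = train(20);
        System.out.println("loss after training: " + loss(factors[0], factors[1]));
        // Predicted preference of user 0 for item 2, which has no known rating.
        System.out.println("prediction: " + factors[0][0] * factors[1][2]);
    }
}
```

Each half of a sweep is an ordinary least-squares problem with a closed-form solution, which is where the name Alternating Least Squares comes from. With more than one latent factor, each update solves a small linear system instead of a single division.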

The following link contains a good introduction to matrix factorisation with mathematical details. You can try out the Python example as well.
http://www.quuxlabs.com/blog/2010/09/matrix-factorization-a-simple-tutorial-and-implementation-in-python/

The following link explains the scalable in-memory matrix calculations of ALS in Spark MLlib well.
http://www.slideshare.net/MrChrisJohnson/collaborative-filtering-with-spark

Given below are the key stages that your data will go through to provide useful item recommendations to users in PredictionIO.

Datasource

HBase is the default backend for the Event Store. Input is taken as REST queries [item/ user/ ratings].
Metadata is stored in Elasticsearch.
One example of an imported event:
{"eventId":"fBAiPFEGKPOdFsTrGK7VwwAAAU6RUP-Tl37BvsDRuHs","event":"rate","entityType":"user","entityId":"admin","targetEntityType":"item","targetEntityId":"6","properties":{"rating":5},"eventTime":"2015-07-15T10:44:41.491Z","creationTime":"2015-07-15T10:44:41.491Z"}

Datapreparator 

Prepares the input data in the format required by the ML algorithm, which in this case is users, items and user ratings.

Train

Once trained, the model is saved in the local file system.
Training parameters for the ALS algorithm can be configured in the engine.json file as given below:
"algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations": 20,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
  • rank - number of latent factors in the model
  • numIterations - number of iterations
  • lambda - regularization parameter in ALS (to avoid overfitting)
  • seed - seed for the random number generator, so that training runs are repeatable

Querying (Test)

Once the model is trained, recommendations for unseen user preferences can be requested via the REST service.
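
A query against the deployed engine could be built as in the sketch below, using the HTTP client that ships with Java 11. Everything specific here is an assumption rather than something stated in this post: the endpoint http://localhost:8000/queries.json and the query fields "user" and "num" follow the Recommendation template quickstart linked above and may differ in your deployment, and the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class QuerySketch {

    // Assumed endpoint of a locally deployed engine; adjust to your setup.
    static final String ENDPOINT = "http://localhost:8000/queries.json";

    // Builds a POST request asking for `num` recommendations for one user.
    static HttpRequest buildQuery(String userId, int num) {
        String body = "{\"user\": \"" + userId + "\", \"num\": " + num + "}";
        return HttpRequest.newBuilder()
            .uri(URI.create(ENDPOINT))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // Sends the request and returns the raw JSON response (needs a running engine).
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        HttpRequest request = buildQuery("admin", 4);
        // Only prints the request; call send(request) once an engine is deployed.
        System.out.println(request.method() + " " + request.uri());
    }
}
```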

How to set up Apache Spark (Java) - MLlib in Eclipse?

Apache Spark version: 1.3.0

Download the required pre-built version of Apache Spark from the following link:
http://spark.apache.org/downloads.html

Create Maven project in Eclipse
File > New > Maven Project

Add the following dependencies in pom.xml.



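<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-mllib_2.10</artifactId>
  <version>1.3.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.3.0</version>
  <scope>provided</scope>
</dependency>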


We have set the scope to “provided” because those dependencies are already available on the Spark server.

Create a new class and add your Java source code for the required MLlib algorithm

Run as > Maven Build… > package

Verify that the .jar file is created in the ‘target’ folder of the Maven project

Change to the directory of the Spark installation you downloaded and unpacked, and try the following command:
./bin/spark-submit --class <main class> --master local[2] <path to .jar file>
E.g.,

./bin/spark-submit --class fpgrowth --master local[2] /Users/XXX/target/uber-TestMLlib-0.0.1-SNAPSHOT.jar

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.mllib

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.mllib.clustering.LDA.run(Lorg/apache/spark/api/java/JavaPairRDD;)Lorg/apache/spark/mllib/clustering/DistributedLDAModel

Cause: the Spark version used at compile time (from the Maven repository) was different from the Spark version at runtime on the Spark server class path (Spark installation directory/ lib). Use the same Spark version in pom.xml as in the Spark installation.

Tuesday, February 10, 2015

java.io.IOException: No FileSystem for scheme: HDFS

To solve the above issue, add the following to hadoop-2.6.0/etc/hadoop/core-site.xml:

<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>