Saturday, August 15, 2015

Latent Dirichlet Allocation (LDA) with Apache Spark MLlib

Latent Dirichlet allocation is an scalable machine learning algorithm for topic annotation or topic modelling. It is available in Apache Spark MLlib. I will not explain the internals of the algorithm in detail here.

Please visit the following link for more information about LDA algorithm.
http://jayaniwithanawasam.blogspot.com/2013/12/infer-topics-for-documents-using-latent.html

Here’s the code for LDA algorithm in Spark MLlib.
import scala.Tuple2;

import org.apache.spark.api.java.*;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.mllib.clustering.DistributedLDAModel;

import org.apache.spark.mllib.clustering.LDA;

import org.apache.spark.mllib.linalg.Vector;

import org.apache.spark.mllib.linalg.Vectors;

import org.apache.spark.SparkConf;

public class lda {

  public static void main(String[] args) {



// Spark configuration details

    SparkConf conf = new SparkConf().setAppName("LDA");

    JavaSparkContext sc = new JavaSparkContext(conf);



    // Load and parse the data (sample_lda_data.txt is available with Spark installation)

    // word count vectors (columns: terms [vocabulary], rows [documents])

    String path = "data/mllib/sample_lda_data.txt";

   

    // Read data

    // creates a RDD with each line as an element

    // E.g., 1 2 6 0 2 3 1 1 0 0 3

    JavaRDD data = sc.textFile(path);

   

    // Map is a transformation that passes each data set element through a function

    // It returns a new RDD representing the results

    // Prepares input as numerical representation

    JavaRDD parsedData = data.map(

        new Function() {


public Vector call(String s) {

            String[] sarray = s.trim().split(" ");

            double[] values = new double[sarray.length];

            for (int i = 0; i < sarray.length; i++)

              values[i] = Double.parseDouble(sarray[i]);

            return Vectors.dense(values);

          }

        }

    );

   

    // Index documents with unique IDs

    // The transformation 'zipWithIndex' provides a stable indexing, numbering each element in its original order.

    JavaPairRDD corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(

        new Function, Tuple2>() {



public Tuple2 call(Tuple2 doc_id) {

            return doc_id.swap();

          }

        }

    ));

    corpus.cache();



    // Cluster the documents into three topics using LDA

    // number of topics = 3

    DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);



    // Topic and its term distribution

    // columns = 3 topics/ rows = terms (vocabulary)

    System.out.println("Topic-Term distribution: \n" + ldaModel.topicsMatrix());

    // document and its topic distribution

    // [(doc ID: [topic 1, topic 2, topic3]), (doc ID: ...]

    JavaRDD> topicDist = ldaModel.topicDistributions().toJavaRDD();

    System.out.println("Document-Topic distribution: \n" + topicDist.collect());

    sc.close();

  }

}

Output:

Topic-Term distribution




Document-Topic distribution

Market Basket Analysis with Apache Spark MLlib FP-Growth

Market Basket Analysis 

source: http://www.noahdatatech.com/solutions/predictive-analytics/

Market basket analysis is identifying items in the supermarket which customers are more likely to buy together.
e.g., Customers who bought pampers also bought beer

      
This is important for super markets to arrange their items in a consumer convenient manner as well as to come up with promotions taking item affinity in to consideration.

Frequent Item set Mining and Association Rule Learning  


Frequent item set mining is a sub area in data mining that focuses on identifying frequently co-occuring items. Once, the frequent item set is ready, we can come up with rules         to derive association between items.
        e.g., Frequent item set = {pampers, beer, milk}, association rule = {pampers, milk ---> beer}

        There are two possible popular approaches for frequent item set mining and association rule learning as given below:

Apriori algorithm 
FP-Growth algorithm

To explain above algorithms, let us consider example with 4 customers making 4 transactions in supermarket that contain 7 items in total as given below:

    Transaction 1: Jana’s purchase: egg, beer, pampers, milk
    Transaction 2: Abi’s purchase: carrot, milk, pampers, beer
    Transaction 3: Mahesha’s purchase: perfume, tissues, carrot
    Transaction 4: Jayani’s purchase: perfume, pampers, beer

    Item index
    1: egg, 2: beer, 3: pampers, 4: carrot, 5: milk, 6: perfume, 7: tissues

Using Apriori algorithm


Apriori algorithm identifies frequent item sets by starting individual items and  extending item set by one at a time. This is known as candidate generation step.
This algorithm makes the assumption that any sub set of item within a frequent item set is also frequent.

Transaction: Items
1: 1, 2, 3, 5
2: 4, 5, 3, 2
3: 6, 7, 4
4: 6, 3, 2

Minimum Support 


Minimum support is used to prune the associations that are less frequent.

Minimum support = number of times item occur in transactions/ number of transactions

For example, lets say we define minimum support as 0.5.
Calculating support for egg is 1/4 = 0.25 (0.25 < 0.5), so that is eliminated. Support for beer is 3/4 = 0.75 (0.75 > 0.5) is it is considered for further processing.

Calculation of support for all items

size of the candidate itemset = 1

itemset: support
1: 0.25: eliminated
2: 0.75
3: 0.75
4: 0.5
5: 0.5
6: 0.5
7: 0.25: eliminated

remaining items: 2, 3, 4, 5, 6

extend candidate itemset by 1
size of the items = 2

itemset: support
2, 3: 0.75
2, 4: 0.25: eliminated
2, 5: 0.5
2, 6: 0.25: eliminated
3, 4: 0.25: eliminated
3, 5: 0.5
3, 6: 0.25: eliminated
4, 5: 0.25: eliminated
4, 6: 0.25: eliminated
5, 6: 0.25: eliminated

remaining items: {2,3},{ 2, 5}, {3, 5}

extend candidate itemset by 1
size of the items = 3

2, 3, 5: 0.5

Using FP-Growth algorithm


In FP-Growth algorithm, frequent patterns are mined using a tree approach (construction of Frequent Patter Tree)
FP-Growth algorithm has been proven to execute much faster than the Apriori algorithm.

Calculate support for frequent items and sort in degreasing order of the frequency as given below:

item: frequency
1: 1 - eliminated
2: 3
3: 3
4: 2
5: 2
6: 2
7: 1 - eliminated

Decreasing order of the frequency
2 (3), 3 (3), 4 (2), 5 (2), 6 (2)

Construction of FP-Tree

A) Transaction 1
 1, 2, 3, 5 > 2 (1), 3 (1), 5 (1)

B) Transaction 2
4, 5, 3, 2 > 2 (2), 3 (2), 4 (1), 5 (1)

C) Transaction 3
6, 7, 4 > 4 (1), 6 (1)

D) Transaction 4
6, 3, 2 > 2 (3), 3 (3), 6 (1)

Once the FP-tree is constructed, frequent item sets are calculated using depth first strategy along with divide and conquer mechanism.
This enables algorithm is be computationally more effective and parallelizable (using map-reduce).

Code Example with Apache Spark MLlib

    public static void main(String[] args) {
   
        SparkConf conf = new SparkConf().setAppName("Market Basket Analysis");
        JavaSparkContext sc = new JavaSparkContext(conf);
       
        // Items
        String item1 = "egg";
        String item2 = "beer";
        String item3 = "pampers";
        String item4 = "carrot";
        String item5 = "milk";
        String item6 = "perfume";
        String item7 = "tissues";
       
        // Transactions
        List transaction1 = new ArrayList();
        transaction1.add(item1);
        transaction1.add(item2);
        transaction1.add(item3);
        transaction1.add(item5);

        List transaction2 = new ArrayList();
        transaction2.add(item4);
        transaction2.add(item5);
        transaction2.add(item3);
        transaction2.add(item2);
       
        List transaction3 = new ArrayList();
        transaction3.add(item6);
        transaction3.add(item7);
        transaction3.add(item4);
       
        List transaction4 = new ArrayList();
        transaction4.add(item6);
        transaction4.add(item3);
        transaction4.add(item2);
       
        List> transactions = new ArrayList>();
        transactions.add(transaction1);
        transactions.add(transaction2);
        transactions.add(transaction3);
        transactions.add(transaction4);
       
        // Make transaction collection parallel with Spark
        JavaRDD> transactionsRDD = sc.parallelize(transactions);

        // Set configurations for FP-Growth
        FPGrowth fpg = new FPGrowth()
          .setMinSupport(0.5)
          .setNumPartitions(10);
       
        // Generate model
        FPGrowthModel model = fpg.run(transactionsRDD);

        // Display frequently co-occuring items
        for (FPGrowth.FreqItemset itemset: model.freqItemsets().toJavaRDD().collect()) {
           System.out.println("[" + Joiner.on(",").join(itemset.javaItems()) + "], " + itemset.freq());
        }
        sc.close();
    }