Thursday, December 19, 2013

Topic Modeling: Infer topics for documents using Latent Dirichlet Allocation (LDA)

Introduction to Latent Dirichlet Allocation (LDA)


In LDA model, first you need to create a vocabulary on probabilistic term distribution over each topic using a set of training documents.

In a simple scenario, assume there are 2 documents in the training set and their content has following unique, important terms. (Important terms is extracted using TF vectors as I have mentioned later)

Document 1: "car", "hybrid", "Toyota"
Document 2: "birds", "parrot", "Sri Lanka"

Using the above terms, LDA creates a vocabulary on probabilistic term distribution over each topic as given below: We define that we need to form 2 topics from this training content.

Topic 1: car: 0.7,  hybrid: 0.1, Toyota: 0.1, birds: 0.02, parrot: 0.03, Sri Lanka: 0.05

Topic 1: Term-Topic distribution

Topic 2: car: 0.05,  hybrid: 0.03, Toyota: 0.02, birds: 0.4, parrot: 0.5, Sri Lanka: 0.1

Topic 2: Term-Topic distribution

The topic model is created based on above training data which will be later used for inference.

For a new document, you need to infer the probabilistic topic distribution over document. Assume the document content is as follows:

Document 3: "Toyota", "Prius", "Hybrid", "For sale", "2003"

For the above document,  probabilistic topic distribution over document will (roughly!) be a value like this:

Topic 1: 0.99, Topic 2: 0.01

Topic distribution over the new document


So, we can use the terms in the topics with high probability (E.g., car, hybrid) as metadata for the document which can be used in different applications such as search indexing, document clustering, business analytic etc.

Pre-processing 


  • Preparing input TF vectors

To bring out the important words within a document, we normally use TF-IDF vectors. However, in LDA, TF vectors are used instead of TF-IDF words to recognize the co-occurrence or correlation between words.

(In vector space model [VSM] it is assumed that occurrences of the words are independent of each other, but this assumption is wrong in many cases! n-gram generation is a solution for this problem)
    • Convert input documents to SequenceFile format

sequence file is a flat file consisting of binary key value pairs. This is used as input/ output file format for map-reduce jobs in Hadoop, which is the underlying framework which Mahout is running on.
        Configuration conf = new Configuration();
        HadoopUtil.delete(conf, new Path(infoDirectory));
        SequenceFilesFromDirectory sfd = new SequenceFilesFromDirectory();

        // input: directory contains number of text documents
        // output: the directory where the sequence files will be created
        String[] para = { "-i", targetInputDirectoryPath, "-o", sequenceFileDirectoryPath };
        sfd.run(para);
      • Convert sequence files to TF vectors

    Configuration conf = new Configuration();

    Tokenization and Analyzing


    During the tokenization, document content will be split in to set of terms/tokens. Different analyzers may use different tokenizers. Stemming and removing stop words can be done and customized in this stage. Please note that both stemming and stop words are language dependent.

    You can specify your own analyzer if you want, specifying on how you want the terms to be extracted. That has to be extended by the Lucene Analyzer class.

    StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_43);

    DocumentProcessor.tokenizeDocuments(new Path(sequenceFileinputDirectoryPath + "/" + "part-m-00000"), analyzer.getClass().asSubclass(Analyzer.class),
                    new Path(infoDirectory + "/" + DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER), conf);
            analyzer.close();

    There are couple of important parameters for generating TF vectors.

    In mahout, DictionaryVectorizer class is used for TF weighting and n-gram collocation.

    // Minimum frequency of the term in the entire collection to be considered as part of the dictionary file. Terms with lesser frequencies are ignored.
            int minSupport = 5;

    // Maximum size of n-grams to be selected. For more information, visit:  ngram collocation in Mahout
            int maxNGramSize = 2;


    // Minimum log likelihood ratio (This is related to ngram collocation. Read more here.)
    // This work only when maxNGramSize > 1 (Less significant ngrams have lower score here)
            float minLLRValue = 50;


    // Parameters for Hadoop map reduce operations
            int reduceTasks = 1;
            int chunkSize = 200;
            boolean sequentialAccessOutput = true;

        DictionaryVectorizer.createTermFrequencyVectors(new Path(infoDirectory + DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER),
                    new Path(infoDirectory), DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER, conf, minSupport, maxNGramSize, minLLRValue,
                    -1.0f, false, reduceTasks, chunkSize, sequentialAccessOutput, true);

    Once the TF vectors are generated for each training document, the model can be created.

    Training

    • Generate term distribution for each topic and generate topic distribution for each training document 

      (Read about the CVB algorithm in mahout here.)
    CVB0Driver cvbDriver = new CVB0Driver();

    I will explain the parameters and how you need to assign them values. Before that you need to read the training dictionary in to memory as given below:

    Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
                    dictionaryFilePath), conf);
            Text key = new Text();
            IntWritable val = new IntWritable();
            ArrayList dictLst = new ArrayList();
            while (reader.next(key,val)) {
                System.out.println(key.toString()+" "+val.toString());
                dictLst.add(key.toString());
            }
            String[] dictionary = new String[dictLst.size()];
            dictionary = dictLst.toArray(dictionary);


    Then, you have to convert vector representation of documents to a matrix, like this.
            RowIdJob rowidjob = new RowIdJob();
            String[] para = { "-i", inputVectorPath, "-o",
                    TRAINING_DOCS_OUTPUTMATRIX_PATH };
            rowidjob.run(para);

    Now, I will explain each parameters and factors you should consider on deciding values.

    // Input path to the above created matrix using TF vectors
    Path inputPath = new Path(TRAINING_DOCS_OUTPUTMATRIX_PATH + "/matrix");

    // Path to save the model (Note: You may need this during inferring new documents)
    Path topicModelOutputPath = new Path(TRAINING_MODEL_PATH);

    // Numbe of topics (#important!) Lower value results in broader topics and higher value may result in niche topics. Optimal value for this parameter can vary depending on the given use case. Large number of topics may cause the system to slowdown.
    int numTopics = 2;

    // Number of terms in the training dictionary. Here's the method to read that:
    private static int getNumTerms(Configuration conf, Path dictionaryPath) throws IOException {
        FileSystem fs = dictionaryPath.getFileSystem(conf);
        Text key = new Text();
        IntWritable value = new IntWritable();
        int maxTermId = -1;
        for (FileStatus stat : fs.globStatus(dictionaryPath)) {
          SequenceFile.Reader reader = new SequenceFile.Reader(fs, stat.getPath(), conf);
          while (reader.next(key, value)) {
            maxTermId = Math.max(maxTermId, value.get());
          }
          reader.close();
        }
       
        return maxTermId + 1;
      }
          
    int numTerms = getNumTerms(conf, new Path(TRAINING_DOCS_ROOT_PATH + "dictionary.file-0"));

    // Smoothing parameters for p(topic|document) prior: This value can control how term topic likelihood is calculated for each document
            double alpha = 0.0001;
            double eta = 0.0001;
            int maxIterations = 10;
            int iterationBlockSize = 10;
            double convergenceDelta = 0;
            Path dictionaryPath = new Path(TRAINING_DOCS_ROOT_PATH + "dictionary.file-0");

    // Final output path for probabilistic topic distribution training documents
            Path docTopicOutputPath = new Path(TRAINING_DOCS_TOPIC_OUTPUT_PATH);

    // Temporary output path for saving models in each iteration
            Path topicModelStateTempPath = new Path(TRAINING_MODEL_TEMP_PATH);

            long randomSeed = 1;

    // This is a measurement of how well a probability distribution or probability model predicts a sample. LDA is a generative model, you start with a known model and try to explain the data by refining parameters to fit the model of the data. These values can be taken to evaluate the performance.
            boolean backfillPerplexity = false;

            int numReduceTasks = 1;
            int maxItersPerDoc = 10;
            int numUpdateThreads = 1;
            int numTrainThreads = 4;
            float testFraction = 0;

            cvbDriver.run(conf, inputPath, topicModelOutputPath,
                    numTopics, numTerms, alpha, eta, maxIterations, iterationBlockSize, convergenceDelta, dictionaryPath, docTopicOutputPath, topicModelStateTempPath, randomSeed, testFraction, numTrainThreads, numUpdateThreads, maxItersPerDoc, numReduceTasks, backfillPerplexity)    ;

    Once this step is completed the training phase of topic modeling is over. Now, lets see how to infer new documents using the trained model.
    • Topic Inference for new document

    To infer topic distribution for new document, you need to follow the same steps for the new document which I have mentioned earlier.
      • Pre-processing - stop word removal
      • Convert the document to sequence file format
      • Convert the content in the sequence file to TF vectors
    There is an important step here, (Even I missed this step at the first time and got wrong results as the outcome :( )

    We need to map the new document's dictionary with the training documents' dictionary and identify the common terms, that appears in both. Then, a TF vector needs to be created for the new document with the cardinality of training documents' dictionary. This is how you should do that.

            //Get the model dictionary file
                    HashMap modelDictionary = new HashMap<>();
                    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("reuters-dir/dictionary.file-0"), conf);
                    Text keyModelDict = new Text();
                    IntWritable valModelDict = new IntWritable();
                    int cardinality = 0;
                    while(reader.next(keyModelDict, valModelDict)){
                        cardinality++;
                        modelDictionary.put(keyModelDict.toString(), Integer.parseInt(valModelDict.toString()));
                    }   
                   
                    RandomAccessSparseVector newDocVector = new RandomAccessSparseVector(cardinality);
                   
                    reader.close();
                   
            //Get the new document dictionary file
                    ArrayList newDocDictionaryWords = new ArrayList<>();
                    reader = new SequenceFile.Reader(fs, new Path("reuters-test-dir/dictionary.file-0"), conf);
                    Text keyNewDict = new Text();
                    IntWritable newVal = new IntWritable();
                    while(reader.next(keyNewDict,newVal)){
                        System.out.println("Key: "+keyNewDict.toString()+" Val: "+newVal);
                        newDocDictionaryWords.add(keyNewDict.toString());
                    }
                   
                    //Get the document frequency count of the new vector
                    HashMap newDocTermFreq = new HashMap<>();
                    reader = new SequenceFile.Reader(fs, new Path("reuters-test-dir/wordcount/ngrams/part-r-00000"), conf);
                    Text keyTFNew = new Text();
                    DoubleWritable valTFNew = new DoubleWritable();
                    while(reader.next(keyTFNew, valTFNew)){
                        newDocTermFreq.put(keyTFNew.toString(), Double.parseDouble(valTFNew.toString()));
                    }
                   
                    //perform the process of term frequency vector creation
                    for (String string : newDocDictionaryWords) {
                        if(modelDictionary.containsKey(string)){
                            int index = modelDictionary.get(string);
                            double tf = newDocTermFreq.get(string);
                            newDocVector.set(index, tf);
                        }
                    }
                    System.out.println(newDocVector.asFormatString());

      • Read the model (Term distribution for each topic) 
     // Dictionary is the training dictionary

        double alpha = 0.0001; // default: doc-topic smoothing
        double eta = 0.0001; // default: term-topic smoothing
        double modelWeight = 1f;

    TopicModel model = new TopicModel(conf, eta, alpha, dictionary, 1, modelWeight, TRAINING_MODEL_PATH));
      • Infer topic distribution for the new document
    The final result, which is probabilistic topic distribution over new document will be stored  in this vector
    If you have a prior guess as to what the topic distribution should be, you can start with it here, instead of the uniform prior

            Vector docTopics = new DenseVector(new double[model.getNumTopics()]).assign(1.0/model.getNumTopics());

    Empty matrix holding intermediate data - Term-Topic likelihoods for each term in the new document will be stored here.

            Matrix docTopicModel = new SparseRowMatrix(model.getNumTopics(), newDocVector.size());

     int maxIters = 100;
            for(int i = 0; i < maxIters; i++) {
                model.trainDocTopicModel(newDocVector, docTopics, docTopicModel);
            }
        model.stop();

    To be continued...

    References: Mahout In Action, Wikipedia

    Wednesday, December 18, 2013

    How to resolve "import java.neo.file cannot be resolved" error?

    Ypu will get the "import java.neo.file cannot be resolved" error with following imports:
    import java.nio.file.Files;
    import java.nio.file.Paths;

    To resolve that do the following:
    Right click on the project > Properties > Java Compiler > Set Compiler compliance level as 1.7

    Refresh the project

    Tuesday, December 17, 2013

    Preview not responding in Mavericks

    To resolve the issue do the following:
    1. Force quit the non responding preview application
    2. In Finder, Go > Go to folder
    3. Type the following path in Go to the folder: ~/Library/
    4. Check com.apple.Preview. * folders/ files in the following folders and move to trash if any:
      1. Cache
      2. Containers
      3. Preferences
      4. Saved Application State
    5. Then, restart the computer

    Thursday, December 12, 2013

    Difference between Topic Modeling and Document Clustering

    Topic modeling is one way of implementing clustering for a document collection. In this article, by the term "clustering" I mean a popular clustering mechanism such as K-means, fuzzy K-means etc.

    So, the difference is the way how these both mechanisms have been implemented. Even though both of them  returns similar type of outcome, the actual data/ knowledge embedded in the outcome can be different.

    In topic modeling, each document is represented as a distribution of topics. And essentially, topic is a probability distribution over words. As opposed to topic modelling, in document clustering, cluster is composed of collection of documents. (not topics)

    .. to be continued!



    Issues with examples in Mahout In Action (Hello World program for clustering) with mahout 0.9

    I encountered following issues and here's how I fixed them:

    The method getIdentifier() is undefined for the type Cluster:

    Exception in thread "main" java.io.IOException: wrong value class: org.apache.mahout.clustering.kmeans.Kluster is not interface org.apache.mahout.clustering.Cluster

    Replace the error code with the following:

               SequenceFile.Writer writer
                  = new SequenceFile.Writer(
                      fs, conf,      path, Text.class, Kluster.class);

                Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
                writer.append(new Text(cluster.getIdentifier()), cluster);

    Exception in thread "main" java.io.IOException: wrong value class: 0.0: null is not class org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable

    Replace import org.apache.mahout.clustering.classify.WeightedVectorWritable; with import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; and the replaces the related types as well.

    The corrected code can be found here.

    package org.apache.mahout.jaytest;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.mahout.clustering.Cluster;
    import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;

    import org.apache.mahout.clustering.kmeans.KMeansDriver;
    import org.apache.mahout.clustering.kmeans.Kluster;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    public class helloworld {
       
        public static final double[][] points = { {1, 1}, {2, 1}, {1, 2},
            {2, 2}, {3, 3}, {8, 8},
            {9, 8}, {8, 9}, {9, 9}};
       
        // Write data to sequence files in Hadoop (write the vector to sequence file)
        public static void writePointsToFile(List points, String fileName,
                FileSystem fs,
                Configuration conf) throws IOException {
           
                        Path path = new Path(fileName);
                        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
                        path, LongWritable.class, VectorWritable.class);
                        long recNum = 0;
                        VectorWritable vec = new VectorWritable();
                       
                        for (Vector point : points) {
                            vec.set(point);
                            writer.append(new LongWritable(recNum++), vec);
                        }
                       
                        writer.close();
        }
       
        // Read the points to vector from 2D array
        public static List getPoints(double[][] raw) {
              List points = new ArrayList();
              for (int i = 0; i < raw.length; i++) {
                double[] fr = raw[i];
                Vector vec = new RandomAccessSparseVector(fr.length);
                vec.assign(fr);
                points.add(vec);
            }
              return points;
            }
       
        public static void main(String args[]) throws Exception {
           
            // specify the number of clusters
            int k = 2;
           
            // read the values (features) - generate vectors from input data
              List vectors = getPoints(points);
             
              // Create input directories for data
              File testData = new File("testdata");
             
              if (!testData.exists()) {
                testData.mkdir();
              }
              testData = new File("testdata/points");
              if (!testData.exists()) {
                testData.mkdir();
              }
             
              // Write initial centers
              Configuration conf = new Configuration();
             
              FileSystem fs = FileSystem.get(conf);

              // Write vectors to input directory
              writePointsToFile(vectors,
                  "testdata/points/file1", fs, conf);
             
              Path path = new Path("testdata/clusters/part-00000");
             
              SequenceFile.Writer writer
                  = new SequenceFile.Writer(
                      fs, conf,      path, Text.class, Kluster.class);
             
              for (int i = 0; i < k; i++) {
                Vector vec = vectors.get(i);
               
                // write the initial center here as vec
                Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
                writer.append(new Text(cluster.getIdentifier()), cluster);
              }
             
              writer.close();
             
              // Run K-means algorithm
            KMeansDriver.run(conf, new Path("testdata/points"), new Path("testdata/clusters"),
            new Path("output"), new EuclideanDistanceMeasure(),
                    0.001, 10, true, 0, false);
              SequenceFile.Reader reader
                  = new SequenceFile.Reader(fs,
                      new Path("output/" + Cluster.CLUSTERED_POINTS_DIR
                          + "/part-m-00000"), conf);
            IntWritable key = new IntWritable();
           
            // Read output values
            WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable(); while (reader.next(key, value)) {
                System.out.println(
                    value.toString() + " belongs to cluster "
                        + key.toString());
            }
              reader.close();
        }

    }