I have been working on a book on machine learning, “Apache Mahout Essentials”, for about 6 months, and it was recently published by Packt Publishing, UK.
I’m sharing my experience in this article, as it may help others who want to pursue the same.
So, I got an invitation to write a book, what’s next?
When I got an email from Shaon (Acquisition Editor at Packt Publishing) inviting me to write a book, I immediately replied saying that I was occupied (if not overloaded) with my MSc and office work and wouldn't be able to do it. Then Shaon approached me again, saying they could give flexible timelines for chapter deliverables, and asked me to give it a second thought.
Then I spoke to Abi with three possible options in hand, one of which was “not writing the book”; she eliminated that one straight away, saying that even writing a bedtime story book is an opportunity she wouldn't miss.
I also spoke to Rafa, who was the Head of Research at Zaizi some time back. He assured me that I could do this and gave me a piece of advice that was just three words but helped me vastly throughout the journey of writing the book: “Step by step!”
So, I want to emphasise that, even though I'm getting some recognition for writing a book, if it weren't for these people it would have been just a rejected invitation. I have no words to express my gratitude to them for the motivation they provided.
From my side, the steady and compelling reason to start writing this book was my unquenchable curiosity about machine learning and the desire to learn more.
Yup, I decided to go ahead and try it out. But still…
So, I started writing, and in no time I realised that this was not as easy as I had imagined.
One reason was that I was following an MSc in Artificial Intelligence, where we had to complete 4 modules in 8 weeks (with exams the following week - no study leave!) and we had lectures all weekend from 8 to 5 (those who went through this struggle will understand the pain ;)). Apart from that, I was working full time as well. To make the situation even worse, I had to travel for 2 hours daily as I stayed outside Colombo.
So, I decided to utilise the travel time effectively and read the required content on my smartphone, even while standing in a crowded train. There was a period when I worked almost every hour continuously. As a result, I got stressed out and was sick much of the time.
This is where “focusing on one thing at a time” helped me, as it was overwhelming to think about all the items in my “things-to-do” list. I also planned out the structure and the content before starting to write, with a fresh mindset, and then spent the night before each deadline finalising everything.
However, regardless of the problems that came my way, I was determined to complete what I had started. I remember one day I had a terrible ear infection and was still struggling to meet a chapter deadline at 3 a.m.
Shaon and Nikhil (Content Editor at Packt Publishing) were working with me during this time, and they were kind enough to give me flexible chapter deadlines that did not overlap with my university exams.
Finally, it was all worth the effort!
The book went through several stages of reviews and revisions before publishing, and the happiest moment of all was when I completed all the first drafts.
And the next happiest may have been getting the opportunity to choose an image with n-Shades of Grey as the cover page. ;)
Reading has been my favourite and most consistent hobby since childhood, yet I was unaware of the publishing process a book has to go through before it reaches readers' hands. So, getting to know the process itself was another exciting factor.
In addition to learning and writing about ML concepts, planning how to structure and present the content so that others could understand it was a novel experience as well.
Finally, writing a book was one of the items on my bucket list, and it turned out to be an immensely rewarding experience that exceeded my expectations.
However, this is just one milestone in the long journey of machine learning. There is a lot to learn, a lot to experience, and a lot of things that need to get better :)
Monday, March 30, 2015
org.apache.hadoop.mapred.YarnChild: Exception running child : java.net.ConnectException: Connection refused
I got the following error (in the Hadoop user logs) while trying to run a Mahout MapReduce job on Hadoop (fully distributed mode):
2015-03-25 08:31:52,858 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.net.ConnectException: Call From slave01.net/127.0.1.1 to slave01.net:60926 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731)
at org.apache.hadoop.ipc.Client.call(Client.java:1472)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:244)
at com.sun.proxy.$Proxy7.getTask(Unknown Source)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:132)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:607)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:705)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:368)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1521)
at org.apache.hadoop.ipc.Client.call(Client.java:1438)
I could solve this issue by replacing the 127.0.1.1 hostname mapping (in /etc/hosts) with the node's permanent IP, as given below:
33.33.33.10 master
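For completeness, the relevant hosts file ended up looking roughly like the sketch below. The slave entries are illustrative placeholders I have added here, not the actual addresses from my cluster:

# /etc/hosts on every node: map each hostname to its permanent IP, not to 127.0.1.1
33.33.33.10   master
33.33.33.11   slave01   # placeholder IP, use your node's actual address
33.33.33.12   slave02   # placeholder IP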
Tuesday, February 10, 2015
java.io.IOException: No FileSystem for scheme: HDFS
To solve the above issue, add the following to hadoop-2.6.0/etc/hadoop/core-site.xml:
<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>
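Alternatively (a sketch of my own, not from the original fix), the same mapping can be set programmatically on the Hadoop Configuration before the FileSystem is obtained:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Register the HDFS FileSystem implementation in code instead of core-site.xml
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
FileSystem fs = FileSystem.get(conf);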
Thursday, December 19, 2013
Topic Modeling: Infer topics for documents using Latent Dirichlet Allocation (LDA)
Introduction to Latent Dirichlet Allocation (LDA)
In the LDA model, you first need to create a vocabulary with a probabilistic term distribution over each topic, using a set of training documents.
In a simple scenario, assume there are 2 documents in the training set and their content has the following unique, important terms. (Important terms are extracted using TF vectors, as I have mentioned later.)
Document 1: "car", "hybrid", "Toyota"
Document 2: "birds", "parrot", "Sri Lanka"
We define that we need to form 2 topics from this training content. Using the above terms, LDA creates a vocabulary with a probabilistic term distribution over each topic, as given below:
Topic 1: car: 0.7, hybrid: 0.1, Toyota: 0.1, birds: 0.02, parrot: 0.03, Sri Lanka: 0.05
[Figure: Topic 1 term-topic distribution]
Topic 2: car: 0.05, hybrid: 0.03, Toyota: 0.02, birds: 0.4, parrot: 0.5, Sri Lanka: 0.1
[Figure: Topic 2 term-topic distribution]
The topic model is created based on the above training data, and it will later be used for inference.
For a new document, you need to infer the probabilistic topic distribution over the document. Assume the document content is as follows:
Document 3: "Toyota", "Prius", "Hybrid", "For sale", "2003"
For the above document, the probabilistic topic distribution will (roughly!) be something like this:
Topic 1: 0.99, Topic 2: 0.01
[Figure: Topic distribution over the new document]
So, we can use the terms in the topics with high probability (e.g., car, hybrid) as metadata for the document, which can be used in different applications such as search indexing, document clustering, business analytics, etc.
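To make the toy numbers above concrete, here is a minimal sketch of my own (plain Java, not part of Mahout) that combines p(term | topic) with p(topic | document) to estimate p(term | document) for Document 3. The terms with the highest values are the natural metadata candidates:

public class ToyLdaArithmetic {
    public static void main(String[] args) {
        String[] terms = {"car", "hybrid", "Toyota", "birds", "parrot", "Sri Lanka"};
        double[] topic1 = {0.7, 0.1, 0.1, 0.02, 0.03, 0.05};   // p(term | topic 1) from above
        double[] topic2 = {0.05, 0.03, 0.02, 0.4, 0.5, 0.1};   // p(term | topic 2) from above
        double[] docTopics = {0.99, 0.01};                     // p(topic | document 3) from above
        for (int i = 0; i < terms.length; i++) {
            // p(term | document) = sum over topics of p(term | topic) * p(topic | document)
            double p = docTopics[0] * topic1[i] + docTopics[1] * topic2[i];
            System.out.printf("p(%s | doc3) = %.4f%n", terms[i], p);
        }
    }
}

For example, p(car | doc3) works out to 0.99 * 0.7 + 0.01 * 0.05 = 0.6935, which is why "car" and "hybrid" are good metadata candidates for Document 3.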
Pre-processing
Preparing input TF vectors
(In the vector space model (VSM) it is assumed that occurrences of words are independent of each other, but this assumption is wrong in many cases! n-gram generation is a solution to this problem.)
Convert input documents to SequenceFile format
HadoopUtil.delete(conf, new Path(infoDirectory));
SequenceFilesFromDirectory sfd = new SequenceFilesFromDirectory();
// input: the directory containing the text documents
// output: the directory where the sequence files will be created
String[] para = { "-i", targetInputDirectoryPath, "-o", sequenceFileDirectoryPath };
sfd.run(para);
Convert sequence files to TF vectors
Tokenization and Analyzing
During tokenization, the document content will be split into a set of terms/tokens. Different analyzers may use different tokenizers. Stemming and removal of stop words can be done and customized at this stage. Please note that both stemming and stop words are language dependent.
You can specify your own analyzer if you want, defining how the terms should be extracted; it has to extend the Lucene Analyzer class.
StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_43);
DocumentProcessor.tokenizeDocuments(new Path(sequenceFileinputDirectoryPath + "/" + "part-m-00000"), analyzer.getClass().asSubclass(Analyzer.class),
new Path(infoDirectory + "/" + DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER), conf);
analyzer.close();
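As an illustration of such a custom analyzer, here is a sketch of my own (assuming Lucene 4.3 to match the StandardAnalyzer above; as far as I know Mahout instantiates the analyzer by class name, so it should have a public no-argument constructor):

import java.io.Reader;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.core.LowerCaseFilter;
import org.apache.lucene.analysis.core.StopFilter;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.util.Version;

public class MyCustomAnalyzer extends Analyzer {
    @Override
    protected TokenStreamComponents createComponents(String fieldName, Reader reader) {
        // Split on standard token boundaries, lower-case the tokens, then drop English stop words
        Tokenizer source = new StandardTokenizer(Version.LUCENE_43, reader);
        TokenStream filter = new LowerCaseFilter(Version.LUCENE_43, source);
        filter = new StopFilter(Version.LUCENE_43, filter, StandardAnalyzer.STOP_WORDS_SET);
        return new TokenStreamComponents(source, filter);
    }
}

You would then pass MyCustomAnalyzer.class.asSubclass(Analyzer.class) to tokenizeDocuments in place of the StandardAnalyzer's class.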
There are a couple of important parameters for generating TF vectors.
In Mahout, the DictionaryVectorizer class is used for TF weighting and n-gram collocation.
// Minimum frequency of the term in the entire collection to be considered as part of the dictionary file. Terms with lesser frequencies are ignored.
int minSupport = 5;
// Maximum size of n-grams to be selected. For more information, visit: ngram collocation in Mahout
int maxNGramSize = 2;
// Minimum log likelihood ratio (This is related to ngram collocation. Read more here.)
// This works only when maxNGramSize > 1 (less significant ngrams get a lower score here)
float minLLRValue = 50;
// Parameters for Hadoop map reduce operations
int reduceTasks = 1;
int chunkSize = 200;
boolean sequentialAccessOutput = true;
DictionaryVectorizer.createTermFrequencyVectors(new Path(infoDirectory + "/" + DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER),
new Path(infoDirectory), DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER, conf, minSupport, maxNGramSize, minLLRValue,
-1.0f, false, reduceTasks, chunkSize, sequentialAccessOutput, true);
Once the TF vectors are generated for each training document, the model can be created.
Training
Generate term distribution for each topic and generate topic distribution for each training document
(Read about the CVB algorithm in mahout here.)
I will explain the parameters and how to assign values to them. Before that, you need to read the training dictionary into memory, as given below:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
dictionaryFilePath), conf);
Text key = new Text();
IntWritable val = new IntWritable();
ArrayList<String> dictLst = new ArrayList<String>();
while (reader.next(key,val)) {
System.out.println(key.toString()+" "+val.toString());
dictLst.add(key.toString());
}
String[] dictionary = new String[dictLst.size()];
dictionary = dictLst.toArray(dictionary);
Then, you have to convert the vector representation of the documents into a matrix, like this:
RowIdJob rowidjob = new RowIdJob();
String[] para = { "-i", inputVectorPath, "-o",
TRAINING_DOCS_OUTPUTMATRIX_PATH };
rowidjob.run(para);
Now, I will explain each parameter and the factors you should consider when deciding on values.
// Input path to the above created matrix using TF vectors
Path inputPath = new Path(TRAINING_DOCS_OUTPUTMATRIX_PATH + "/matrix");
// Path to save the model (Note: You may need this during inferring new documents)
Path topicModelOutputPath = new Path(TRAINING_MODEL_PATH);
// Number of topics (important!). A lower value results in broader topics and a higher value may result in niche topics. The optimal value for this parameter can vary depending on the given use case. A large number of topics may cause the system to slow down.
int numTopics = 2;
// Number of terms in the training dictionary. Here's the method to read that:
private static int getNumTerms(Configuration conf, Path dictionaryPath) throws IOException {
FileSystem fs = dictionaryPath.getFileSystem(conf);
Text key = new Text();
IntWritable value = new IntWritable();
int maxTermId = -1;
for (FileStatus stat : fs.globStatus(dictionaryPath)) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, stat.getPath(), conf);
while (reader.next(key, value)) {
maxTermId = Math.max(maxTermId, value.get());
}
reader.close();
}
return maxTermId + 1;
}
int numTerms = getNumTerms(conf, new Path(TRAINING_DOCS_ROOT_PATH + "dictionary.file-0"));
// Smoothing parameters for the p(topic|document) prior: these values control how the term-topic likelihood is calculated for each document
double alpha = 0.0001;
double eta = 0.0001;
int maxIterations = 10;
int iterationBlockSize = 10;
double convergenceDelta = 0;
Path dictionaryPath = new Path(TRAINING_DOCS_ROOT_PATH + "dictionary.file-0");
// Final output path for probabilistic topic distribution training documents
Path docTopicOutputPath = new Path(TRAINING_DOCS_TOPIC_OUTPUT_PATH);
// Temporary output path for saving models in each iteration
Path topicModelStateTempPath = new Path(TRAINING_MODEL_TEMP_PATH);
long randomSeed = 1;
// Perplexity is a measure of how well a probability distribution or probability model predicts a sample. LDA is a generative model: you start with a known model and try to explain the data by refining the parameters to fit the model to the data. These values can be used to evaluate performance.
boolean backfillPerplexity = false;
int numReduceTasks = 1;
int maxItersPerDoc = 10;
int numUpdateThreads = 1;
int numTrainThreads = 4;
float testFraction = 0;
CVB0Driver.run(conf, inputPath, topicModelOutputPath,
        numTopics, numTerms, alpha, eta, maxIterations, iterationBlockSize, convergenceDelta, dictionaryPath, docTopicOutputPath, topicModelStateTempPath, randomSeed, testFraction, numTrainThreads, numUpdateThreads, maxItersPerDoc, numReduceTasks, backfillPerplexity);
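Once the run completes, you can sanity-check the per-document topic distributions written to docTopicOutputPath with a reader along the lines of the sketch below (my own addition; the key/value types and the part file name are my assumptions about the CVB output and may differ in your setup):

// Keys are document ids, values are the p(topic | document) vectors
SequenceFile.Reader docTopicReader = new SequenceFile.Reader(fs,
        new Path(docTopicOutputPath, "part-m-00000"), conf);
IntWritable docId = new IntWritable();
VectorWritable docTopicDist = new VectorWritable();
while (docTopicReader.next(docId, docTopicDist)) {
    System.out.println("Document " + docId.get() + " -> " + docTopicDist.get().asFormatString());
}
docTopicReader.close();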
Once this step is completed, the training phase of topic modeling is over. Now, let's see how to infer topics for new documents using the trained model.
Topic Inference for new document
- Pre-processing - stop word removal
- Convert the document to sequence file format
- Convert the content in the sequence file to TF vectors
We need to map the new document's dictionary to the training documents' dictionary and identify the common terms that appear in both. Then, a TF vector needs to be created for the new document with the cardinality of the training documents' dictionary. This is how you can do that:
//Get the model dictionary file
HashMap<String, Integer> modelDictionary = new HashMap<String, Integer>();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("reuters-dir/dictionary.file-0"), conf);
Text keyModelDict = new Text();
IntWritable valModelDict = new IntWritable();
int cardinality = 0;
while(reader.next(keyModelDict, valModelDict)){
cardinality++;
modelDictionary.put(keyModelDict.toString(), Integer.parseInt(valModelDict.toString()));
}
RandomAccessSparseVector newDocVector = new RandomAccessSparseVector(cardinality);
reader.close();
//Get the new document dictionary file
ArrayList<String> newDocDictionaryWords = new ArrayList<String>();
reader = new SequenceFile.Reader(fs, new Path("reuters-test-dir/dictionary.file-0"), conf);
Text keyNewDict = new Text();
IntWritable newVal = new IntWritable();
while(reader.next(keyNewDict,newVal)){
System.out.println("Key: "+keyNewDict.toString()+" Val: "+newVal);
newDocDictionaryWords.add(keyNewDict.toString());
}
//Get the term frequency counts for the new document
HashMap<String, Double> newDocTermFreq = new HashMap<String, Double>();
reader = new SequenceFile.Reader(fs, new Path("reuters-test-dir/wordcount/ngrams/part-r-00000"), conf);
Text keyTFNew = new Text();
DoubleWritable valTFNew = new DoubleWritable();
while(reader.next(keyTFNew, valTFNew)){
newDocTermFreq.put(keyTFNew.toString(), Double.parseDouble(valTFNew.toString()));
}
//perform the process of term frequency vector creation
for (String string : newDocDictionaryWords) {
if(modelDictionary.containsKey(string)){
int index = modelDictionary.get(string);
double tf = newDocTermFreq.get(string);
newDocVector.set(index, tf);
}
}
System.out.println(newDocVector.asFormatString());
- Read the model (Term distribution for each topic)
double alpha = 0.0001; // default: doc-topic smoothing
double eta = 0.0001; // default: term-topic smoothing
double modelWeight = 1f;
TopicModel model = new TopicModel(conf, eta, alpha, dictionary, 1, modelWeight, new Path(TRAINING_MODEL_PATH));
- Infer topic distribution for the new document
If you have a prior guess as to what the topic distribution should be, you can start with it here instead of the uniform prior:
Vector docTopics = new DenseVector(new double[model.getNumTopics()]).assign(1.0/model.getNumTopics());
An empty matrix holding intermediate data: the term-topic likelihoods for each term in the new document will be stored here.
Matrix docTopicModel = new SparseRowMatrix(model.getNumTopics(), newDocVector.size());
int maxIters = 100;
for(int i = 0; i < maxIters; i++) {
model.trainDocTopicModel(newDocVector, docTopics, docTopicModel);
}
model.stop();
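After the iterations, docTopics holds the inferred topic distribution for the new document; a small sketch of my own to print it:

// Print the inferred p(topic | document) values for the new document
for (int topic = 0; topic < model.getNumTopics(); topic++) {
    System.out.println("Topic " + topic + ": " + docTopics.get(topic));
}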
To be continued...
References: Mahout In Action, Wikipedia
Thursday, December 12, 2013
Issues with examples in Mahout In Action (Hello World program for clustering) with mahout 0.9
I encountered the following issues, and here's how I fixed them:
The method getIdentifier() is undefined for the type Cluster:
Exception in thread "main" java.io.IOException: wrong value class: org.apache.mahout.clustering.kmeans.Kluster is not interface org.apache.mahout.clustering.Cluster
Replace the erroneous code with the following:
SequenceFile.Writer writer
= new SequenceFile.Writer(
fs, conf, path, Text.class, Kluster.class);
Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
writer.append(new Text(cluster.getIdentifier()), cluster);
Exception in thread "main" java.io.IOException: wrong value class: 0.0: null is not class org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable
Replace import org.apache.mahout.clustering.classify.WeightedVectorWritable; with import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; and replace the related types as well.
The corrected code can be found here.
package org.apache.mahout.jaytest;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class helloworld {
public static final double[][] points = { {1, 1}, {2, 1}, {1, 2},
{2, 2}, {3, 3}, {8, 8},
{9, 8}, {8, 9}, {9, 9}};
// Write data to sequence files in Hadoop (write the vector to sequence file)
public static void writePointsToFile(List<Vector> points, String fileName,
        FileSystem fs, Configuration conf) throws IOException {
Path path = new Path(fileName);
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
path, LongWritable.class, VectorWritable.class);
long recNum = 0;
VectorWritable vec = new VectorWritable();
for (Vector point : points) {
vec.set(point);
writer.append(new LongWritable(recNum++), vec);
}
writer.close();
}
// Convert the 2D array of raw points into a list of vectors
public static List<Vector> getPoints(double[][] raw) {
    List<Vector> points = new ArrayList<Vector>();
for (int i = 0; i < raw.length; i++) {
double[] fr = raw[i];
Vector vec = new RandomAccessSparseVector(fr.length);
vec.assign(fr);
points.add(vec);
}
return points;
}
public static void main(String args[]) throws Exception {
// specify the number of clusters
int k = 2;
// read the values (features) - generate vectors from input data
List<Vector> vectors = getPoints(points);
// Create input directories for data
File testData = new File("testdata");
if (!testData.exists()) {
testData.mkdir();
}
testData = new File("testdata/points");
if (!testData.exists()) {
testData.mkdir();
}
// Write initial centers
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// Write vectors to input directory
writePointsToFile(vectors,
"testdata/points/file1", fs, conf);
Path path = new Path("testdata/clusters/part-00000");
SequenceFile.Writer writer
= new SequenceFile.Writer(
fs, conf, path, Text.class, Kluster.class);
for (int i = 0; i < k; i++) {
Vector vec = vectors.get(i);
// write the initial center here as vec
Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
writer.append(new Text(cluster.getIdentifier()), cluster);
}
writer.close();
// Run K-means algorithm
KMeansDriver.run(conf, new Path("testdata/points"), new Path("testdata/clusters"),
new Path("output"), new EuclideanDistanceMeasure(),
0.001, 10, true, 0, false);
SequenceFile.Reader reader
= new SequenceFile.Reader(fs,
new Path("output/" + Cluster.CLUSTERED_POINTS_DIR
+ "/part-m-00000"), conf);
IntWritable key = new IntWritable();
// Read output values
WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
while (reader.next(key, value)) {
System.out.println(
value.toString() + " belongs to cluster "
+ key.toString());
}
reader.close();
}
}