In this programming assignment, you are going to compute tf-idf weights for a small dataset in Stratosphere. You will be introduced to the main concepts in the respective subtasks.
The following figure gives an overview of what we are going to do in each task:
You are first going to calculate the document frequency, that is, in how many documents each distinct word occurs. If a document contains a word three times, it is counted only once for this document. But if another document contains the word as well, the document frequency is two.
Besides this, you need to do some data cleansing on the input data: accept only terms (words) that are alphanumeric. There is also a list of stopwords (provided by us) that should be rejected.
To achieve this, you need to implement a Mapper and a Reducer. The input file is a regular text file that contains a line for each document.
The schema of the input file looks like this:
docid, document contents
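For illustration, a single input line might look like this (a made-up example; the real dataset differs):
10,the quick brown fox jumps over the lazy dog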
The input text data first goes through a flatMap and then a reduce method. You implement these by overriding the respective methods in a mapper class that extends the abstract class FlatMapFunction and a reducer class that extends the abstract class GroupReduceFunction.
Remember to "group by" the correct index (in our case the first field: the string holding the split word).
Further instructions:
-The mapper class: DocumentFrequencyMapper
You will need to split the input line so that only alphanumeric words are accepted. Remember to remove the document id. (Hint: if the length of the docid is unknown, you can use a Scanner with "," as its delimiter.)
Next, you will need to check whether the individual terms are in the HashSet of stopwords (Util.STOP_WORDS). You do not need any additional checks because we already checked the input.
You should also use a HashSet to check whether a word has already occurred in the given document. Remember, repeated words in the same document are only counted once.
The last step is to emit each accepted word with a frequency count of 1. (Hint: the tuple's second field type is Integer.) The output collector can be used as follows:
<nameOfCollector>.collect(new Tuple2<String, Integer>(... , ...))
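Putting these steps together, here is a minimal sketch of the mapper. It assumes the Stratosphere 0.5-style Java API; the import paths, the exact split regex, and the assumption that Util.STOP_WORDS holds lowercase words are not given in the assignment, so adapt them to your skeleton:

import java.util.HashSet;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;

public class DocumentFrequencyMapper extends FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        // strip the docid: everything up to (and including) the first comma
        String contents = line.substring(line.indexOf(',') + 1);

        HashSet<String> seen = new HashSet<String>();
        // keep only alphanumeric words; lowercase so the stopword lookup matches
        for (String word : contents.toLowerCase().split("[^a-z0-9]+")) {
            // skip empty splits, stopwords, and words already seen in this document
            if (word.isEmpty() || Util.STOP_WORDS.contains(word) || !seen.add(word)) {
                continue;
            }
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}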
-The reducer class: DocumentFrequencyReducer
The reducer picks up where the mapper left off. It uses an iterator to go through the previously collected tuples, adding up the number of times each word has occurred.
The result is then sent to the collector:
out.collect(new Tuple2<String, Integer>(key, intSum));
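For reference, the whole reduce could be sketched like this, under the same API assumptions as above (the Iterator-based signature matches the Stratosphere 0.5 GroupReduceFunction; check your skeleton):

import java.util.Iterator;

import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;

public class DocumentFrequencyReducer
        extends GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterator<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        Tuple2<String, Integer> first = values.next();
        String key = first.f0;
        int intSum = first.f1;
        // sum up the 1s the mapper emitted for this word
        while (values.hasNext()) {
            intSum += values.next().f1;
        }
        out.collect(new Tuple2<String, Integer>(key, intSum));
    }
}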
Use the provided main() method to test your code.
Task #1 is quite similar to the classical WordCount example, the "Hello World" of Big Data.
Implement a second Mapper that also reads in the documents from the same file.
This time, the output tuples shall look like this: (docid, word, frequency).
You should use the same splitter for the input line as in Task #1, but this time keep and use the docid.
Use a HashMap to count the frequency of each word in the document.
The result is, as usual, emitted to the collector:
out.collect(new Tuple3<Integer, String, Integer>(... , ... , ...));
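A minimal sketch of this second mapper (hypothetically named TermFrequencyMapper here; the assignment does not fix the name, and the imports again assume the Stratosphere 0.5-style Java API):

import java.util.HashMap;
import java.util.Map;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.util.Collector;

public class TermFrequencyMapper extends FlatMapFunction<String, Tuple3<Integer, String, Integer>> {

    @Override
    public void flatMap(String line, Collector<Tuple3<Integer, String, Integer>> out) {
        // this time, keep the docid instead of discarding it
        int pos = line.indexOf(',');
        int docId = Integer.parseInt(line.substring(0, pos));
        String contents = line.substring(pos + 1);

        // count how often each accepted word occurs in this document
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        for (String word : contents.toLowerCase().split("[^a-z0-9]+")) {
            if (word.isEmpty() || Util.STOP_WORDS.contains(word)) {
                continue;
            }
            Integer current = counts.get(word);
            counts.put(word, current == null ? 1 : current + 1);
        }
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.collect(new Tuple3<Integer, String, Integer>(docId, e.getKey(), e.getValue()));
        }
    }
}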
Stratosphere comes with a variety of built-in datatypes. For this assignment, you are going to implement your own. This step is a preparation for the last one: we want to store a WeightVector.
The datatype has an add(String, double) method to store the weight of each word.
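A plain sketch of what such a datatype could look like. This is only an assumption about the internals; depending on the Stratosphere version your skeleton targets, the type may additionally need to implement Stratosphere's serialization interfaces, so follow the provided skeleton:

import java.io.Serializable;
import java.util.ArrayList;

public class WeightVector implements Serializable {

    private final ArrayList<String> terms = new ArrayList<String>();
    private final ArrayList<Double> weights = new ArrayList<Double>();

    // store the tf-idf weight of a single word
    public void add(String term, double weight) {
        terms.add(term);
        weights.add(weight);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < terms.size(); i++) {
            sb.append(terms.get(i)).append('=').append(weights.get(i)).append(' ');
        }
        return sb.toString();
    }
}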
This class can be viewed as the plan. It brings together four main classes:
-the previously implemented DocumentFrequency and TermFrequency
-the newly created TfIdCount, a class extending JoinFunction (see USER FUNCTIONS section)
-the provided ReduceToVector class.
Further instructions:
TfIdCount outputs a new tuple with the following field types: Tuple3<Integer, String, Double>
You will need to override the join method.
The following pseudocode describes what the join does:
join( (word, df), (docid, word, tf)) {
tf_idf(word) = tf * log(Util.NUM_DOCUMENTS / df)
return (docid, word, tf_idf(word))
}
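Translated into the Java API, the join could be sketched as follows (again assuming Stratosphere 0.5-style imports and the tuple shapes given above):

import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;

public class TfIdCount extends JoinFunction<Tuple2<String, Integer>, Tuple3<Integer, String, Integer>, Tuple3<Integer, String, Double>> {

    @Override
    public Tuple3<Integer, String, Double> join(Tuple2<String, Integer> df,
                                                Tuple3<Integer, String, Integer> tf) {
        // tf_idf(word) = tf * log(NUM_DOCUMENTS / df)
        double tfIdf = tf.f2 * Math.log((double) Util.NUM_DOCUMENTS / df.f1);
        return new Tuple3<Integer, String, Double>(tf.f0, tf.f1, tfIdf);
    }
}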
-ReduceToVector
This reduce task takes the output of the join and groups it by the document id (docid).
Write the document id and the terms, including their weights, into the WeightVector you created in the previous task.
Remember to uncomment the provided code.
Then implement the reduceGroup method accordingly in TfId.
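Since ReduceToVector is provided, the following is only an illustrative sketch of what its grouping logic amounts to; the output type Tuple2<Integer, WeightVector> is an assumption, and your provided code may differ:

import java.util.Iterator;

import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.util.Collector;

public class ReduceToVector
        extends GroupReduceFunction<Tuple3<Integer, String, Double>, Tuple2<Integer, WeightVector>> {

    @Override
    public void reduce(Iterator<Tuple3<Integer, String, Double>> values,
                       Collector<Tuple2<Integer, WeightVector>> out) {
        // all tuples in this group share the same docid
        WeightVector vector = new WeightVector();
        Tuple3<Integer, String, Double> first = values.next();
        int docId = first.f0;
        vector.add(first.f1, first.f2);
        while (values.hasNext()) {
            Tuple3<Integer, String, Double> t = values.next();
            vector.add(t.f1, t.f2);   // term and its tf-idf weight
        }
        out.collect(new Tuple2<Integer, WeightVector>(docId, vector));
    }
}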