Extending/Wrapping Hadoop InputFormat

While the database files that IMDb publishes every two weeks are great for converting into a domain model in which the Movie POJO stands central, the actors and actresses files are somewhat of a burden to import, since they are not movie-centric.

First I devised a movie-centric domain model:

public class Movie implements Transmutable {
    private Long id;
    private String title;
    private Set<Year> year = new HashSet<Year>();
    private String plot;
    private Set<String> keywords = new HashSet<String>();
    private Double rating;
    private Integer votes;
    private VoteDistribution voteDistribution;
    private Set<Genre> genres = new HashSet<Genre>();
    private String country;
    private Set<Actor> actors = new HashSet<Actor>();
    private Set<Director> directors = new HashSet<Director>();
    // getters and setters omitted
}

Because there is a ManyToMany between Movie and Actor, and I have already imported all the movie data into my database, it would be query-intensive to retrieve all the movies of an actor just to attach the actor to each movie and save the movie again. That approach would mean a select on the movie, followed by an insert of the actor and an insert into the movie_actor link table: one select statement per actor insert.

Now take a look at a small portion of the actors.list file:

't Hoen, Daniël		Zonde (2010)  [Yorgos Znapoudos]  <2>
't Hoen, Frans		De gebroken kruik (1958) (TV)  [Ruprecht]
			De zaak M.P. (1960)  [Hollandse boer]
			Dorp aan de rivier (1958)  [Dirk Jan]  <15>
			Gijsbrecht van Aemstel (1957) (TV)  [Diederick van Haerlem]
			Traan (1962) (TV)  [Joe]  <6>
			"De fuik" (1962) {'n lichtvoetig meisje (#1.6)} 
			"De vier dochters Bennet" (1961)  [Wickham]
			"School voor volwassenen" (1960) {(#1.1)}  [Meneer klundert]
't Hooft, Gerard	"Celebrating Science" (2008) {Science...

You can see that this file is actor-centric and not at all a great format for importing into a database. If we could invert the actor-movies relation, we could do the following in Hibernate: getByTitle on the Movie entity, add the Actors to the actors set, and saveOrUpdate the Movie. This would result in just one select for all the actors of that movie.
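To make that concrete, here is a minimal sketch of that import step. MovieDao is a hypothetical stand-in for the Hibernate session/DAO, and none of these names come from the actual code; it only illustrates the one-select-per-movie shape of the import:

```java
import java.util.*;

public class MovieImport {
    // Hypothetical stand-in for a Hibernate-backed DAO
    interface MovieDao {
        Movie getByTitle(String title);   // one select per movie
        void saveOrUpdate(Movie movie);   // one update plus link-table inserts
    }

    // Stripped-down Movie for illustration only
    static class Movie {
        final String title;
        final Set<String> actors = new HashSet<>();
        Movie(String title) { this.title = title; }
    }

    // movieLine looks like: "Rambo (1987)\tStallone, Sylvester|Benz, Julie"
    static void importLine(String movieLine, MovieDao dao) {
        String[] parts = movieLine.split("\t");
        Movie movie = dao.getByTitle(parts[0]);        // single select
        Collections.addAll(movie.actors, parts[1].split("\\|"));
        dao.saveOrUpdate(movie);                        // attach all actors at once
    }
}
```

Compare this with the actor-centric direction, where every single actor insert would trigger its own movie select.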

Of course we could convert this file into a lengthy SQL insert script, but that would mean we didn't get to play with Hadoop. Somewhere in the next posts I will be using Hadoop in conjunction with Mahout to concoct meaningful clusters from the movie pool, which is a great way to use Hadoop; in this case I am aware that there is an easier way of importing this file, it just looks like a shiny nail to hammer with Hadoop. If you are not familiar with Hadoop and the MapReduce paradigm, take a look here or simply read the more than excellent Hadoop in Action book. Now let's invert the actors and turn them "inside out"!

The first hurdle we must take when using this file with Hadoop is reading the file. Hadoop provides its users with some ready-to-use implementations of org.apache.hadoop.mapreduce.InputFormat, and we could use the line-oriented one, but on closer investigation it just reads one line at a time, returning the byte offset of the line as the key (LongWritable) and the line itself as the value (Text). What we need is a FileInputFormat that can read multiple lines and return the actor's name as the key (Text) and his movies as the value (also Text, with the movies separated by a delimiter, for instance the pipe character). This way we can "emit" (i.e. write) the movie-actor key-value pairs to the org.apache.hadoop.mapreduce.Mapper.Context in our own specially devised Mapper, which looks somewhat like this:

public static class MapClass extends Mapper<Text, Text, Text, Text> {
    @Override
    public void map(Text actor, Text movies, Context context)
            throws IOException, InterruptedException {
        // Note: split("|") would split on every character, because | is a
        // regex metacharacter; it has to be escaped
        String[] separateMovies = movies.toString().split("\\|");
        for (String movie : separateMovies) {
            context.write(new Text(movie), new Text(actor));
        }
    }
}

When the map has done its work by emitting the movie-actor pairs, the reduce step writes out one movie-actors entry per movie to the context, like so:

public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text movie, Iterable<Text> actors, Context context)
            throws IOException, InterruptedException {
        StringBuilder actorsList = new StringBuilder();
        for (Text actor : actors) {
            if (actorsList.length() > 0) {
                actorsList.append("|");   // pipe-delimit the actors
            }
            actorsList.append(actor.toString());
        }
        context.write(movie, new Text(actorsList.toString()));
    }
}

So in the end we will have inverted the actor-movie relation in a highly parallelized way. The end result will look like this:

Rambo (1987)                 Stallone, Sylvester|Benz, Julie|...
Kindergarten Cop (1990)      Schwarzenegger, Arnold|Reed, Pamela|...
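The effect of the whole job can be mimicked in plain Java as a quick sanity check; the class and method names here are illustrative only and not part of the Hadoop code:

```java
import java.util.*;

public class InvertCheck {
    // Invert an actor -> movies mapping into movie -> actors, pipe-delimited,
    // mirroring what the map and reduce steps above compute together.
    public static Map<String, String> invert(Map<String, List<String>> actorToMovies) {
        // "map" phase: emit (movie, actor) pairs, grouped here by movie key
        Map<String, StringBuilder> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : actorToMovies.entrySet()) {
            for (String movie : entry.getValue()) {
                // "reduce" phase: concatenate all actors of one movie
                StringBuilder actors = grouped.computeIfAbsent(movie, k -> new StringBuilder());
                if (actors.length() > 0) {
                    actors.append("|");
                }
                actors.append(entry.getKey());
            }
        }
        Map<String, String> inverted = new TreeMap<>();
        grouped.forEach((movie, actors) -> inverted.put(movie, actors.toString()));
        return inverted;
    }
}
```

The real job does the same thing, except the grouping by movie key happens in Hadoop's shuffle phase rather than in a local map.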

That was the easy part; now comes the somewhat tricky part of reading the initial actors.list file.
On a side note: the files that come from IMDb are encoded in ISO-8859-1 and contain some superfluous data in the head and the tail, so you should first perform some bash gymnastics on these files before transferring them onto the HDFS:
* Converting from ISO-8859-1 to UTF-8:

iconv --from-code=ISO-8859-1 --to-code=UTF-8 actors.list > actors.txt

* Finding out where the actual actor data begins and ends:

nl -ba actors.txt | more
nl -ba actors.txt | tail -500

* Cutting it up:

sed -n '240,10564279p' actors.txt > actors.data

Now, for reading the actors.data, I wrapped the FileInputFormat with the multi-line functionality. We simply delegate all the non-relevant methods to the LineRecordReader and only rewrite the nextKeyValue method. While the lines we encounter still belong to the same actor, this method keeps appending that actor's movies; when we encounter another actor, we set the actorKey and moviesValue Text objects and return true, so that the getCurrentKey and getCurrentValue methods deliver the completed pair.
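Stripped of the Hadoop scaffolding, the grouping logic of that nextKeyValue override could be sketched roughly like this; it is a plain-Java approximation with illustrative names, not the actual RecordReader:

```java
import java.io.*;

public class ActorRecordReader {
    private final BufferedReader in;
    private String pendingActorLine;  // first line of the next actor's record
    private String currentActor;
    private String currentMovies;

    public ActorRecordReader(Reader source) throws IOException {
        this.in = new BufferedReader(source);
        this.pendingActorLine = in.readLine();
    }

    // Mirrors the nextKeyValue override: consume all continuation lines of
    // one actor, then expose the completed key-value pair and return true.
    public boolean nextKeyValue() throws IOException {
        if (pendingActorLine == null) {
            return false;  // end of input, as in the real RecordReader
        }
        // The actor's name and his first movie are separated by tabs
        String[] parts = pendingActorLine.split("\t+", 2);
        currentActor = parts[0];
        StringBuilder movies = new StringBuilder(parts.length > 1 ? parts[1].trim() : "");
        String line;
        // Continuation lines in actors.list start with tabs
        while ((line = in.readLine()) != null && line.startsWith("\t")) {
            if (movies.length() > 0) {
                movies.append("|");  // pipe-delimit the movies
            }
            movies.append(line.trim());
        }
        pendingActorLine = line;  // next actor's first line, or null at EOF
        currentMovies = movies.toString();
        return true;
    }

    public String getCurrentKey()   { return currentActor; }
    public String getCurrentValue() { return currentMovies; }
}
```

The real implementation delegates the actual line reading to the wrapped LineRecordReader; this sketch just shows how the continuation lines fold into one value.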