Reading fixed length/width input records with Hadoop MapReduce

While working on a project where I needed to quickly import 50-100 million records, I ended up using Hadoop for the job. Unfortunately, the input files I was dealing with contained fixed width/length records: there were no delimiters separating records, nor any CR/LFs. Each record was exactly 502 bytes in size. Hadoop provides a TextInputFormat out of the box for reading input files, but it requires that your files contain CR/LFs (or some combination thereof) to mark record boundaries.
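
To make the format concrete: with no delimiters, record boundaries are purely positional, i.e. record N starts at byte N * 502. Here is a quick hedged illustration (only the 502-byte length comes from my files; the record index is made up):

long recordLength = 502L;
long recordIndex = 1000000L;                    // e.g. the millionth record (made-up index)
long startOffset = recordIndex * recordLength;  // 502,000,000 - the byte where that record begins
// with no delimiters, this arithmetic is the only way to find a record boundary,
// which is why the splits described below are forced to align on record boundaries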

The source for this patch is available on GitHub at https://github.com/bitsofinfo/hadoopSandbox/tree/master/fixedLengthInputFormat

So… I went ahead and wrote a couple of classes to support fixed length (fixed width, same thing) records in input files. These classes were inspired by Hadoop’s TextInputFormat and LineRecordReader. The two classes, FixedLengthInputFormat and FixedLengthRecordReader, are presented below. I have also created a Hadoop JIRA issue to contribute these classes to the Hadoop project.

This input format overrides computeSplitSize() in order to ensure that InputSplits do not contain any partial records, since with fixed records there is no delimiter for the reader to resynchronize on if a split were to begin mid-record. Each InputSplit passed to the FixedLengthRecordReader will start at the beginning of a record, and the last byte in the InputSplit will be the last byte of a record. The override of computeSplitSize() delegates to FileInputFormat’s compute method and then rounds the returned split size down to a whole number of records: (Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)
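
As a quick sanity check, here is that adjustment worked through with hypothetical numbers. The 64 MB default split size is just an assumed example value; only the 502-byte record length comes from my actual input files:

long defaultSize = 64L * 1024 * 1024;  // 67,108,864 bytes, as computed by FileInputFormat (assumed for this example)
int recordLength = 502;                // the configured fixed record length
long splitSize = ((long) Math.floor((double) defaultSize / (double) recordLength)) * recordLength;
// splitSize == 67,108,364 bytes, i.e. exactly 133,682 complete records;
// the leftover 500 bytes simply fall into the next split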

FixedLengthInputFormat does NOT support compressed files. To use this input format, configure your job as follows:

// setup your job configuration etc
...

// be sure to set the length of your fixed length records, so the
// FixedLengthRecordReader can extract the records correctly.
myJobConf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 502);

// OR alternatively you can set it this way, the name of the property is
// "mapreduce.input.fixedlengthinputformat.record.length"
myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length",502);

// create your job
Job job = new Job(myJobConf);
job.setInputFormatClass(FixedLengthInputFormat.class);

// do the rest of your job setup, specifying input locations etc
...

job.submit();
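
Once the input format is wired in, each call to map() receives the byte offset where the record starts as its key and the raw 502-byte record as its value. Below is a minimal mapper sketch; the field layout it assumes (a 10-byte ID followed by a payload) is purely hypothetical, so substitute your actual record structure:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FixedRecordMapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// key   = byte position in the file where this record starts
		// value = the raw fixed length record, space padded
		String record = value.toString();

		// hypothetical layout: bytes 0-9 hold an ID, the remainder is the payload
		String id = record.substring(0, 10).trim();
		String payload = record.substring(10).trim();

		context.write(new Text(id), new Text(payload));
	}
}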

Below are the two classes, which you are free to use. Hope this helps you out if you need to read fixed width/length records from input files using Hadoop MapReduce! Enjoy.

FixedLengthInputFormat.java

package org.bitsofinfo.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * FixedLengthInputFormat is an input format which can be used
 * for input files which contain fixed length records with NO
 * delimiters and NO carriage returns (CR, LF, CRLF) etc. Such
 * files typically only have one gigantic line and each "record"
 * is of a fixed length, and padded with spaces if the record's actual
 * value is shorter than the fixed length.
 * <p>
 * Users must configure the record length property before submitting
 * any jobs which use FixedLengthInputFormat:
 * <p>
 * <code>myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length", [myFixedRecordLength]);</code>
 * <p>
 * This input format overrides <code>computeSplitSize()</code> in order to ensure
 * that InputSplits do not contain any partial records since with fixed records
 * there is no way to determine where a record begins if that were to occur.
 * Each InputSplit passed to the FixedLengthRecordReader will start at the beginning
 * of a record, and the last byte in the InputSplit will be the last byte of a record.
 * The override of <code>computeSplitSize()</code> delegates to FileInputFormat's
 * compute method, and then adjusts the returned split size by doing the following:
 * <code>(Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)</code>
 * <p>
 * This InputFormat returns a FixedLengthRecordReader.
 * <p>
 * Compressed files currently are not supported.
 *
 * @see	FixedLengthRecordReader
 *
 * @author bitsofinfo.g (AT) gmail.com
 *
 */
public class FixedLengthInputFormat extends FileInputFormat<LongWritable, Text> {

	/**
	 * When using FixedLengthInputFormat you MUST set this
	 * property in your job configuration to specify the fixed
	 * record length.
	 * <p>
	 * i.e. <code>myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length", [myFixedRecordLength]);</code>
	 */
	public static final String FIXED_RECORD_LENGTH = "mapreduce.input.fixedlengthinputformat.record.length";

	// our logger reference
	private static final Log LOG = LogFactory.getLog(FixedLengthInputFormat.class);

	// the default fixed record length (-1), error if this does not change
	private int recordLength = -1;

	/**
	 * Return the int value from the given Configuration found
	 * by the FIXED_RECORD_LENGTH property.
	 *
	 * @param config
	 * @return	int record length value
	 * @throws IOException if the record length found is 0 (non-existent, not set, etc.)
	 */
	public static int getRecordLength(Configuration config) throws IOException {
		int recordLength = config.getInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 0);

		// this would be an error
		if (recordLength == 0) {
			throw new IOException("FixedLengthInputFormat requires the Configuration property:" + FIXED_RECORD_LENGTH + " to" +
					" be set to something > 0. Currently the value is 0 (zero)");
		}

		return recordLength;
	}

	/**
	 * This input format overrides <code>computeSplitSize()</code> in order to ensure
	 * that InputSplits do not contain any partial records since with fixed records
	 * there is no way to determine where a record begins if that were to occur.
	 * Each InputSplit passed to the FixedLengthRecordReader will start at the beginning
	 * of a record, and the last byte in the InputSplit will be the last byte of a record.
	 * The override of <code>computeSplitSize()</code> delegates to FileInputFormat's
	 * compute method, and then adjusts the returned split size by doing the following:
	 * <code>(Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)</code>
	 *
	 * @inheritDoc
	 */
	@Override
	protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
		long defaultSize = super.computeSplitSize(blockSize, minSize, maxSize);

		// 1st, if the default size is less than the length of a
		// raw record, lets bump it up to a minimum of at least ONE record length
		if (defaultSize < recordLength) {
			return recordLength;
		}

		// determine the split size, it should be as close as possible to the
		// default size, but should NOT split within a record... each split
		// should contain a complete set of records with the first record
		// starting at the first byte in the split and the last record ending
		// with the last byte in the split.

		long splitSize = ((long)(Math.floor((double)defaultSize / (double)recordLength))) * recordLength;
		LOG.info("FixedLengthInputFormat: calculated split size: " + splitSize);

		return splitSize;

	}

	/**
	 * Returns a FixedLengthRecordReader instance
	 *
	 * @inheritDoc
	 */
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		return new FixedLengthRecordReader();
	}

	/**
	 * @inheritDoc
	 */
 	@Override
 	protected boolean isSplitable(JobContext context, Path file) {

 		try {
			if (this.recordLength == -1) {
				this.recordLength = getRecordLength(context.getConfiguration());
			}
			LOG.info("FixedLengthInputFormat: my fixed record length is: " + recordLength);

 		} catch(Exception e) {
 			LOG.error("Error in FixedLengthInputFormat.isSplitable() when trying to determine the fixed record length, returning false, input files will NOT be split!",e);
 			return false;
 		}

 		CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
	 	if (codec != null) {
	 		return false;
	 	}

	 	return true;
	 }

}

FixedLengthRecordReader.java

package org.bitsofinfo.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * FixedLengthRecordReader is returned by FixedLengthInputFormat. This reader
 * uses the record length property set within the FixedLengthInputFormat to
 * read one record at a time from the given InputSplit. This record reader
 * does not support compressed files.
 * <p>
 * Each call to nextKeyValue() updates the LongWritable KEY and Text VALUE.
 * <p>
 * KEY = byte position in the file the record started at<br>
 * VALUE = the record itself (Text)
 *
 * @author bitsofinfo.g (AT) gmail.com
 *
 */
public class FixedLengthRecordReader extends RecordReader<LongWritable, Text> {

	// reference to the logger
	private static final Log LOG = LogFactory.getLog(FixedLengthRecordReader.class);

	// the start point of our split
	private long splitStart;

	// the end point in our split
	private long splitEnd;

	// our current position in the split
	private long currentPosition;

	// the length of a record
	private int recordLength;

	// reference to the input stream
	private FSDataInputStream fileInputStream;

	// the input byte counter
	private Counter inputByteCounter;

	// reference to our FileSplit
	private FileSplit fileSplit;

	// our record key (byte position)
	private LongWritable recordKey = null;

	// the record value
	private Text recordValue = null;

	@Override
	public void close() throws IOException {
		if (fileInputStream != null) {
			fileInputStream.close();
		}
	}

	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {
		return recordKey;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return recordValue;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (splitStart == splitEnd) {
			return (float)0;
		} else {
			return Math.min((float)1.0, (currentPosition - splitStart) / (float)(splitEnd - splitStart));
		}
	}

	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {

		// the file input fileSplit
		this.fileSplit = (FileSplit)inputSplit;

		// the byte position this fileSplit starts at within the splitEnd file
		splitStart = fileSplit.getStart();

		// splitEnd byte marker that the fileSplit ends at within the splitEnd file
		splitEnd = splitStart + fileSplit.getLength();

		// log some debug info
		LOG.info("FixedLengthRecordReader: SPLIT START="+splitStart + " SPLIT END=" +splitEnd + " SPLIT LENGTH="+fileSplit.getLength() );

		// the actual file we will be reading from
		Path file = fileSplit.getPath();

		// job configuration
		Configuration job = context.getConfiguration();

		// check to see if compressed....
		CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
	 	if (codec != null) {
	 		throw new IOException("FixedLengthRecordReader does not support reading compressed files");
	 	}

		// for updating the total bytes read in
	 	inputByteCounter = ((MapContext)context).getCounter("FileInputFormatCounters", "BYTES_READ");

	 	// THE JAR COMPILED AGAINST 0.20.1 does not contain a version of FileInputFormat with these constants (but they exist in trunk)
	 	// uncomment the below, then comment or discard the line above
	 	//inputByteCounter = ((MapContext)context).getCounter(FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);

		// the size of each fixed length record
		this.recordLength = FixedLengthInputFormat.getRecordLength(job);

		// get the filesystem
		final FileSystem fs = file.getFileSystem(job);

		// open the File
		fileInputStream = fs.open(file,(64 * 1024));

		// seek to the splitStart position
		fileInputStream.seek(splitStart);

		// set our current position
	 	this.currentPosition = splitStart;
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (recordKey == null) {
		 	recordKey = new LongWritable();
	 	}

		// the Key is always the position the record starts at
	 	recordKey.set(currentPosition);

	 	// the recordValue to place the record text in
	 	if (recordValue == null) {
	 		recordValue = new Text();
	 	} else {
	 		recordValue.clear();
	 	}

	 	// if the currentPosition is less than the split end..
	 	if (currentPosition < splitEnd) {

	 		// setup a buffer to store the record
	 		byte[] buffer = new byte[this.recordLength];
	 		int totalRead = 0; // total bytes read
	 		int totalToRead = recordLength; // total bytes we need to read

	 		// while we still have record bytes to read
	 		while (totalRead != recordLength) {
	 			// read in what we need
	 			int read = this.fileInputStream.read(buffer, 0, totalToRead);

	 			// guard against hitting EOF mid-record (e.g. a truncated file)
	 			if (read == -1) {
	 				throw new IOException("Unexpected end of file while reading a fixed length record");
	 			}

	 			// append to the buffer
	 			recordValue.append(buffer, 0, read);

	 			// update our markers
	 			totalRead += read;
	 			totalToRead -= read;
	 			//LOG.info("READ: just read=" + read + " totalRead=" + totalRead + " totalToRead=" + totalToRead);
	 		}

	 		// update our current position and log the input bytes
	 		currentPosition = currentPosition +recordLength;
	 		inputByteCounter.increment(recordLength);

	 		//LOG.info("VALUE=|"+fileInputStream.getPos()+"|"+currentPosition+"|"+splitEnd+"|" + recordLength + "|"+recordValue.toString());

	 		// return true
	 		return true;
	 	}

	 	// nothing more to read....
		return false;
	}

}

6 thoughts on “Reading fixed length/width input records with Hadoop MapReduce”

  1. Alan Ratner says:

    In your code at the top, myJob is undefined. Do you mean myJobConf or job, which are defined?

    Is this code compatible with Hadoop 0.21.0? Eclipse doesn’t seem to find FixedLengthInputFormat compatible with setInputFormat.

    Thanks.

  2. bitsofinfo says:

    Alan -
    First please check the patches @
    https://issues.apache.org/jira/browse/MAPREDUCE-1176

    (The code in this post is old.) I am not sure whether they work with 0.21; I have not had time to touch the patches since earlier this year.

    Regarding myJob, yes, that is just a reference to a Job configuration that you must create and configure on your own. The reference above was just a sample/ad hoc thing.

  3. Sumit Kumar Ghosh says:

    Hi,

    Is this already added to the Hadoop distribution? I see that the issue is not closed.

  4. hello

    I am trying to use Hadoop 2.3 with FixedLengthInputFormat in a UDF, but I have a problem with setting the fixed value.

    Here is what I wrote:

    public InputFormat getInputFormat() throws IOException {
        // return new TextInputFormat();
        FixedLengthInputFormat FL = new FixedLengthInputFormat();
        try {
            FL.setRecordLength(conf, 405);
            UDFContext.getUDFContext().addJobConf(conf);
        } catch (Exception e) {
            throw new IllegalArgumentException("Big Problem" + e.getMessage());
        }
        return FL;
    }

    It throws me an error: Big Problem
    I don't know how to solve it!
    Any help would be appreciated!
    Thanks
