While working on a project where I needed to quickly import 50-100 million records I ended up using Hadoop for the job. Unfortunately the input files I was dealing with were fixed width/length records, hence they had no delimiters which separated records, nor did they have any CR/LFs to separate records. Each record was exactly 502 bytes in size. Hadoop provides a TextInputFormat out of the box for reading input files, however it requires that your files contain CR/LFs or some combination thereof.
This patch source is available on GitHub @ https://github.com/bitsofinfo/hadoopSandbox/tree/master/fixedLengthInputFormat
So…. I went ahead a wrote a couple of classes to support fixed length, fixed width (same thing) records in input files. These classes were inspired by Hadoop’s TextInputFormat and LineRecordReader. The two classes are FixedLengthInputFormat and FixedLengthRecordReader, they are presented below. I have also created a Hadoop JIRA issue to contribute these classes to the Hadoop project.
This input format overrides computeSplitSize() in order to ensure that InputSplits do not contain any partial records since with fixed records there is no way to determine where a record begins if that were to occur. Each InputSplit passed to the FixedLengthRecordReader will start at the beginning of a record, and the last byte in the InputSplit will be the last byte of a record. The override of computeSplitSize() delegates to FileInputFormat’s compute method, and then adjusts the returned split size by doing the following: (Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)
FixedLengthInputFormat does NOT support compressed files. To use this input format, you do so as follows:
// setup your job configuration etc
...
// be sure to set the length of your fixed length records, so the
// FixedLengthRecordReader can extract the records correctly.
myJobConf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 502);
// OR alternatively you can set it this way, the name of the property is
// "mapreduce.input.fixedlengthinputformat.record.length"
myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length",502);
// create your job
Job job = new Job(myJobConf);
job.setInputFormatClass(FixedLengthInputFormat.class);
// do the rest of your job setup, specifying input locations etc
...
myJob.submit();
Below are the two classes which you are free to use. Hope this helps you out if you have a need to read fixed width/length records out of input files using Hadoop MapReduce! Enjoy.
FixedLengthInputFormat.java – download
package org.bitsofinfo.hadoop.mapreduce.lib.input;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* FixedLengthInputFormat is an input format which can be used
* for input files which contain fixed length records with NO
* delimiters and NO carriage returns (CR, LF, CRLF) etc. Such
* files typically only have one gigantic line and each "record"
* is of a fixed length, and padded with spaces if the record's actual
* value is shorter than the fixed length.
*
* Users must configure the record length property before submitting
* any jobs which use FixedLengthInputFormat.
*
* myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length",[myFixedRecordLength]);
*
* This input format overrides <code>computeSplitSize()</code> in order to ensure
* that InputSplits do not contain any partial records since with fixed records
* there is no way to determine where a record begins if that were to occur.
* Each InputSplit passed to the FixedLengthRecordReader will start at the beginning
* of a record, and the last byte in the InputSplit will be the last byte of a record.
* The override of <code>computeSplitSize()</code> delegates to FileInputFormat's
* compute method, and then adjusts the returned split size by doing the following:
* <code>(Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)</code>
*
*
* This InputFormat returns a FixedLengthRecordReader.
*
* Compressed files currently are not supported.
*
* @see FixedLengthRecordReader
*
* @author bitsofinfo.g (AT) gmail.com
*
*/
public class FixedLengthInputFormat extends FileInputFormat {
/**
* When using FixedLengthInputFormat you MUST set this
* property in your job configuration to specify the fixed
* record length.
*
*
* i.e. myJobConf.setInt("mapreduce.input.fixedlengthinputformat.record.length",[myFixedRecordLength]);
*/
public static final String FIXED_RECORD_LENGTH = "mapreduce.input.fixedlengthinputformat.record.length";
// our logger reference
private static final Log LOG = LogFactory.getLog(FixedLengthInputFormat.class);
// the default fixed record length (-1), error if this does not change
private int recordLength = -1;
/**
* Return the int value from the given Configuration found
* by the FIXED_RECORD_LENGTH property.
*
* @param config
* @return int record length value
* @throws IOException if the record length found is 0 (non-existant, not set etc)
*/
public static int getRecordLength(Configuration config) throws IOException {
int recordLength = config.getInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 0);
// this would be an error
if (recordLength == 0) {
throw new IOException("FixedLengthInputFormat requires the Configuration property:" + FIXED_RECORD_LENGTH + " to" +
" be set to something > 0. Currently the value is 0 (zero)");
}
return recordLength;
}
/**
* This input format overrides <code>computeSplitSize()</code> in order to ensure
* that InputSplits do not contain any partial records since with fixed records
* there is no way to determine where a record begins if that were to occur.
* Each InputSplit passed to the FixedLengthRecordReader will start at the beginning
* of a record, and the last byte in the InputSplit will be the last byte of a record.
* The override of <code>computeSplitSize()</code> delegates to FileInputFormat's
* compute method, and then adjusts the returned split size by doing the following:
* <code>(Math.floor(fileInputFormatsComputedSplitSize / fixedRecordLength) * fixedRecordLength)</code>
*
* @inheritDoc
*/
@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
long defaultSize = super.computeSplitSize(blockSize, minSize, maxSize);
// 1st, if the default size is less than the length of a
// raw record, lets bump it up to a minimum of at least ONE record length
if (defaultSize return recordLength;
}
// determine the split size, it should be as close as possible to the
// default size, but should NOT split within a record... each split
// should contain a complete set of records with the first record
// starting at the first byte in the split and the last record ending
// with the last byte in the split.
long splitSize = ((long)(Math.floor((double)defaultSize / (double)recordLength))) * recordLength;
LOG.info("FixedLengthInputFormat: calculated split size: " + splitSize);
return splitSize;
}
/**
* Returns a FixedLengthRecordReader instance
*
* @inheritDoc
*/
@Override
public RecordReader createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new FixedLengthRecordReader();
}
/**
* @inheritDoc
*/
@Override
protected boolean isSplitable(JobContext context, Path file) {
try {
if (this.recordLength == -1) {
this.recordLength = getRecordLength(context.getConfiguration());
}
LOG.info("FixedLengthInputFormat: my fixed record length is: " + recordLength);
} catch(Exception e) {
LOG.error("Error in FixedLengthInputFormat.isSplitable() when trying to determine the fixed record length, returning false, input files will NOT be split!",e);
return false;
}
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (codec != null) {
return false;
}
return true;
}
}
FixedLengthRecordReader.java – download
package org.bitsofinfo.hadoop.mapreduce.lib.input;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
*
* FixedLengthRecordReader is returned by FixedLengthInputFormat. This reader
* uses the record length property set within the FixedLengthInputFormat to
* read one record at a time from the given InputSplit. This record reader
* does not support compressed files.
*
* Each call to nextKeyValue() updates the LongWritable KEY and Text VALUE.
*
* KEY = byte position in the file the record started at
* VALUE = the record itself (Text)
*
*
* @author bitsofinfo.g (AT) gmail.com
*
*/
public class FixedLengthRecordReader extends RecordReader {
// reference to the logger
private static final Log LOG = LogFactory.getLog(FixedLengthRecordReader.class);
// the start point of our split
private long splitStart;
// the end point in our split
private long splitEnd;
// our current position in the split
private long currentPosition;
// the length of a record
private int recordLength;
// reference to the input stream
private FSDataInputStream fileInputStream;
// the input byte counter
private Counter inputByteCounter;
// reference to our FileSplit
private FileSplit fileSplit;
// our record key (byte position)
private LongWritable recordKey = null;
// the record value
private Text recordValue = null;
@Override
public void close() throws IOException {
if (fileInputStream != null) {
fileInputStream.close();
}
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return recordKey;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return recordValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (splitStart == splitEnd) {
return (float)0;
} else {
return Math.min((float)1.0, (currentPosition - splitStart) / (float)(splitEnd - splitStart));
}
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// the file input fileSplit
this.fileSplit = (FileSplit)inputSplit;
// the byte position this fileSplit starts at within the splitEnd file
splitStart = fileSplit.getStart();
// splitEnd byte marker that the fileSplit ends at within the splitEnd file
splitEnd = splitStart + fileSplit.getLength();
// log some debug info
LOG.info("FixedLengthRecordReader: SPLIT START="+splitStart + " SPLIT END=" +splitEnd + " SPLIT LENGTH="+fileSplit.getLength() );
// the actual file we will be reading from
Path file = fileSplit.getPath();
// job configuration
Configuration job = context.getConfiguration();
// check to see if compressed....
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files");
}
// for updating the total bytes read in
inputByteCounter = ((MapContext)context).getCounter("FileInputFormatCounters", "BYTES_READ");
// THE JAR COMPILED AGAINST 0.20.1 does not contain a version of FileInputFormat with these constants (but they exist in trunk)
// uncomment the below, then comment or discard the line above
//inputByteCounter = ((MapContext)context).getCounter(FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
// the size of each fixed length record
this.recordLength = FixedLengthInputFormat.getRecordLength(job);
// get the filesystem
final FileSystem fs = file.getFileSystem(job);
// open the File
fileInputStream = fs.open(file,(64 * 1024));
// seek to the splitStart position
fileInputStream.seek(splitStart);
// set our current position
this.currentPosition = splitStart;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (recordKey == null) {
recordKey = new LongWritable();
}
// the Key is always the position the record starts at
recordKey.set(currentPosition);
// the recordValue to place the record text in
if (recordValue == null) {
recordValue = new Text();
} else {
recordValue.clear();
}
// if the currentPosition is less than the split end..
if (currentPosition < splitEnd) {
// setup a buffer to store the record
byte[] buffer = new byte[this.recordLength];
int totalRead = 0; // total bytes read
int totalToRead = recordLength; // total bytes we need to read
// while we still have record bytes to read
while(totalRead != recordLength) {
// read in what we need
int read = this.fileInputStream.read(buffer, 0, totalToRead);
// append to the buffer
recordValue.append(buffer,0,read);
// update our markers
totalRead += read;
totalToRead -= read;
//LOG.info("READ: just read=" + read +" totalRead=" + totalRead + " totalToRead="+totalToRead);
}
// update our current position and log the input bytes
currentPosition = currentPosition +recordLength;
inputByteCounter.increment(recordLength);
//LOG.info("VALUE=|"+fileInputStream.getPos()+"|"+currentPosition+"|"+splitEnd+"|" + recordLength + "|"+recordValue.toString());
// return true
return true;
}
// nothing more to read....
return false;
}
}
In your code at the top MyJob is undefined. Do you mean MyJobConf or job which are defined?
Is this code compatible with Hadoop 0.21.0? Eclipse doesn’t seem to find FixedLengthInputFormat compatible with setInputFormat.
Thanks.
Alan -
First please check the patches @
https://issues.apache.org/jira/browse/MAPREDUCE-1176
(the code on this post is old) If they work with .21 I am note sure, I have not had time to touch the patches since earlier this year.
Regarding MyJob, yes that is just a reference to a Job configuration that you must create and configure on your own. The reference above was just a sample/ad hoc thing
Hi,
Is this already added to the Hadoop distribution. I see that the bug is not closed.
[...] http://bitsofinfo.wordpress.com/2009/11/01/reading-fixed-length-width-input-record-reader-with-hadoo… [...]