diff --git a/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java b/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java index 82fceb9..d2752b3 100644 --- a/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java +++ b/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java @@ -1,15 +1,7 @@ package com.bizo.hive.serde.csv; -import java.io.CharArrayReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringWriter; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - +import au.com.bytecode.opencsv.CSVReader; +import au.com.bytecode.opencsv.CSVWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.SerDe; @@ -26,68 +18,74 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import au.com.bytecode.opencsv.CSVReader; -import au.com.bytecode.opencsv.CSVWriter; +import java.io.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; /** * CSVSerde uses opencsv (http://opencsv.sourceforge.net/) to serialize/deserialize columns as CSV. - * + * * @author Larry Ogrodnek */ public final class CSVSerde implements SerDe { - + private ObjectInspector inspector; private String[] outputFields; private int numCols; private List row; - + private char separatorChar; private char quoteChar; private char escapeChar; - - + private String lineEnd; + private String nullDefinedAs; + @Override public void initialize(final Configuration conf, final Properties tbl) throws SerDeException { final List columnNames = Arrays.asList(tbl.getProperty(Constants.LIST_COLUMNS).split(",")); final List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(tbl.getProperty(Constants.LIST_COLUMN_TYPES)); - + numCols = columnNames.size(); - + final List columnOIs = new ArrayList(numCols); - + for (int i=0; i< numCols; i++) { columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); } - + this.inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); this.outputFields = new String[numCols]; row = new ArrayList(numCols); - + for (int i=0; i< numCols; i++) { row.add(null); } - + separatorChar = getProperty(tbl, "separatorChar", CSVWriter.DEFAULT_SEPARATOR); quoteChar = getProperty(tbl, "quoteChar", CSVWriter.DEFAULT_QUOTE_CHARACTER); escapeChar = getProperty(tbl, "escapeChar", CSVWriter.DEFAULT_ESCAPE_CHARACTER); + lineEnd = tbl.getProperty("lineEnd", CSVWriter.DEFAULT_LINE_END); + nullDefinedAs = tbl.getProperty("nullDefinedAs", "\\N"); } - + private final char getProperty(final Properties tbl, final String property, final char def) { final String val = tbl.getProperty(property); - + if (val != null) { return val.charAt(0); } - + return def; } - + @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; final List outputFieldRefs = outputRowOI.getAllStructFieldRefs(); - + if (outputFieldRefs.size() != numCols) { throw new SerDeException("Cannot serialize the object because there are " + outputFieldRefs.size() + " fields but the table has " + numCols + " columns."); @@ -97,45 +95,48 @@ public final class CSVSerde implements SerDe { for (int c = 0; c < numCols; c++) { final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c)); final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector(); - + // The data must be of type String final StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; - + // Convert the field to Java class String, because objects of String type // can be stored in String, Text, or some other classes. outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field); + if (outputFields[c] == null) { + outputFields[c] = nullDefinedAs; + } } - + final StringWriter writer = new StringWriter(); - final CSVWriter csv = newWriter(writer, separatorChar, quoteChar, escapeChar); - + final CSVWriter csv = newWriter(writer, separatorChar, quoteChar, escapeChar, lineEnd); + try { csv.writeNext(outputFields); csv.close(); - + return new Text(writer.toString()); } catch (final IOException ioe) { throw new SerDeException(ioe); } - } + } @Override public Object deserialize(final Writable blob) throws SerDeException { Text rowText = (Text) blob; - + CSVReader csv = null; try { - csv = newReader(new CharArrayReader(rowText.toString().toCharArray()), separatorChar, quoteChar, escapeChar); + csv = newReader(new CharArrayReader(rowText.toString().toCharArray()), separatorChar, quoteChar, escapeChar); final String[] read = csv.readNext(); - + for (int i=0; i< numCols; i++) { if (read != null && i < read.length) { - row.set(i, read[i]); + row.set(i, read[i] == nullDefinedAs ? null : read[i]); } else { row.set(i, null); } } - + return row; } catch (final Exception e) { throw new SerDeException(e); @@ -149,22 +150,22 @@ public final class CSVSerde implements SerDe { } } } - - private CSVReader newReader(final Reader reader, char separator, char quote, char escape) { - // CSVReader will throw an exception if any of separator, quote, or escape is the same, but + + private static CSVReader newReader(final Reader reader, char separator, char quote, char escape) { + // CSVReader will throw an exception if any of separator, quote, or escape is the same, but // the CSV format specifies that the escape character and quote char are the same... very weird if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) { return new CSVReader(reader, separator, quote); } else { - return new CSVReader(reader, separator, quote, escape); + return new CSVReader(reader, separator, quote, escape); } } - - private CSVWriter newWriter(final Writer writer, char separator, char quote, char escape) { + + private static CSVWriter newWriter(final Writer writer, char separator, char quote, char escape, String lineEnd) { if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) { - return new CSVWriter(writer, separator, quote, ""); + return new CSVWriter(writer, separator, quote, lineEnd); } else { - return new CSVWriter(writer, separator, quote, escape, ""); + return new CSVWriter(writer, separator, quote, escape, lineEnd); } } @@ -177,7 +178,7 @@ public final class CSVSerde implements SerDe { public Class getSerializedClass() { return Text.class; } - + public SerDeStats getSerDeStats() { return null; }