From f30d2831dd0f9ae4a7f204194220716af05374b7 Mon Sep 17 00:00:00 2001 From: Larry Ogrodnek Date: Fri, 12 Nov 2010 17:03:34 -0800 Subject: [PATCH] initial --- .classpath | 10 ++ .project | 17 +++ build.xml | 26 ++++ ivy.xml | 39 ++++++ project.properties | 7 + readme.md | 10 ++ .../com/bizo/hive/serde/csv/CSVSerde.java | 132 ++++++++++++++++++ 7 files changed, 241 insertions(+) create mode 100644 .classpath create mode 100644 .project create mode 100644 build.xml create mode 100644 ivy.xml create mode 100644 project.properties create mode 100644 readme.md create mode 100644 src/main/java/com/bizo/hive/serde/csv/CSVSerde.java diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..d6a3d16 --- /dev/null +++ b/.classpath @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..6f35c77 --- /dev/null +++ b/.project @@ -0,0 +1,17 @@ + + + @PROJECT_NAME@ + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + + diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..01700d3 --- /dev/null +++ b/build.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ivy.xml b/ivy.xml new file mode 100644 index 0000000..f252abb --- /dev/null +++ b/ivy.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/project.properties b/project.properties new file mode 100644 index 0000000..f5e6bbc --- /dev/null +++ b/project.properties @@ -0,0 +1,7 @@ +# to override these or other properties with local user settings, +# create a unversioned build.properties file + +# The location of the common build system +common.build.dir=${basedir}/../common-build + +project.base.version=1.1-dev diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..a22448d --- /dev/null +++ b/readme.md @@ -0,0 +1,10 @@ +*Hive CSV Support* + +add jar path/to/csv-serde.jar; + +create table my_table(a string, b string, ...) + row format serde 'com.bizo.hive.serde.csv.CSVSerde' + stored as textfile +; + +see: ... \ No newline at end of file diff --git a/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java b/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java new file mode 100644 index 0000000..2be9260 --- /dev/null +++ b/src/main/java/com/bizo/hive/serde/csv/CSVSerde.java @@ -0,0 +1,132 @@ +package com.bizo.hive.serde.csv; + +import java.io.CharArrayReader; +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import au.com.bytecode.opencsv.CSVReader; +import au.com.bytecode.opencsv.CSVWriter; + + +/** + * CSVSerde uses opencsv (http://opencsv.sourceforge.net/) to serialize/deserialize columns as CSV. + * + * @author Larry Ogrodnek + */ +public final class CSVSerde implements SerDe { + + private ObjectInspector inspector; + String[] outputFields; + int numCols; + List row; + + @Override + public void initialize(final Configuration conf, final Properties tbl) throws SerDeException { + final List columnNames = Arrays.asList(tbl.getProperty(Constants.LIST_COLUMNS).split(",")); + + final List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(tbl.getProperty(Constants.LIST_COLUMN_TYPES)); + + numCols = columnNames.size(); + + final List columnOIs = new ArrayList(numCols); + + for (int i=0; i< numCols; i++) { + columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + + this.inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); + this.outputFields = new String[numCols]; + row = new ArrayList(numCols); + + for (int i=0; i< numCols; i++) { + row.add(null); + } + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; + final List outputFieldRefs = outputRowOI.getAllStructFieldRefs(); + + if (outputFieldRefs.size() != numCols) { + throw new SerDeException("Cannot serialize the object because there are " + + outputFieldRefs.size() + " fields but the table has " + numCols + " columns."); + } + + // Get all data out. + for (int c = 0; c < numCols; c++) { + final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c)); + final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector(); + + // The data must be of type String + final StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; + + // Convert the field to Java class String, because objects of String type + // can be stored in String, Text, or some other classes. + outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field); + } + + final StringWriter writer = new StringWriter(); + final CSVWriter csv = new CSVWriter(writer, CSVWriter.DEFAULT_SEPARATOR, CSVWriter.DEFAULT_QUOTE_CHARACTER, ""); + + try { + csv.writeNext(outputFields); + csv.close(); + + return new Text(writer.toString()); + } catch (final IOException ioe) { + throw new SerDeException(ioe); + } + } + + @Override + public Object deserialize(final Writable blob) throws SerDeException { + Text rowText = (Text) blob; + + try { + final CSVReader csv = new CSVReader(new CharArrayReader(rowText.toString().toCharArray())); + final String[] read = csv.readNext(); + + for (int i=0; i< numCols; i++) { + if (read != null && i < read.length) { + row.set(i, read[i]); + } else { + row.set(i, null); + } + } + + return row; + } catch (final Exception e) { + throw new SerDeException(e); + } + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return inspector; + } + + @Override + public Class getSerializedClass() { + return Text.class; + } +}