En el artículo anterior hice una introducción sobre el uso de ficheros Parquet en Java, pero no puse ningún ejemplo. En este artículo explicaré cómo hacerlo a través de la librería Avro.
Parquet con Avro es una de las formas más populares de trabajar con archivos Parquet en Java debido a su sencillez, flexibilidad, y porque es la librería que más ejemplos tiene.
Tanto Avro como Parquet permiten estructuras de datos complejas y hay un mapeo entre los tipos de uno sobre los del otro.
Me basaré en el mismo ejemplo que usé en artículos anteriores hablando de serialización. El código será muy parecido al del artículo sobre Avro. Para los detalles específicos de Avro os remito al artículo.
En el ejemplo trabajaremos con una colección de objetos Organización (Org
), que a su vez tienen una lista de Atributos (Attr
):
record Org(String name, String category, String country, Type type, List<Attr> attributes) {
}
record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {
}
enum Type {
FOO, BAR, BAZ
}
Al igual que cuando persistimos ficheros en formato Avro, esta versión de Parquet con Avro permite escribir ficheros usando clases generadas a partir del IDL o la estructura de datos GenericRecord
. Esta capacidad es propia de Avro, no de Parquet, pero es heredada por parquet-avro
, la librería que implementa esta integración.
Internamente la librería transforma el esquema de Avro al esquema de Parquet, por lo que la mayoría de las herramientas y librerías que sepan trabajar con las clases de Avro podrán trabajar indirectamente con Parquet con pocos cambios.
Usando generación de código
Lo único que cambia respecto a cuando serializamos a formato Avro es la clase con la que vamos a escribir o leer los ficheros, el resto de lógica para construir las clases generadas por Avro o leer sus datos es idéntica.
Serialización
Necesitaremos instanciar un writer de Parquet que soporte la escritura de los objetos creados por Avro:
Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
.withSchema(new Organization().getSchema())
.withWriteMode(Mode.OVERWRITE)
.config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
.build();
Parquet define una clase llamada ParquetWriter<T>
y la librería parquet-avro
la extiende implementando en AvroParquetWriter<T>
la lógica de convertir objetos de Avro a llamadas al API de Parquet. El objeto que serializaremos será Organization
, que ha sido generado usando la utilidad de Avro e implementa el API de Avro.
La clase Path
no es la existente en java.nio.file
, sino una abstracción propia de Hadoop para referenciar rutas de ficheros. Mientras que la clase OutputFile
es la abstracción de ficheros de Parquet con capacidad de escribir en ellos.
Por tanto:
Path
,OutputFile
,HadoopOutputFile
yParquetWriter
son clases definidas por el API de ParquetAvroParquetWriter
es una clase definida por el API deparquet-avro
, librería que encapsula Parquet con AvroOrganization
yAttribute
son clases generadas por la utilidad de Avro, no relacionada con Parquet
La forma de construir una instancia de ParquetWriter
es mediante un Builder, donde se le pueden configurar bastantes parámetros propios de Parquet o de la librería que estemos usando (Avro). Por ejemplo:
withSchema
: esquema de la clase Organization en Avro, que internamente convertirá a schema de ParquetwithCompressionCodec
: método de compresión a usar: SNAPPY, GZIP, LZ4, etc. Por defecto no configura ninguno.withWriteMode
: por defecto es CREATE, por lo que si el fichero ya existiera no lo sobreescribiría y lanza una excepción. Para evitarlo debes usar OVERWRITEwithValidation
: si queremos que valide los tipos de datos que se pasan respecto al esquema definidowithBloomFilterEnabled
: si queremos habilitar la creación de bloom filters
Una configuración más genérica (no definida en el API) de ambas librerías se puede pasar con el método config(String property, String value)
. En este caso configuramos que internamente debe usar una estructura de tres niveles para representar listas anidadas.
Una vez instanciada la clase ParquetWriter
, la mayor complejidad reside en transformar tus POJOs a las clases Organization
generadas a partir del IDL de Avro. El código completo sería este:
Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
.withSchema(new Organization().getSchema())
.withWriteMode(Mode.OVERWRITE)
.config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
.build()) {
for (var org : organizations) {
List<Attribute> attrs = org.attributes().stream()
.map(a -> Attribute.newBuilder()
.setId(a.id())
.setQuantity(a.quantity())
.setAmount(a.amount())
.setSize(a.size())
.setPercent(a.percent())
.setActive(a.active())
.build())
.toList();
Organization organization = Organization.newBuilder()
.setName(org.name())
.setCategory(org.category())
.setCountry(org.country())
.setOrganizationType(OrganizationType.valueOf(org.type().name()))
.setAttributes(attrs)
.build();
writer.write(organization);
}
}
En vez de convertir toda la colección de organizaciones y luego escribirla, podemos convertir y persistir cada Organization
una por una.
El código lo puedes encontrar en GitHub.
Deserialización
La deserialización es muy sencilla si aceptamos trabajar luego con las clases generadas por Avro.
Para leer el fichero necesitaremos instanciar un reader de Parquet:
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build();
Parquet define una clase llamada ParquetReader<T>
y la librería parquet-avro
la extiende implementado en AvroParquetReader
la lógica de convertir las estructuras de datos internas de Parquet a las clases generadas por Avro.
En este caso InputFile
es la abstracción de ficheros de Parquet con capacidad de leer de ellos.
Por tanto:
Path
,InputFile
,HadoopInputFile
yParquetReader
son clases definidas por el API de ParquetAvroParquetReader
implementaParquetReader
y está definida enparquet-avro
, librería que encapsula Parquet con AvroOrganization
(yAttribute
) son clases generadas por la utilidad de Avro, no relacionada con Parquet
La instanciación de la clase ParquetReader
también se hace con un Builder, aunque las opciones a configurar son mucho menores, ya que toda su configuración viene dada por el propio fichero que vamos a leer. El reader no necesita saber si el fichero usa codificación de diccionario o si está comprimido, por lo que no es necesario configurarlo, ya lo descubre él leyendo el fichero.
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build()) {
List<Organization> organizations = new ArrayList<>();
Organization next = null;
while ((next = reader.read()) != null) {
organizations.add(next);
}
return organizations;
}
Si el IDL empleado para generar el código contuviera un subconjunto de los atributos persistidos en el fichero, al leerlo estaríamos ignorando todas las columnas no presentes en el IDL. Nos ahorraremos lecturas de disco y deserialización/decodificación de datos.
El código lo puedes encontrar en GitHub.
Usando GenericRecord
Aquí no será necesario generar ningún código y trabajaremos con la clase GenericRecord
proporcionada por Avro, pero el código será un poco más verboso.
Serialización
Como no tenemos ficheros generados que contengan el esquema embebido, necesitamos definir programáticamente el schema de Avro que vamos a usar. El código es el mismo que el del artículo sobre Avro:
Schema attrSchema = SchemaBuilder.record("Attribute")
.fields()
.requiredString("id")
.requiredInt("quantity")
.requiredInt("amount")
.requiredInt("size")
.requiredDouble("percent")
.requiredBoolean("active")
.endRecord();
var enumSymbols = Stream.of(Type.values()).map(Type::name).toArray(String[]::new);
Schema orgsSchema = SchemaBuilder.record("Organizations")
.fields()
.requiredString("name")
.requiredString("category")
.requiredString("country")
.name("organizationType").type().enumeration("organizationType")
.symbols(enumSymbols).noDefault()
.name("attributes").type().array().items(attrSchema).noDefault()
.endRecord();
var typeField = orgsSchema.getField("organizationType").schema();
EnumMap<Type, EnumSymbol> enums = new EnumMap<>(Type.class);
enums.put(Type.BAR, new EnumSymbol(typeField, Type.BAR));
enums.put(Type.BAZ, new EnumSymbol(typeField, Type.BAZ));
enums.put(Type.FOO, new EnumSymbol(typeField, Type.FOO));
En vez de usar un AvroParquetWriter
del tipo Organization
, creamos uno del tipo GenericRecord
y construimos instancias del mismo como si fuera un Map
:
Path path = new Path(filePath);
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
.withSchema(orgsSchema)
.withWriteMode(Mode.OVERWRITE)
.config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
.build()) {
for (var org : organizations) {
List<GenericRecord> attrs = new ArrayList<>();
for (var attr : org.attributes()) {
GenericRecord attrRecord = new GenericData.Record(attrSchema);
attrRecord.put("id", attr.id());
attrRecord.put("quantity", attr.quantity());
attrRecord.put("amount", attr.amount());
attrRecord.put("size", attr.size());
attrRecord.put("percent", attr.percent());
attrRecord.put("active", attr.active());
attrs.add(attrRecord);
}
GenericRecord orgRecord = new GenericData.Record(orgsSchema);
orgRecord.put("name", org.name());
orgRecord.put("category", org.category());
orgRecord.put("country", org.country());
orgRecord.put("organizationType", enums.get(org.type()));
orgRecord.put("attributes", attrs);
writer.write(orgRecord);
}
}
El código lo puedes encontrar en GitHub.
Deserialización
Como en la versión original de Avro, la mayor parte del trabajo consiste en convertir el GenricRecord
en nuestra estructura de datos. Al comportarse como un Map
, tendremos que castear los tipos de los valores:
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
List<Org> organizations = new ArrayList<>();
GenericRecord record = null;
while ((record = reader.read()) != null) {
List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get("id").toString(),
((Integer) attr.get("quantity")).byteValue(),
((Integer) attr.get("amount")).byteValue(),
(boolean) attr.get("active"),
(double) attr.get("percent"),
((Integer) attr.get("size")).shortValue())).toList();
Utf8 name = (Utf8) record.get("name");
Utf8 category = (Utf8) record.get("category");
Utf8 country = (Utf8) record.get("country");
Type type = Type.valueOf(record.get("organizationType").toString());
organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
}
return organizations;
}
Al estar utilizando la interface de Avro, mantiene su lógica de que los Strings se codifican dentro de la clase Utf8
y será necesario extraer sus valores.
El código lo puedes encontrar en GitHub.
Por defecto cuando lee el fichero deserializa todos los campos del objeto, ya que desconoce el esquema de lo que necesitas leer, y lo procesa todo. Si quisieras una proyección de los campos deberás pasárselo en forma de schema de Avro en la creación del ParquetReader
:
Schema projection = SchemaBuilder.record("Organizations")
.fields()
.requiredString("name")
.requiredString("category")
.requiredString("country")
.endRecord();
Configuration configuration = new Configuration();
configuration.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, orgsSchema.toString());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
.withConf(configuration)
.build()) {
....
El resto del proceso sería igual, pero con menos campos. Puedes ver el todo el código fuente del ejemplo aquí.
Rendimiento
¿Qué rendimiento da Parquet Avro a la hora de serializar y deserializar un gran volumen de datos? ¿En qué medida influyen las distintas opciones de compresión? ¿elegimos compresión con Snappy o no comprimir? ¿y entre activar el diccionario o no?
Aprovechando los análisis que hice anteriormente sobre distintos formatos de serialización podemos hacernos una idea de sus virtudes y carencias. Los benchmarks los he hecho con el mismo ordenador, por lo que son comparables para hacernos una idea.
Tamaño del fichero
Tanto usando generación de código como GenericRecord, el resultado es el mismo, ya que son distintas maneras de definir el mismo esquema y persistir los mismos datos:
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 1 034 MB | 508 MB |
Dictionary True | 289 MB | 281 MB |
Dadas la diferencia de tamaños, podemos ver que en mi ejemplo sintético el uso de diccionarios comprime bastante la información, mejor que el propio algoritmo de Snappy. La activación de la compresión o no vendrá dada por la penalización en rendimiento que suponga.
Tiempo de serialización
Usando generación de código:
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 14 386 ms | 14 920 ms |
Dictionary True | 15 110 ms | 15 381 ms |
Usando GenericRecord:
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 15 287 ms | 15 809 ms |
Dictionary True | 16 119 ms | 16 432 ms |
El tiempo es muy similar en todos los casos, y podemos decir que las distintas técnicas de compresión no afectan sensiblemente al tiempo empleado.
No hay diferencias de tiempos reseñables entre código generado y el uso de GenericRecord
, por lo que el performance no debería ser un factor determinante a la hora de elegir una solución.
Comparado con otros formatos de serialización, tarda entre un 40% (Jackson) y un 300% (Protocol Buffers/Avro) más de tiempo, pero a cambio consigue ficheros entre un 70% (Protocol Buffers/Avro) o 90% (Jackson) menores.
Tiempo de deserialización
Usando generación de código:
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 10 722 ms | 10 736 ms |
Dictionary True | 7 707 ms | 7 665 ms |
Usando GenericRecord:
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 12 089 ms | 11 931 ms |
Dictionary True | 8 374 ms | 8 451 ms |
En este caso el uso del diccionario tiene un impacto relevante en el tiempo, al ahorrarse decodificar información que está repetida. Definitivamente no hay una razón para desactivar la funcionalidad.
Si comparamos con otros formatos, es el doble de lento que Protocol Buffers y está a la par con Avro, pero más de dos veces más rápido que Jackson.
Para poner en perspectiva el rendimiento, en mi portátil lee 50 000 Organization
s por segundo, que a su vez contienen casi 3 millones de instancias de tipo Attribute
.
Tiempo de deserialización usando una proyección
¿Cómo es el rendimiento si usamos una proyección y sólo leemos tres campos del objeto Organización e ignoramos su colección de atributos?
Sin comprimir | Snappy | |
---|---|---|
Dictionary False | 289 ms | 304 ms |
Dictionary True | 195 ms | 203 ms |
Confirmamos la promesa de que si accedemos a un subconjunto de columnas, leeremos y decodificaremos mucha menos información. En este caso emplea un 2.5% del tiempo, o lo que es lo mismo, es 40 veces más rápido procesando el mismo fichero.
Aquí es donde Parquet muestra toda su potencia, al permitir leer y decodificar un subconjunto de datos en poco tiempo, aprovechando cómo están distribuidos los mismos en el fichero.
Conclusión
Si ya estás usando Avro o ya lo conoces, la mayoría del código y particularidades relativas a Avro te sonarán. Si no lo conoces, aumenta la barrera de entrada, al tener que aprender sobre dos tecnologías distintas, y no tener claro qué corresponde a cada una.
El mayor cambio respecto a usar sólo Avro es la forma de construir los objetos writer y reader, donde tendremos que lidiar con toda la configuración y particularidades propias de Parquet.
Si tuviera que elegir entre usar sólo Avro o Parquet con Avro, yo elegiría la segunda opción, ya que produce ficheros más compactos y tenemos la oportunidad de sacar provecho del formato columnar.
Los datos que he usado en el ejemplo son sintéticos y los resultados pueden variar según las características de tus datos. Te recomiendo hacer pruebas, pero a menos que todos tus valores sean muy aleatorios, las tasas de compresión serán altas.
En entornos de escribir una vez y leer múltiples veces, el tiempo empleado en serializar no debería ser determinante. Son más importantes, por ejemplo, el consumo de tu almacenamiento, el tiempo de transferencia de los ficheros, o la velocidad de procesamiento (más si puedes filtrar las columnas a las que accedes).
A pesar de usar diferentes técnicas de compresión y codificación, el tiempo de procesamiento de ficheros es bastante rápido. Junto a su capacidad de trabajar con un esquema tipado, lo convierte en un formato de intercambio de datos a tener en cuenta en proyectos con alto volumen de datos.