Apache Spark supports many different data formats, such as the ubiquitous CSV format and web-friendly JSON format. Common formats used primarily for big data analytical purposes are Apache Parquet and Apache Avro.
In this post, we’re going to cover the properties of these 4 formats — CSV, JSON, Parquet and Avro with Apache Spark.
CSV files (comma separated values) are commonly used to exchange tabular data between systems using plain text. CSV is a row-based file format, which means that every line of the file is the row in the table. Basically, CSV contains a header row that provides column names for the data, otherwise, files are considered partially structured. CSV files cannot initially present hierarchical or relational data. Data connections are typically organized by using multiple CSV files. Foreign keys are stored in columns of one or more files, but the links between these files are not expressed by the format itself. Also, CSV format is not fully standardized, files can use delimiters other than commas, such as tabs or spaces.
One of the other properties of CSV files is that they are only splittable when it is a raw, uncompressed file or when splittable compression format is used such as bzip2 or lzo (note: lzo needs to be indexed to be splittable).
- CSV is human-readable and easy to edit manually;
- CSV provides a straightforward information schema;
- CSV is processed by almost all existing applications;
- CSV is simple to implement and parse;
- CSV is compact. For XML you start tag and end tag for each column in each row. In CSV you write the column headers only once.
- CSV allows working with flat data. Complex data structures need to be handled aside from format;
- No support for column types. There is no distinction between text and numeric columns;
- No standard way to represent binary data;
- Problems with importing CSV (no distinction between NULL and quotes);
- Poor support of special characters;
- Lack of universal standard.
Despite the limitations, CSV files are a popular choice for data sharing, as they are supported by a wide range of business applications, consumer and scientific applications. Similarly, most batch and streaming data processing modules (for example, Spark and Hadoop) initially support serialization and deserialization of CSV files and offer ways to add a schema when reading.
Since a lot of data transferring is already using JSON format, most web languages initially support JSON or use external libraries to serialize and deserialize JSON data. Thanks to this support, JSON is used in logical formats by presenting data structures, interchange formats for hot data, and cold data stores.
Many batches and stream data processing modules natively support JSON serialization and deserialization. Although the data contained in JSON documents can ultimately be stored in more performance-optimized formats, such as Parquet or Avro, they serve as raw data, which is very important for reprocessing data (if necessary).
JSON files have several advantages:
- JSON supports hierarchical structures, simplifying the storage of related data in one document and the presentation of complex relations;
- Most languages provide simplified JSON serialization libraries or built-in support for JSON serialization/deserialization;
- JSON supports lists of objects, helping to avoid erratic transformations of lists into a relational data model;
- JSON is a widely used file format for NoSQL databases such as MongoDB, Couchbase and Azure Cosmos DB;
- Built-in support in most nowadays tools.
Launched in 2013, Parquet was developed by Cloudera and Twitter to serve as a column-based storage format, optimized for work with multi-column datasets. Because data is stored by columns, it can be highly compressed (compression algorithms perform better on data with low information entropy which is usually contained in columns) and splittable. The developers of the format claim that this storage format is ideal for Big Data problems.
Unlike CSV and JSON, Parquet files are binary files that contain metadata about their content. So, without reading/parsing the content of the file(s), Spark can just rely on the metadata to determine column names, compression/encodings, data types and even some basic statistics. The column metadata for a Parquet file is stored at the end of the file, which allows for fast, one-pass writing.
Parquet is optimized for the Write Once Read Many (WORM) paradigm. It's slow to write, but incredibly fast to read, especially when you're only accessing a subset of the total columns. Parquet is a good choice for read-heavy workloads. For use cases requiring operating on entire rows of data, a format like CSV or AVRO should be used.
The advantages of data storage in Parquet:
- Parquet is a columnar format. Only required columns would be fetched/read, it reduces the disk I/O. This concept is called projection pushdown;
- Schema travels with the data so data is self-describing;
- Despite the fact that it is created for HDFS, data can be stored in other file systems, such as GlusterFs or on top of NFS;
- Parquet are just files, which means that it is easy to work with them, move, back up and replicate;
- Native support in Spark out of the box provides the ability to simply take and save the file to your storage;
- Parquet provides very good compression up to 75% when used even with the compression formats like snappy;
- As practice shows, this format is the fastest for reading workflows compared to other file formats;
- Parquet is well suited for data warehouse kind of solutions where aggregations are required on certain column over a huge set of data;
- Parquet can be read and write using Avro API and Avro Schema(which gives the idea to store all raw data in Avro format but all processed data in Parquet);
- It also provides predicate pushdown, thus reducing further disk I/O cost.
- The column-based design makes you think about the schema and data types;
- Parquet does not always have native support in other tools other than Spark;
- It does not support data modification(Parquet files are immutable) and schematic evolution. Of course, Spark knows how to merge the scheme, if you change it over time (you need to specify a special option when reading). But to change something in an already existing file, you can do nothing other than overwriting, except that you can add a new column.
Predicate Pushdown / Filter Pushdown
The basic idea of predicate pushdown is that certain parts of queries (the predicates) can be "pushed" to where the data stored. For example, when we give some filter criteria, the data store tries to filter the records at the time of reading from disk. The advantage of predicate pushdown is fewer disks I/O and hence overall performance would be better. Otherwise, whole data would be brought into memory and then filtering needs to be done, which results in large memory requirements.
This optimization can drastically reduce query/processing time by filtering out data earlier rather than later. Depending on the processing framework, predicate pushdown can optimize your query by doing things like filtering data before it is transferred over the network, filtering data before loading into memory, or skipping reading entire files or chunks of files.
This concept is followed by most RDBMS and has been followed by big data storage formats like Parquet and ORC as well.
When data is read from the data store, only those columns would be read which are required as per the query, not all the fields would be read. Generally, columnar formats like Parquets and ORC follow this concept, which results in better I/O performance.
Apache Avro was released by the Hadoop working group in 2009. It is a row-based format that is highly splittable. It also described as a data serialization system similar to Java Serialization. The schema is stored in JSON format while the data is stored in binary format, minimizing file size and maximizing efficiency. Avro has robust support for schema evolution by managing added fields, missing fields, and fields that have changed. This allows old software to read the new data and new software to read the old data — a critical feature if your data has the potential to change.
With Avro’s capacity to manage schema evolution, it’s possible to update components independently, at different times, with a low risk of incompatibility. This saves applications from having to write if-else statements to process different schema versions and saves the developer from having to look at old code to understand old schemas. Because all versions of the schema are stored in a human-readable JSON header, it’s easy to understand all the fields that you have available.
Avro can support many different programming languages. Because the schema is stored in JSON while the data is in binary, Avro is a relatively compact option for both persistent data storage and wire transfer. Avro is typically the format of choice for write-heavy workloads given its easy to append new rows.
- Avro is language-neutral data serialization
- Avro stores the schema in the header of the file so data is self-describing;
- Lightweight and fast data serialization and deserialization, which can deliver very good ingestion performance;
- Avro formatted files are splittable and compressible and hence it's a good candidate for data storage in Hadoop ecosystem;
- The schema used to read an Avro file need not be the same as schema which was used to write the files. This makes it possible to add new fields independently.
- Just as with Sequence Files, Avro files also contain Sync markers to separate the blocks. This makes it highly splittable;
- These blocks can be compressed using compression formats such as snappy.
* JSON has the same conditions about splittability when compressed as CSV with one extra difference. When “wholeFile” option is set to true (re: SPARK-18352), JSON is NOT splittable.
In order to somehow compare the formats with each other, I created a set of tests using a Netflix dataset. This is the so-called narrow dataset — it has only three columns and a large number of lines.
All the tests were run on my laptop(I will not show the proper characteristics I think they do not matter for comparison) using pyspark 2.4.3 with running
spark-submit. Experiments were run 3 times for each of the formats used and then averaged. Test scripts are public: here.
Space utilization per format
The basic function of any files is to store the data. For the big data sphere, it is a necessity to store different types of data for different purposes, at least storing raw data. How much this data will take up space means how much money it will cost.
It should be noted that all configuration was used by default and compression was not used anywhere. I admit that an aggressively gzip-based source CSV file may take no more than 1.5 GB. So the advantage of binary formats will not be so dramatic. Conclusion — do not use JSON for storing raw data.
Ingestion latency per format
Next, I tried to save the data in various file formats and calculate the latency.
It may turn out that most of this time is reading a 2-gigabyte source data. And everything else takes 5-10 seconds(I didn't check Spark UI). Anyway, the results are understandable — plain text always will be easier to write than encoding and collecting the metadata of more complex formats(like Avro and parquet).
Random data lookup latency per format
In this test, I try to understand the latency of getting the records by providing a record identifier(using
sample transformation in spark).
The individual record stored in CSV, JSON and Avro format can only get in a brute force scan of an entire data partition. In parquet, the columnar nature of the format allows performing partition scans relatively fast. Thanks to column projection and column predicate push down, a scan input set is ultimately reduced.
Basic statistics per format
This test shows how quickly we can calculate frequently used column statistics(
Parquet and Avro use their hack when to calculate the number of rows in a dataset it’s enough to read metadata. For CSV and JSON, the situation is worse — they have to parse every byte.
Processing data per format
This test uses the time column filtering function.
Grouping by column per format
This test uses a more complex transformation — grouping by column.
Get distinct values latency per format
This test will try to measure the latency of getting distinct values in the column.
The result looks exactly the same as for random data lookup, but the
distinct operation significantly complicates the task. For simplicity, we can assume that it adds a reduce phase to the job, which means that the overhead for exchanging data between workers is also measured.
Lessons learned from performance tests
- CSV should generally be the fastest to write, JSON the easiest for a human to understand and Parquet the fastest to read a subset of the columns, while Avro is the fastest to read all columns at once.
- JSON is the standard for communicating on the web. APIs and websites are constantly communicating using JSON because of their usability properties such as well-defined schemas.
- Parquet and Avro are definitely more optimized for Big Data needs — splittability, compression support, great support for complex data structures but readability and writing speed are quite bad.
- Unit of data access parallelization in case of Parquet and Avro is an HDFS file block – thanks to that it is very easy to evenly distribute processing across all the resources available on a Hadoop cluster.
- When you have to choose a data storage format in Hadoop, you need to consider many factors, such as integration with third-party applications, the evolution of the scheme, support for specific data types ... But if you put performance at the forefront, the tests above convincingly show that Parquet is your best choice.
- Notably, compression algorithms played a significant role not only in reducing the data volume but also in enhancing the performance of data ingestion and data access. But this part of the picture wasn't tested;
- Apache Avro has proven to be a fast universal encoder for structured data. Due to very efficient serialization and deserialization, this format can guarantee very good performance whenever access to all the attributes of a record is required at the same time.
- According to the tests, columnar stores like Apache Parquet delivered very good flexibility between fast data ingestion, fast random data lookup, and scalable data analytics. In many cases, this provides the added advantage of keeping systems simple as only one type of technology is needed for storing the data and serving different use cases (random data access and analytics).
- Whatever you do -- never use JSON format. In almost all tests, it proved to be the worst format to use.
P.S. Columnar formats are generally used where you need to query upon few columns rather than all the fields in a row because their column-oriented storage pattern is well suited for the same. On the other hand Row formats are used where you need to access all the fields of row. So generally Avro is used to store the raw data because during processing usually all the fields are required.
Daily dose of