Delta lakes prevent data with incompatible schemas from being written, unlike Parquet lakes, which allow any data to be written.
Let’s demonstrate how Parquet allows files with incompatible schemas to be written to the same data store. Then let’s explore how Delta’s schema enforcement prevents incompatible data from being written.
We’ll finish with an explanation of schema evolution.
Parquet allows for incompatible schemas
Let’s create a Parquet data lake with num1 and num2 columns:
We’ll use the spark-daria createDF method to build DataFrames for these examples.
import org.apache.spark.sql.types._
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._ // spark-daria extension that adds createDF to the SparkSession

val df = spark.createDF(
  List(
    (1, 2),
    (3, 4)
  ), List(
    ("num1", IntegerType, true),
    ("num2", IntegerType, true)
  )
)

val parquetPath = new java.io.File("./tmp/parquet_schema/").getCanonicalPath

df.write.parquet(parquetPath)
Let’s view the contents of the Parquet lake.
spark.read.parquet(parquetPath).show()

+----+----+
|num1|num2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+
Let’s create another Parquet file with only a num2 column and append it to the same folder.
val df2 = spark.createDF(
  List(
    88,
    99
  ), List(
    ("num2", IntegerType, true)
  )
)

df2.write.mode("append").parquet(parquetPath)
Let’s read the Parquet lake into a DataFrame and view the undesirable output.
spark.read.parquet(parquetPath).show()

+----+
|num2|
+----+
|   2|
|   4|
|  88|
|  99|
+----+
We lost the num1 column! spark.read.parquet is only returning a DataFrame with the num2 column.
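The num1 data isn’t actually gone from the Parquet files. By default Spark infers the lake’s schema from a subset of the files, so which columns show up can depend on which file gets picked. Plain Parquet does support schema merging at read time, but that only patches things up after the fact; nothing stopped the mismatched file from being written. Here’s a minimal sketch of the read-time workaround, using the same parquetPath as above:

// Ask Spark to reconcile the schemas of all the Parquet files in the folder.
// This recovers num1 (with nulls for the rows that don't have it), but every
// reader has to remember to pass the option.
spark
  .read
  .option("mergeSchema", "true")
  .parquet(parquetPath)
  .show()

This is a read-side workaround, not schema enforcement.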
This isn’t ideal. Let’s see if Delta provides a better result.
Delta automatic schema updates
Let’s create the same df as earlier and write out a Delta data lake.
val df = spark.createDF(
  List(
    (1, 2),
    (3, 4)
  ), List(
    ("num1", IntegerType, true),
    ("num2", IntegerType, true)
  )
)

val deltaPath = new java.io.File("./tmp/schema_example/").getCanonicalPath

df.write.format("delta").save(deltaPath)
The Delta table starts with two columns, as expected:
spark.read.format("delta").load(deltaPath).show()

+----+----+
|num1|num2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+
Let’s append a file with only the num1 column to the Delta lake and see how Delta handles the schema mismatch.
val df2 = spark.createDF(
  List(
    88,
    99
  ), List(
    ("num1", IntegerType, true)
  )
)

df2.write.format("delta").mode("append").save(deltaPath)
Delta gracefully fills in missing column values with nulls.
spark.read.format("delta").load(deltaPath).show()

+----+----+
|num1|num2|
+----+----+
|   1|   2|
|   3|   4|
|  88|null|
|  99|null|
+----+----+
Let’s append a DataFrame that only has a num2 column to make sure Delta also handles that case gracefully.
val df3 = spark.createDF(
  List(
    101,
    102
  ), List(
    ("num2", IntegerType, true)
  )
)

df3.write.format("delta").mode("append").save(deltaPath)
We can see Delta gracefully populates the num2 values and nulls out the num1 values.
spark.read.format("delta").load(deltaPath).show()

+----+----+
|num1|num2|
+----+----+
|   1|   2|
|   3|   4|
|  88|null|
|  99|null|
|null| 101|
|null| 102|
+----+----+
Let’s see if Delta can handle a DataFrame with num1, num2, and num3 columns.
val df4 = spark.createDF(
  List(
    (7, 7, 7),
    (8, 8, 8)
  ), List(
    ("num1", IntegerType, true),
    ("num2", IntegerType, true),
    ("num3", IntegerType, true)
  )
)

df4.write.format("delta").mode("append").save(deltaPath)
This causes the code to error out with the following message:
org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

Table schema:
root
-- num1: integer (nullable = true)
-- num2: integer (nullable = true)

Data schema:
root
-- num1: integer (nullable = true)
-- num2: integer (nullable = true)
-- num3: integer (nullable = true)
mergeSchema
We can fix this by setting mergeSchema to true, as indicated by the error message.
The code works perfectly once the option is set:
df4
  .write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .save(deltaPath)

spark.read.format("delta").load(deltaPath).show()

+----+----+----+
|num1|num2|num3|
+----+----+----+
|   7|   7|   7|
|   8|   8|   8|
|   1|   2|null|
|   3|   4|null|
|null| 101|null|
|null| 102|null|
|  88|null|null|
|  99|null|null|
+----+----+----+
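If you find yourself setting mergeSchema on every write, more recent Delta Lake releases also expose a session-level configuration that applies the same behavior to all writes. Here’s a minimal sketch, assuming your Delta version supports the spark.databricks.delta.schema.autoMerge.enabled property (check the docs for the release you’re running):

// Assumption: this session-level property is available in your Delta Lake release.
// It applies mergeSchema behavior to every Delta write in the session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

// The per-write mergeSchema option is no longer needed once the session config is set.
df4.write.format("delta").mode("append").save(deltaPath)

Use this with care: blanket schema merging makes it easier for a misspelled column name to silently add a new column to your table.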
Replace table schema
mergeSchema will work when you append a file with a completely different schema, but it probably won’t give you the result you’re looking for.
val df5 = spark.createDF(
  List(
    ("nice", "person"),
    ("like", "madrid")
  ), List(
    ("word1", StringType, true),
    ("word2", StringType, true)
  )
)

df5
  .write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .save(deltaPath)
mergeSchema adds the two new columns to the table because the save mode was set to append.
spark.read.format("delta").load(deltaPath).show()

+----+----+----+-----+------+
|num1|num2|num3|word1| word2|
+----+----+----+-----+------+
|   7|   7|   7| null|  null|
|   8|   8|   8| null|  null|
|   1|   2|null| null|  null|
|   3|   4|null| null|  null|
|null|null|null| nice|person|
|null|null|null| like|madrid|
|  88|null|null| null|  null|
|  99|null|null| null|  null|
|null| 101|null| null|  null|
|null| 102|null| null|  null|
+----+----+----+-----+------+
Let’s see how mergeSchema behaves when using a completely different schema and setting the save mode to overwrite.
df5
  .write
  .format("delta")
  .mode("overwrite")
  .option("mergeSchema", "true")
  .save(deltaPath)

spark.read.format("delta").load(deltaPath).show()

+----+----+----+-----+------+
|num1|num2|num3|word1| word2|
+----+----+----+-----+------+
|null|null|null| nice|person|
|null|null|null| like|madrid|
+----+----+----+-----+------+
mergeSchema isn’t the best when the schemas are completely different. It’s better for incremental schema changes.
overwriteSchema
Setting overwriteSchema to true will wipe out the old schema and let you create a completely new table.
df5
  .write
  .format("delta")
  .option("overwriteSchema", "true")
  .mode("overwrite")
  .save(deltaPath)
spark.read.format("delta").load(deltaPath).show()

+-----+------+
|word1| word2|
+-----+------+
| nice|person|
| like|madrid|
+-----+------+
Conclusion
Delta lakes offer powerful schema evolution features that are not available in Parquet lakes.
Delta lakes also enforce schemas, which makes it less likely that a bad write will mess up your entire lake. These features simply aren’t available in plain vanilla Parquet lakes.