Microsoft Fabric: Dataflow Gen 2, Medallion Loader Pattern
Previously I identified that the destination options for Dataflow offer only insert capability, either Append or Replace, with no updates. That got me thinking about the nature of the ingestion and how it restricts loading data into more complex processes. However, could we use this limitation to our advantage and leverage it to process only limited sets of data? In this blog we will explore a pattern of ingestion:
- Load Raw Data
- Capture the CDC of this Data
- Stream the CDC into a table
- Apply further transformations
We will review the process to answer:
- How does CDC behave on tables where data is appended?
- What happens to CDC when data is replaced?
- How can we stream the data into our transform stage?
Approach
I am using a metadata table to store the table name and the last modified date from the inbound customer data. This is then used to filter the data so that only newly modified records are ingested (a sketch of this filter follows the list below).
- I have two branches for the customer data and a third for the metadata records.
- I am interested in how the CDC on these tables interacts with the Dataflow write methods.
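As a rough illustration of how that watermark filter could work in a notebook, the sketch below looks up the last modified date from a metadata table and uses it to restrict the inbound customer data. This is an assumption for illustration only: the table names (metadata_watermarks, landingzone.SalesLT_Customer_RAW) and column names (table_name, last_modified, ModifiedDate) are placeholders, not objects from the original post.

from pyspark.sql import functions as F

# Look up the stored watermark for the customer table (hypothetical metadata table)
watermark = (spark.table("metadata_watermarks")
    .filter(F.col("table_name") == "SalesLT_Customer")
    .select("last_modified")
    .collect()[0]["last_modified"])

# Keep only inbound rows modified after the stored watermark (hypothetical raw table)
new_rows = (spark.table("landingzone.SalesLT_Customer_RAW")
    .filter(F.col("ModifiedDate") > F.lit(watermark)))

display(new_rows)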
%%sql
SHOW TBLPROPERTIES SalesLT_Customer_APPEND;
SHOW TBLPROPERTIES SalesLT_Customer_REPLACE;
| Index | key | value |
| --- | --- | --- |
| 1 | delta.enableChangeDataFeed | true |
| 2 | delta.minReaderVersion | 1 |
| 3 | delta.minWriterVersion | 4 |
So I’m confident that CDC is in place before I write any data.
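For completeness, if the property had not already been set, it could be switched on for an existing Delta table with standard Delta SQL issued from the notebook. A sketch, not taken from the original run:

# Enable the change data feed on an existing Delta table
spark.sql("ALTER TABLE SalesLT_Customer_APPEND SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")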
I will run data into the tables from a SQL UPDATE statement and refresh the data using Power Query. Once done I will attempt to read the CDC data.
changedatefeed_df = spark.read.format("delta") \
    .option("readChangeData", True) \
    .option("startingVersion", 4) \
    .table('landingzone.SalesLT_Customer_APPEND')

display(changedatefeed_df.sort("_commit_version"))
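The change feed adds the metadata columns _change_type, _commit_version and _commit_timestamp to the result. As a small illustration using the same DataFrame as above, the rows representing the latest state can be isolated by filtering on _change_type:

# Keep inserts and the post-update image of changed rows; drop pre-images and deletes
current_changes = changedatefeed_df.filter(
    changedatefeed_df["_change_type"].isin("insert", "update_postimage")
)
display(current_changes.sort("_commit_version"))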
Results
After I run the Dataflow to add records, I recheck the table properties.
Given these results it is clear that the Dataflow’s replace behaviour does indeed recreate the table from scratch, and the new table does not honour the setting that enables the change data feed for all new tables:
spark.microsoft.delta.properties.defaults.enableChangeDataFeed
Other tables created, however, do apply this directive.
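For context, that default is a session-level Spark setting which can be applied in a notebook before any tables are created. A sketch using the configuration key named above:

# Default new Delta tables created in this session to having the change data feed enabled
spark.conf.set("spark.microsoft.delta.properties.defaults.enableChangeDataFeed", "true")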
| operation | engineInfo |
| --- | --- |
| ReplaceTable | Mashup Engine/2.123.561.0 |
Reading the CDC Data
from delta import *

# Read the stream
def readTheStream(version, tableName):
    '''
    Starts the read stream for Change Data Capture from the transactions on the given table
    version - the start version for change data capture
    tableName - the name of the table for CDC
    '''
    DateChangeStream = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", version) \
        .table(tableName)
    print("Source stream created...")
    return DateChangeStream

# Write the stream to a delta table
def streamToDelta(delta_stream_table_path,check_point_path,inputStream):
'''
Writes stream data to a delta table
delta_stream_table_path - the location of the delta table
check_point_path - the folder that stores the check point files
inputStream - the stream processor
'''
deltastream = inputStream.writeStream.format("delta")\
.option("checkpointLocation", checkpointpath)\
.start(delta_stream_table_path)
print ("Streaming to delta sink...")
return (deltastream)
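As a usage sketch (the target table name and checkpoint folder below are placeholders, not values from the original run), the two functions can be chained together and the stream inspected through the returned query handle:

# Start the CDC read from version 4 of the append table
cdc_stream = readTheStream(4, "landingzone.SalesLT_Customer_APPEND")

# Stream the changes into a Delta table, keeping checkpoints under Files/
delta_stream_table_path = "Tables/SalesLT_Customer_CDC"       # hypothetical target table
check_point_path = "Files/checkpoints/SalesLT_Customer_CDC"   # hypothetical checkpoint folder
delta_stream = streamToDelta(delta_stream_table_path, check_point_path, cdc_stream)

# The status dictionary shown below comes from this property of the streaming query
print(delta_stream.status)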
When these functions are run, the stream status gives confidence that the data is actually being processed.
{'message': 'Getting offsets from DeltaSource[abfss://xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx@onelake.dfs.fabric.microsoft.com/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/Tables/SalesLT_Customer_APPEND]',
'isDataAvailable': False,
'isTriggerActive': True}
I chose to use Tables/{nameOfDeltaTable} for my delta table path variable and Files/{path for checkpoint} for the checkpoint location, as this feels like the correct separation for Fabric Lakehouses. Looking at the history of the resultant Delta table (queried as sketched after the list below), a couple of things stand out.
- The operation recorded in the log is STREAMING UPDATE
- The operationParameters field contains:
  - an outputMode of update, and
  - an epoch number that corresponds with the checkpoint file created during the stream write operation
- The usual operationMetrics are available
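The history entries referred to above can be inspected with standard Delta SQL; a minimal sketch, where SalesLT_Customer_CDC stands in for the stream's target table:

# Show the transaction history of the Delta table produced by the stream
display(spark.sql("DESCRIBE HISTORY SalesLT_Customer_CDC"))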
Conclusion
- Append keeps the entirety of data ingested
- Replace provides only the most recently ingested data
| Append | Replace |
| --- | --- |
| Retains all data should future data be available | Only the most recent data held |
| CDC can be captured for onward processing or audit log | Easier to extract current data for onward processing |
| Most recent data requires extra processing | No CDC, as the Delta table is created anew by the Mashup Engine |