Microsoft Fabric: Dataflow Gen 2, Medallion Loader Pattern


Previously I identified that the destination options for Dataflow Gen 2 are insert-only, offering either Append or Replace but no updates. That got me thinking about how this restricts loading data into more complex processes. Could we instead turn the limitation to our advantage and use it to process limited sets of data? In this blog we will explore a pattern of ingestion:

  • Load Raw Data 
  • Capture the CDC of this Data
  • Stream the CDC into a table
  • Apply further transformations 


Process Flow Diagram

We will review the process to answer:

  1. How does CDC perform on tables where data is appended?
  2. What happens to CDC when data is replaced?
  3. How can we stream the data into our transform stage?

Approach

First off, let's look at enabling CDC on the tables. I did not set my Lakehouse to enable it by default, so I needed to do this per table. I have previously posted on how this can be achieved.
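As a quick reminder, CDC is switched on per table via a Delta table property. A minimal sketch, assuming both landing tables sit in the landingzone schema used later in this post:

# Enable the change data feed on each landing table (run once per table)
spark.sql("ALTER TABLE landingzone.SalesLT_Customer_APPEND SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
spark.sql("ALTER TABLE landingzone.SalesLT_Customer_REPLACE SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")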

I set up a Power Query in Dataflow Gen 2 which loads data from [AdventureWorksLT].[SalesLT].[Customer] in an Azure SQL database. I have chosen Dataflow Gen 2 over Copy data because, as I write this, it is the only way I would be able to load on-premises data from SQL Server. I am using Azure SQL for ease and speed, as my gateway is local and a little underpowered.

I am using a metadata table to store the table name and the last modified date from the inbound customer data; this is then used to filter the data so that only newly modified records are ingested.
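The actual filter lives in the Power Query, but conceptually it is a simple high-watermark comparison. A rough sketch of the equivalent logic in PySpark, where the load_metadata table and the staging source are hypothetical stand-ins for the real queries:

from pyspark.sql import functions as F

# Hypothetical metadata table: one row per source table with the last ModifiedDate already loaded
watermark = (spark.table("landingzone.load_metadata")
                  .filter(F.col("TableName") == "SalesLT.Customer")
                  .agg(F.max("LastModifiedDate"))
                  .collect()[0][0])

# Placeholder for the inbound customer query; keep only rows changed since the last load
inbound_customers = spark.table("staging.SalesLT_Customer")      # hypothetical source
new_customers = inbound_customers.filter(F.col("ModifiedDate") > F.lit(watermark))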



  • I have two branches for the customer data and a third for the metadata records.
  • I am interested in how the CDC on these tables interacts with the Dataflow write methods.

%%sql
SHOW TBLPROPERTIES SalesLT_Customer_APPEND;
SHOW TBLPROPERTIES SalesLT_Customer_REPLACE;

Both tables have the same properties:

Index | key | value
1 | delta.enableChangeDataFeed | true
2 | delta.minReaderVersion | 1
3 | delta.minWriterVersion | 4

So I’m confident that CDC is in place before I write my data.

I will run data into the tables from a SQL UPDATE statement and refresh the data using the Power Query. Once done, I will attempt to acquire the CDC data.

changedatefeed_df = spark.read.format("delta") \
    .option("readChangeData", True) \
    .option("startingVersion", 4) \
    .table('landingzone.SalesLT_Customer_APPEND')
display(changedatefeed_df.sort("_commit_version"))


This will confirm whether or not the CDC data is available. Assuming that it is, I will look at how this data can be passed forward, using a notebook mechanism with a stream reader and stream writer. I do not intend to have this running as a live stream; nonetheless, the checkpoint for this table will ensure that only changes not yet processed are picked up on each run.

Results

After I run the Dataflow to add records, I recheck my table properties.



Given these results, it's clear that the Dataflow’s Replace capability does indeed replace the table anew, but the new table does not obey the directive to enable the feature for all new tables:

spark.microsoft.delta.properties.defaults.enableChangeDataFeed

Other tables created, however, do apply this directive.
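For context, that directive can be applied at session level along these lines (in Fabric it would more usually be configured as a Spark property on the environment); this is a sketch rather than the exact setup from my earlier post:

# Ask Delta to enable the change data feed on any newly created tables.
# The point above is that tables created by the Mashup Engine ignore this default.
spark.conf.set("spark.microsoft.delta.properties.defaults.enableChangeDataFeed", "true")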

Looking at the history on the Delta Table offers a clue as to what's happening: the Power Query Mashup Engine is being used to deploy data to the Delta Table, and I can only assume it does not make use of the environment in this process.
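The history itself is easy to pull back; a minimal sketch against the replace table (assuming it sits in the same landingzone schema):

# Inspect the Delta log to see which engine performed each operation
history_df = spark.sql("DESCRIBE HISTORY landingzone.SalesLT_Customer_REPLACE")
display(history_df.select("version", "timestamp", "operation", "engineInfo"))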

operation | engineInfo
ReplaceTable | Mashup Engine/2.123.561.0


In contrast, the Append operation usefully retains the TBLPROPERTIES.

Grid of History Details for a Delta Table

Reading the CDC Data

I have used a couple of functions to read the stream and then write it to a delta table.

from delta import *

# Read the stream
def readTheStream(version, tableName):
    '''
    Starts the read stream for Change Data Capture from the transactions on the given table

    version: the starting version for change data capture
    tableName: the name of the table for CDC
    '''
    DateChangeStream = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", version) \
        .table(tableName)
    print("Source stream created...")
    return DateChangeStream

# Write the stream to a delta table
def streamToDelta(delta_stream_table_path, check_point_path, inputStream):
    '''
    Writes stream data to a delta table

    delta_stream_table_path - the location of the delta table
    check_point_path - the folder that stores the checkpoint files
    inputStream - the streaming dataframe to write
    '''
    deltastream = inputStream.writeStream.format("delta") \
        .option("checkpointLocation", check_point_path) \
        .start(delta_stream_table_path)
    print("Streaming to delta sink...")
    return deltastream

When these functions are run, the stream status gives confidence that the data is being processed.
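A minimal sketch of calling the two functions; the destination table, checkpoint folder and starting version here are illustrative:

# Hypothetical call: stream the CDC rows from the append table into a new Delta table
cdc_stream = readTheStream(4, "landingzone.SalesLT_Customer_APPEND")
delta_query = streamToDelta("Tables/SalesLT_Customer_CDC",
                            "Files/checkpoints/SalesLT_Customer_CDC",
                            cdc_stream)

print(delta_query.status)           # the status dictionary shown below
delta_query.processAllAvailable()   # block until the currently available changes are written
delta_query.stop()                  # safe to stop; the checkpoint records how far we got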


{'message': 'Getting offsets from DeltaSource[abfss://xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx@onelake.dfs.fabric.microsoft.com/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/Tables/SalesLT_Customer_APPEND]',
 'isDataAvailable': False,
 'isTriggerActive': True}

It's worth considering the approach here, to understand where and what data is stored in each location. Looking at the functions above: readTheStream returns a streaming dataframe representing an unbounded table, while streamToDelta uses writeStream.start to deploy the data to the destination, in this case a Delta Table. The stream is in operation while the session is active; however, the checkpoint determines which rows from the unbounded source still need to be written, so only unprocessed rows are carried across on each run. This means these functions can be called periodically rather than needing to run continuously.
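If starting and stopping the query manually feels clumsy, a one-off trigger achieves the same periodic behaviour: the stream drains whatever is available and then terminates by itself. A sketch of a variant of streamToDelta, assuming a Spark runtime recent enough to support availableNow (Spark 3.3+):

# Variant of streamToDelta that processes the available changes and then stops on its own
def streamToDeltaOnce(delta_stream_table_path, check_point_path, inputStream):
    deltastream = inputStream.writeStream.format("delta") \
        .option("checkpointLocation", check_point_path) \
        .trigger(availableNow=True) \
        .start(delta_stream_table_path)
    deltastream.awaitTermination()   # returns once the backlog has been written
    return deltastream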

I chose to use Tables/{nameOfDeltaTable} for my delta table path variable and Files/{path for checkpoint} for the checkpoint, as this feels like the correct separation for Fabric Lakehouses. Looking at the history of the resultant Delta table, a couple of things stand out.

  • The operation recorded in the log is STREAMING UPDATE
  • The operationParameters contain:
    • an outputMode of update, and
    • an epoch number that corresponds with the checkpoint file created during the stream write operation
  • The usual operationMetrics are available
Folder structure in windows showing checkpoint files

I did look at how this same process might be accomplished using Event Streaming in Fabric; this would require a custom app. I played with the idea of using a Function App which connects to the endpoint, but I can't see any advantage to this approach. A web call to the Function App seems redundant when a call to the notebook would work just as well.

Conclusion

Either of the Dataflow Gen2 Write Mechanisms can be used to carry data forward for other processing and transformations.
  • Append keeps the entirety of the data ingested
  • Replace provides only the most recently ingested data

Append | Replace
Retains all data ingested, should it be needed in future | Only the most recent data is held
CDC can be captured for onward processing or as an audit log | Easier to extract the current data for onward processing
Extracting just the most recent data requires extra processing | No CDC, as the Delta Table is recreated by the Mashup Engine



I did observe some extra transactions when using Append. Every other transaction appears not to contain any operational data; there appear to be two writes occurring at the same instant.


File Explorer view of files created simultaneously

Side-by-side log files one with content added to the table and one blank

I can only assume this is some quirk of the Mashup Engine.

Data from the Change Data Capture can be streamed using readStream and writeStream, and this works well. I would say that the CDC extraction, despite adding some additional processing, is a useful way to pull out the data as an archive log. The attraction of the Replace-only data is that it is easy to identify the current rows for further processing. That being said, the notion of the full audit log that CDC offers is appealing. Given the anomaly in the logs, I would probably go as far as adding a step at the write stream destination for CDC and using this as my extraction for ongoing processing.
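As a sketch of what that extra step might look like: the change feed carries a _change_type column, so the current rows can be separated from the rest of the audit trail as they stream through. The destination names here are hypothetical:

from pyspark.sql import functions as F

# Take the CDC stream and keep only the rows representing the current state of each record
cdc_stream = readTheStream(4, "landingzone.SalesLT_Customer_APPEND")
current_rows = cdc_stream.filter(F.col("_change_type").isin("insert", "update_postimage"))

# Write the filtered stream to its own table for ongoing processing
streamToDelta("Tables/SalesLT_Customer_CURRENT",
              "Files/checkpoints/SalesLT_Customer_CURRENT",
              current_rows)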



