Enabling CDC in Delta for a Lakehouse

Change Data Capture (CDC) in Delta Tables, surfaced as the Change Data Feed, lets you track row-level changes between versions of a table. This post does not discuss the details of consuming the Change Feed; it limits itself to how we enable it.

In an ideal world, we set up Change Data Capture as part of a planned deployment, and in that case we have a couple of options.

1. Enable Change Data Capture on all tables.

In this scenario, if we want Change Data Capture on every table, we can enable it as a default.
NB: this applies only to the current session. To apply it across all sessions we must add the property to the Pool Configuration.

spark.conf.set("spark.microsoft.delta.properties.defaults.enableChangeDataFeed", "true")
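As a quick sanity check, we can read the setting back in the same session (a minimal sketch, using the same property name as above):

# Read the session default back; prints "true" if the setting took effect
print(spark.conf.get("spark.microsoft.delta.properties.defaults.enableChangeDataFeed"))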

Synapse Apache Spark Pool Settings
In Synapse this property can be specified in an Apache Spark configuration file, which can then be applied to any pool.
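If changing the pool configuration is not an option, Synapse (and Fabric) notebooks also accept session-level Spark settings through the %%configure magic at the start of the notebook. A minimal sketch (the -f flag forces a session restart if one is already running):

%%configure -f
{
    "conf": {
        "spark.microsoft.delta.properties.defaults.enableChangeDataFeed": "true"
    }
}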

Environment Spark Settings Dialogue in Fabric

In Fabric, this is set at the Environment level - note that Environments are still in Preview.

However, we may not want all of our tables to have CDC applied.

2. Enable Change Data Capture table by table.

We can enable CDC at the point at which we create the table:
%%sql
CREATE TABLE student (id INT, name STRING, age INT) 
TBLPROPERTIES (delta.enableChangeDataFeed = true)
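To confirm the property was set on the new table, SHOW TBLPROPERTIES can be queried for that specific key (a quick check, reusing the student table from above):

%%sql
SHOW TBLPROPERTIES student (delta.enableChangeDataFeed)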

We don't always work in an ideal world; it may be that the Delta Table was created with a version lower than 2.0, with no CDC capability.

To set CDC on a single existing table, the following command can be used:

ALTER TABLE Student SET TBLPROPERTIES (delta.enableChangeDataFeed = TRUE);
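Note that the change feed only records changes from the version at which it was enabled, so rows changed before the ALTER cannot be queried as changes. The table history shows the property change (a quick check against the Student table above):

%%sql
DESCRIBE HISTORY Student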

I have developed a couple of functions that I use to apply this to all the tables in a database. In some of my lakehouses not all tables are Delta, and attempting to enable CDC on a non-Delta table would raise an error and fail the function. So the functions:
  1. Get the tables from the catalog
  2. Check if they are Delta
  3. Set the CDC property only on the Delta tables

# Is the table a Delta table?
def delta_check(TableName: str) -> bool:
    '''
    Takes a table name,
    gets the table's storage location from DESCRIBE FORMATTED,
    and checks whether the _delta_log folder exists at that location.
    '''
    desc_table = spark.sql(f"DESCRIBE FORMATTED {TableName}").collect()
    location = [row[1] for row in desc_table if row[0] == 'Location'][0]
    try:
        # dbutils is Databricks-specific; on Synapse/Fabric use mssparkutils.fs.ls instead
        dbutils.fs.ls(f"{location}/_delta_log")
        is_delta = True
    except Exception:
        # Listing fails if the folder does not exist, so the table is not Delta
        is_delta = False
    return is_delta
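
Called on its own it returns a simple boolean (the table name here is just an illustration, assuming it exists in the lakehouse):

# Hypothetical check: True only if landingzone.student has a _delta_log folder
print(delta_check("landingzone.student"))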

# For each table in a database, set CDC on or off
def setCDC(switch: bool, database: str):
    '''
    switch: set CDC on or off (True/False)
    database: the name of the database to alter
    '''
    tables = spark.catalog.listTables(database)
    for table in tables:
        print(table.name)
        # Only alter tables that are actually Delta, so non-Delta tables don't fail the run
        if delta_check(f"{database}.{table.name}"):
            spark.sql(f"ALTER TABLE {database}.{table.name} SET TBLPROPERTIES (delta.enableChangeDataFeed = {switch})")
            
# Call the function on a database
setCDC(True, 'landingzone')
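
After the run, a quick way to verify is to read the property back for each table (a sketch, reusing the same catalog listing; non-Delta tables simply print an empty list):

# Verify: print the CDC property for every table in the database
for table in spark.catalog.listTables('landingzone'):
    props = spark.sql(f"SHOW TBLPROPERTIES landingzone.{table.name}").collect()
    cdc = [row['value'] for row in props if row['key'] == 'delta.enableChangeDataFeed']
    print(table.name, cdc)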

More information on Delta Lake Change Data Feed can be found in the Delta Lake documentation.
