In [1]:
# DBFS locations used by the demo pipelines; everything lives under one root folder
base_path = 'dbfs:/mnt/deltalake/temp/deordie3'

# Order pipeline: streaming checkpoints, source Delta table, sink Delta table
order_checkpoints_path = f'{base_path}/checkpoints/order'
order_source_path = f'{base_path}/data/order_source'
order_sink_path = f'{base_path}/data/order_sink'

# Product pipeline: streaming checkpoints, incremental updates table, sink table
product_checkpoints_path = f'{base_path}/checkpoints/product'
product_increment_path = f'{base_path}/data/product_increment'
product_sink_path = f'{base_path}/data/product_sink'

### Pipeline 
Prepare data: write the source rows into a Delta table <p/>
Preparation before processing: clean up the sink path and the checkpoints path (to start the stream from the beginning) <p/>
ETL: read the source data from Delta as a stream, transform it, and write the stream into a Delta table <p/>

In [3]:
from pyspark.sql import Row, DataFrame

# Row factories: calling one positionally builds a Row with these field names.
# The third Product field is spelled in plain ASCII ("ProductPrice"); the previous
# spelling contained a mis-encoded non-Latin character, which would have been stored
# as the nested field name and did not match the "ProductPrice" used for products later.
Product = Row("ProductId", "ProductName", "ProductPrice")
OrderAcceptedEvent = Row("OrderId", "ClientId", "SaleDate", "Products")

pepperoni_pizza = Product(1, "Pepperoni", 100)
meat_pizza = Product(2, "Meat", 150)
order_events_seq = [
  OrderAcceptedEvent(1, 2, "2020-06-01 10:00:00", [pepperoni_pizza]),
  OrderAcceptedEvent(1, 2, "2020-06-01 10:00:00", [pepperoni_pizza]),  # duplicate row for test
  OrderAcceptedEvent(2, 2, "2020-06-15 10:00:00", [pepperoni_pizza, meat_pizza])
]
order_events_df = spark.createDataFrame(order_events_seq)

# Remove whatever is stored at the source path first, so that the append below
# leaves exactly the three rows defined above.
dbutils.fs.rm(order_source_path, True)
order_events_df.write.format("delta").mode("append").save(order_source_path)

# Read the table back to confirm what was written.
spark.read.format("delta").load(order_source_path).show(10, False)

In [4]:
import time
from pyspark.sql import DataFrame

# Start from a clean slate: recursively delete the checkpoints and the sink data
for stale_path in (order_checkpoints_path, order_sink_path):
  dbutils.fs.rm(stale_path, True)

def transform_data(source: DataFrame) -> DataFrame:
  """Project an order-event DataFrame down to the columns kept in the sink.

  Args:
    source: DataFrame that has at least OrderId, ClientId and SaleDate columns.

  Returns:
    A DataFrame with exactly the OrderId, ClientId and SaleDate columns.
  """
  sink_columns = ("OrderId", "ClientId", "SaleDate")
  return source.select(*sink_columns)

# Seed the sink with one row that the stream will deliver again (duplicate at the sink side)
init_sink = transform_data(order_events_df.orderBy("SaleDate").limit(1))
(
  init_sink
  .write
  .format("delta")
  .save(order_sink_path)
)

# ETL: read the source Delta table as a stream
stream_df = (
  spark
  .readStream
  .format("delta")
  .load(order_source_path)
)

transformed_df = transform_data(stream_df)

# (!) use processingTime to reduce costs
order_append_query = (
  transformed_df
  .writeStream
  .format("delta")
  .option("checkpointLocation", order_checkpoints_path)
  .trigger(processingTime="30 seconds")
  .outputMode("append")
  .start(order_sink_path)
)

try:
  # Wait until everything currently in the source has been written to the sink.
  # A fixed sleep could end before the first micro-batch commits and would also
  # hide a failed query; this call blocks until the data is processed and raises
  # if the query terminated with an error.
  order_append_query.processAllAvailable()
finally:
  # Stop only the query started by this cell, even when the wait above raised.
  order_append_query.stop()

# show() prints the rows itself and returns None, so it is not wrapped in display()
spark.read.format("delta").load(order_sink_path).sort("OrderId", "SaleDate").show(10, False)
# Order 1 appears three times: the seeded sink row plus the two identical source rows
# appended by the stream (plain append does no deduplication).

In [5]:
from delta.tables import *


# Reset the order pipeline: recursively delete the checkpoints and the sink data
for stale_path in (order_checkpoints_path, order_sink_path):
  dbutils.fs.rm(stale_path, True)

def transform_data(source: DataFrame) -> DataFrame:
  """Keep only the order columns that are stored in the sink.

  Args:
    source: DataFrame that has at least OrderId, ClientId and SaleDate columns.

  Returns:
    A DataFrame with exactly the OrderId, ClientId and SaleDate columns.
  """
  kept_columns = ["OrderId", "ClientId", "SaleDate"]
  return source.select(*kept_columns)

# Pre-populate the sink with the earliest order, so the stream later delivers
# a row whose OrderId is already present at the sink side
sink_seed_rows = order_events_df.orderBy("SaleDate").limit(1)
init_sink = transform_data(sink_seed_rows)
init_sink.write.format("delta").save(order_sink_path)

# Handle to the order sink under a dedicated name. The generic name `deltaTable` is
# rebound to the product sink further down in this notebook, and the upsert below
# must keep targeting the order sink regardless of that rebinding.
order_sink_table = DeltaTable.forPath(spark, order_sink_path)
deltaTable = order_sink_table  # original name kept for any code that still refers to it

# upsert function
def upsertOrderToDelta(microBatchOutputDF, batchId):
  """Merge one micro-batch of orders into the order sink Delta table.

  Rows whose OrderId already exists in the sink overwrite the existing row;
  rows with a new OrderId are inserted.

  Args:
    microBatchOutputDF: DataFrame with the rows of the current micro-batch.
    batchId: identifier of the micro-batch (passed by foreachBatch, not used here).
  """
  (
    order_sink_table.alias("t")
    .merge(microBatchOutputDF.alias("s"), "s.OrderId = t.OrderId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# ETL: read the source Delta table as a stream
stream_df = (
  spark
  .readStream
  .format("delta")
  .load(order_source_path)
)

transformed_df = (
  transform_data(stream_df)
  .distinct() # drop rows that are identical in every column
)

# Each micro-batch is handed to upsertOrderToDelta, which merges it into the sink
order_upsert_query = (
  transformed_df
  .writeStream
  .format("delta")
  .foreachBatch(upsertOrderToDelta)
  .option("checkpointLocation", order_checkpoints_path)
  .trigger(processingTime="30 seconds")
  .outputMode("update")
  .start(order_sink_path)
)

try:
  # Wait until everything currently in the source has been merged into the sink.
  # A fixed sleep could end before the micro-batch commits and would also hide a
  # failed query; this call raises if the query terminated with an error.
  order_upsert_query.processAllAvailable()
finally:
  # Stop only the query started by this cell, even when the wait above raised.
  order_upsert_query.stop()

spark.read.format("delta").load(order_sink_path).sort("OrderId", "SaleDate").show(10, False)
# no duplicate rows for order 1: identical rows are dropped before the write and the
# merge updates the already existing sink row instead of appending another one

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

# Row factory for product versions; replaces the three-field Product factory
# declared earlier in this notebook
Product = Row("ProductId", "ProductName", "ProductPrice", "EffectiveDateTime")

# source batch rows
products_seq = [
  Product(*fields)
  for fields in (
    (1, "Pepperoni", 100, "2020-06-01 10:00:00"),
    (2, "Meat", 150, "2020-06-10 00:00:00"),
  )
]

# update batch rows
products_seq_for_update = [
  Product(*fields)
  for fields in (
    (1, "Pepperoni Update", 300, "2020-06-02 12:00:00"),
    (2, "Meat pizza Update", 120, "2020-06-11 16:00:00"),
  )
]

products_df = spark.createDataFrame(products_seq)
products_update_df = spark.createDataFrame(products_seq_for_update)

# save increment: the path is removed first, so the table holds only the update batch
dbutils.fs.rm(product_increment_path, True)
(
  products_update_df
  .write
  .format("delta")
  .mode("append")
  .save(product_increment_path)
)

# print both batches for reference
products_df.show(10, False)
products_update_df.show(10, False)

In [7]:
END_OF_TIME_UNIX_TIMESTAMP = "253402214400"  # 9999-12-31

# cleanup checkpoints and sink data
for stale_path in (product_checkpoints_path, product_sink_path):
  dbutils.fs.rm(stale_path, True)

# write first batch to sink as SCD table: every initial row is the current version,
# valid from its EffectiveDateTime until the 9999-12-31 "end of time" marker
initial_scd_df = (
  products_df
  .withColumn("Current", F.lit(True))
  .withColumn("EffectiveFrom", F.col("EffectiveDateTime").cast(TimestampType()))
  .withColumn("EffectiveTo", F.lit("9999-12-31").cast(TimestampType()))
  .select("ProductId", "ProductName", "ProductPrice", "Current", "EffectiveFrom", "EffectiveTo")
)
initial_scd_df.write.format("delta").save(product_sink_path)

# upsert function
# Handle to the product SCD sink under a dedicated name, so the upsert below does not
# depend on whatever the generic name `deltaTable` happens to point to.
product_sink_table = DeltaTable.forPath(spark, product_sink_path)
deltaTable = product_sink_table  # original name kept: the staging join below uses it

def upsertProductToDelta(microBatchOutputDF, batchId):
  """Apply one micro-batch of staged product changes to the SCD sink table.

  Staged rows whose mergeKey equals the ProductId of a sink row that is still
  current close that row: Current becomes false and EffectiveTo becomes the
  staged row's EffectiveDateTime. Staged rows that match no sink row (including
  rows staged with a NULL mergeKey) are inserted as the new current version.

  Args:
    microBatchOutputDF: DataFrame with a mergeKey column plus ProductId,
      ProductName, ProductPrice and EffectiveDateTime.
    batchId: identifier of the micro-batch (passed by foreachBatch, not used here).
  """
  (
    product_sink_table.alias("t")
    .merge(microBatchOutputDF.alias("s"), "t.ProductId = s.mergeKey")
    .whenMatchedUpdate(
      condition = "t.Current = True",
      set = {
        "Current": "false",
        "EffectiveTo": "s.EffectiveDateTime"
      }
    )
    .whenNotMatchedInsert(
      values = {
        "ProductId": "s.ProductId",
        "ProductName": "s.ProductName",
        "ProductPrice": "s.ProductPrice",
        "Current": "True",
        "EffectiveFrom": "s.EffectiveDateTime",
        # Same end-of-time marker expression as the initial load of the sink
        # ('9999-12-31' cast to timestamp), written explicitly instead of relying
        # on an implicit cast of a bare numeric literal.
        "EffectiveTo": "CAST('9999-12-31' AS TIMESTAMP)"
      }
    )
    .execute()
  )
    
# ETL: read the product increments as a stream
stream_df = (
  spark
  .readStream
  .format("delta")
  .load(product_increment_path)
)

transformed_df = stream_df.distinct()

# prepare rows for insert (older version already exists in delta):
# increments whose ProductId has a row in the sink that is still current
new_rows_to_insert = (
  transformed_df
  .alias("s")
  .join(deltaTable.toDF().alias("t"), "ProductId")
  .where("t.Current = True")
)

# Every increment is staged with mergeKey = ProductId; increments that replace an
# existing current row are staged a second time with a NULL mergeKey. In the merge,
# the keyed copy closes the current row and the NULL-keyed copy matches nothing,
# so it is inserted as the new version.
stagedUpdates = (
  new_rows_to_insert
  .selectExpr("NULL as mergeKey", "s.*")
  .union(transformed_df.selectExpr("ProductId as mergeKey", "*"))
)

product_scd_query = (
  stagedUpdates
  .writeStream
  .format("delta")
  .foreachBatch(upsertProductToDelta)
  .option("checkpointLocation", product_checkpoints_path)
  .trigger(processingTime="30 seconds")
  .outputMode("update")
  .start(product_sink_path)
)

try:
  # Wait until everything currently in the increment table has been merged.
  # A fixed sleep could end before the micro-batch commits and would also hide a
  # failed query; this call raises if the query terminated with an error.
  product_scd_query.processAllAvailable()
finally:
  # Stop only the query started by this cell, even when the wait above raised.
  product_scd_query.stop()

spark.read.format("delta").load(product_sink_path).orderBy("ProductId", "EffectiveFrom").show(10, False)