{"cells":[{"cell_type":"code","source":["# DBFS paths\nbase_path = 'dbfs:/mnt/deltalake/temp/deordie3'\n\norder_checkpoints_path = '{0}/checkpoints/order'.format(base_path)\norder_source_path = '{0}/data/order_source'.format(base_path)\norder_sink_path = '{0}/data/order_sink'.format(base_path)\n\nproduct_checkpoints_path = '{0}/checkpoints/product'.format(base_path)\nproduct_increment_path = '{0}/data/product_increment'.format(base_path)\nproduct_sink_path = '{0}/data/product_sink'.format(base_path)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":1},{"cell_type":"markdown","source":["### Pipeline \nPrepare data: write source rows into delta table

\nPreparation before processing: cleanup sink path and checkpoints path (to start stream from the begining)

\nETL: read source data from delta as stream, transform and write stream into delta table

"],"metadata":{}},{"cell_type":"code","source":["from pyspark.sql import Row, DataFrame\n\nProduct = Row(\"ProductId\", \"ProductName\", \"ProductPriсe\")\nOrderAcceptedEvent = Row(\"OrderId\", \"ClientId\", \"SaleDate\", \"Products\")\n\npepperoni_pizza = Product(1, \"Pepperoni\", 100)\nmeat_pizza = Product(2, \"Meat\", 150)\norder_events_seq = [ \n OrderAcceptedEvent(1, 2, \"2020-06-01 10:00:00\", [pepperoni_pizza]), \n OrderAcceptedEvent(1, 2, \"2020-06-01 10:00:00\", [pepperoni_pizza]), # duplicate row for test\n OrderAcceptedEvent(2, 2, \"2020-06-15 10:00:00\", [pepperoni_pizza,meat_pizza]) \n]\norder_events_df = spark.createDataFrame(order_events_seq)\n\ndbutils.fs.rm(order_source_path, True)\norder_events_df.write.format(\"delta\").mode(\"append\").save(order_source_path)\nspark.read.format(\"delta\").load(order_source_path).show(10, False)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n

+-------+--------+-------------------+-------------------------------------+\n|OrderId|ClientId|SaleDate           |Products                             |\n+-------+--------+-------------------+-------------------------------------+\n|1      |2       |2020-06-01 10:00:00|[[1, Pepperoni, 100]]                |\n|1      |2       |2020-06-01 10:00:00|[[1, Pepperoni, 100]]                |\n|2      |2       |2020-06-15 10:00:00|[[1, Pepperoni, 100], [2, Meat, 150]]|\n+-------+--------+-------------------+-------------------------------------+\n\n
"]}}],"execution_count":3},{"cell_type":"code","source":["import time\nfrom pyspark.sql import DataFrame\n\n# cleanup checkpoints, cleanup sink data\ndbutils.fs.rm(order_checkpoints_path, True)\ndbutils.fs.rm(order_sink_path, True)\n\ndef transform_data(source: DataFrame) -> DataFrame:\n return source.select(\"OrderId\", \"ClientId\", \"SaleDate\")\n\n# create 1 duplicate row at sink side\ninit_sink = transform_data(order_events_df.orderBy(\"SaleDate\").limit(1))\ninit_sink\\\n.write\\\n.format(\"delta\")\\\n.save(order_sink_path)\n\n# ETL\nstream_df = (\n spark \n .readStream\n .format(\"delta\")\n .load(order_source_path)\n)\n\ntransformed_df = transform_data(stream_df)\n\n# (!) use processingTime to reduce costs\ntransformed_df\\\n.writeStream\\\n.format(\"delta\")\\\n.option(\"checkpointLocation\", order_checkpoints_path)\\\n.trigger(processingTime=\"30 seconds\")\\\n.outputMode(\"append\")\\\n.start(order_sink_path)\n \ntime.sleep(int(10))\nfor stream in spark.streams.active:\n stream.stop()\n\ndisplay(spark.read.format(\"delta\").load(order_sink_path).sort(\"OrderId\",\"SaleDate\").show(10, False))\n# 3 duplicate rows for order 1 (one duplicate from microbatch, second - at sink side)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
+-------+--------+-------------------+\n|OrderId|ClientId|SaleDate           |\n+-------+--------+-------------------+\n|1      |2       |2020-06-01 10:00:00|\n|1      |2       |2020-06-01 10:00:00|\n|1      |2       |2020-06-01 10:00:00|\n|2      |2       |2020-06-15 10:00:00|\n+-------+--------+-------------------+\n\n
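"]}}],"execution_count":4},{"cell_type":"markdown","source":["Quick verification (a minimal sketch, assuming the cells above have run): count how many copies of each order landed in the sink. With a plain append stream, both the duplicate from the source and the row pre-created at the sink survive."],"metadata":{}},{"cell_type":"code","source":["from pyspark.sql import functions as F\n\n# sketch: duplicated orders in the sink after the append-only stream\nduplicates_df = (\n spark.read.format(\"delta\").load(order_sink_path)\n .groupBy(\"OrderId\", \"ClientId\", \"SaleDate\")\n .count()\n .where(F.col(\"count\") > 1)\n)\nduplicates_df.show(10, False)"],"metadata":{},"outputs":[],"execution_count":null},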
"]}}],"execution_count":4},{"cell_type":"code","source":["from delta.tables import *\n\n\n# cleanup checkpoints, cleanup sink data\ndbutils.fs.rm(order_checkpoints_path, True)\ndbutils.fs.rm(order_sink_path, True)\n\ndef transform_data(source: DataFrame) -> DataFrame:\n return source.select(\"OrderId\", \"ClientId\", \"SaleDate\")\n\n# create 1 duplicate row at sink side\ninit_sink = transform_data(order_events_df.orderBy(\"SaleDate\").limit(1))\ninit_sink\\\n.write\\\n.format(\"delta\")\\\n.save(order_sink_path)\n\ndeltaTable = DeltaTable.forPath(spark, order_sink_path)\n# upsert function \ndef upsertOrderToDelta(microBatchOutputDF, batchId):\n deltaTable.alias(\"t\")\\\n .merge(microBatchOutputDF.alias(\"s\"),\"s.OrderId = t.OrderId\")\\\n .whenMatchedUpdateAll()\\\n .whenNotMatchedInsertAll()\\\n .execute()\n\n# ETL\nstream_df = (\n spark \n .readStream\n .format(\"delta\")\n .load(order_source_path)\n)\n\ntransformed_df = (\n transform_data(stream_df)\n .distinct() # deduplicate microbatch\n) \n \ntransformed_df\\\n.writeStream\\\n.format(\"delta\")\\\n.foreachBatch(upsertOrderToDelta)\\\n.option(\"checkpointLocation\", order_checkpoints_path)\\\n.trigger(processingTime=\"30 seconds\")\\\n.outputMode(\"update\")\\\n.start(order_sink_path)\n \ntime.sleep(int(30))\nfor stream in spark.streams.active:\n stream.stop()\n\nspark.read.format(\"delta\").load(order_sink_path).sort(\"OrderId\",\"SaleDate\").show(10, False)\n# no duplicate rows for order 1"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
+-------+--------+-------------------+\n|OrderId|ClientId|SaleDate           |\n+-------+--------+-------------------+\n|1      |2       |2020-06-01 10:00:00|\n|2      |2       |2020-06-15 10:00:00|\n+-------+--------+-------------------+\n\n
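"]}}],"execution_count":5},{"cell_type":"markdown","source":["Optional inspection (a minimal sketch, assuming the cell above has run): the sink's Delta history should show the initial WRITE of the pre-created row plus a MERGE per microbatch executed by the foreachBatch upsert."],"metadata":{}},{"cell_type":"code","source":["from delta.tables import DeltaTable\n\n# sketch: the foreachBatch upsert is recorded as MERGE operations in the sink's history\nDeltaTable.forPath(spark, order_sink_path)\\\n.history()\\\n.select(\"version\", \"operation\", \"operationMetrics\")\\\n.show(10, False)"],"metadata":{},"outputs":[],"execution_count":null},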
"]}}],"execution_count":5},{"cell_type":"code","source":["from pyspark.sql import functions as F\nfrom pyspark.sql.types import TimestampType\n\n# source batch rows\nProduct = Row(\"ProductId\", \"ProductName\", \"ProductPrice\", \"EffectiveDateTime\")\nproducts_seq = [\n Product(1, \"Pepperoni\", 100, \"2020-06-01 10:00:00\"),\n Product(2, \"Meat\", 150, \"2020-06-10 00:00:00\") \n]\n\n# update batch rows\nproducts_seq_for_update = [\n Product(1, \"Pepperoni Update\", 300, \"2020-06-02 12:00:00\"),\n Product(2, \"Meat pizza Update\", 120, \"2020-06-11 16:00:00\") \n]\n\nproducts_df = spark.createDataFrame(products_seq)\nproducts_update_df = spark.createDataFrame(products_seq_for_update)\n\n# save increment\ndbutils.fs.rm(product_increment_path, True)\nproducts_update_df.write.format(\"delta\").mode(\"append\").save(product_increment_path)\n\nproducts_df.show(10, False)\nproducts_update_df.show(10, False)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
+---------+-----------+------------+-------------------+\n|ProductId|ProductName|ProductPrice|EffectiveDateTime  |\n+---------+-----------+------------+-------------------+\n|1        |Pepperoni  |100         |2020-06-01 10:00:00|\n|2        |Meat       |150         |2020-06-10 00:00:00|\n+---------+-----------+------------+-------------------+\n\n+---------+-----------------+------------+-------------------+\n|ProductId|ProductName      |ProductPrice|EffectiveDateTime  |\n+---------+-----------------+------------+-------------------+\n|1        |Pepperoni Update |300         |2020-06-02 12:00:00|\n|2        |Meat pizza Update|120         |2020-06-11 16:00:00|\n+---------+-----------------+------------+-------------------+\n\n
"]}}],"execution_count":6},{"cell_type":"code","source":["END_OF_TIME_UNIX_TIMESTAMP = \"253402214400\"; # 9999-12-31\n\n# cleanup checkpoints\ndbutils.fs.rm(product_checkpoints_path, True)\ndbutils.fs.rm(product_sink_path, True)\n\n# write first batch to sink as SCD table\nproducts_df\\\n.withColumn(\"Current\", F.lit(True))\\\n.withColumn(\"EffectiveFrom\", F.col(\"EffectiveDateTime\").cast(TimestampType()))\\\n.withColumn(\"EffectiveTo\", F.lit(\"9999-12-31\").cast(TimestampType()))\\\n.select(\"ProductId\", \"ProductName\", \"ProductPrice\", \"Current\", \"EffectiveFrom\", \"EffectiveTo\")\\\n.write\\\n.format(\"delta\")\\\n.save(product_sink_path)\n\n# upsert function\ndeltaTable = DeltaTable.forPath(spark, product_sink_path)\n\ndef upsertProductToDelta(microBatchOutputDF, batchId):\n deltaTable.alias(\"t\")\\\n .merge(microBatchOutputDF.alias(\"s\"),\"t.ProductId = s.mergeKey\") \\\n .whenMatchedUpdate(\n condition = \"t.Current = True\",\n set = { \n \"Current\": \"false\",\n \"EffectiveTo\": \"s.EffectiveDateTime\"\n }\n )\\\n .whenNotMatchedInsert(\n values = {\n \"ProductId\": \"s.ProductId\",\n \"ProductName\": \"s.ProductName\",\n \"ProductPrice\": \"s.ProductPrice\",\n \"Current\": \"True\",\n \"EffectiveFrom\": \"s.EffectiveDateTime\", \n \"EffectiveTo\": END_OF_TIME_UNIX_TIMESTAMP\n }\n )\\\n .execute()\n \n# ETL\nstream_df = (\n spark \n .readStream\n .format(\"delta\")\n .load(product_increment_path)\n)\n\ntransformed_df = stream_df.distinct()\n\n# prepare rows for insert (older version already exists in delta)\nnew_rows_to_insert = ( \n transformed_df\n .alias(\"s\")\n .join(deltaTable.toDF().alias(\"t\"), \"ProductId\")\n .where(\"t.Current = True\")\n)\n\nstagedUpdates = (\n new_rows_to_insert\n .selectExpr(\"NULL as mergeKey\", \"s.*\") \n .union(transformed_df.selectExpr(\"ProductId as mergeKey\", \"*\"))\n)\n\nstagedUpdates\\\n.writeStream\\\n.format(\"delta\")\\\n.foreachBatch(upsertProductToDelta)\\\n.option(\"checkpointLocation\", product_checkpoints_path)\\\n.trigger(processingTime=\"30 seconds\")\\\n.outputMode(\"update\")\\\n.start(product_sink_path)\n\ntime.sleep(int(40))\nfor stream in spark.streams.active:\n stream.stop()\n\nspark.read.format(\"delta\").load(product_sink_path).orderBy(\"ProductId\", \"EffectiveFrom\").show(10, False)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
+---------+-----------------+------------+-------+-------------------+-------------------+\n|ProductId|ProductName      |ProductPrice|Current|EffectiveFrom      |EffectiveTo        |\n+---------+-----------------+------------+-------+-------------------+-------------------+\n|1        |Pepperoni        |100         |false  |2020-06-01 10:00:00|2020-06-02 12:00:00|\n|1        |Pepperoni Update |300         |true   |2020-06-02 12:00:00|9999-12-31 00:00:00|\n|2        |Meat             |150         |false  |2020-06-10 00:00:00|2020-06-11 16:00:00|\n|2        |Meat pizza Update|120         |true   |2020-06-11 16:00:00|9999-12-31 00:00:00|\n+---------+-----------------+------------+-------+-------------------+-------------------+\n\n
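"]}}],"execution_count":7},{"cell_type":"markdown","source":["Reading the SCD Type 2 sink (a minimal sketch, assuming the cell above has run): the current version of each product is the row with Current = true, and a point-in-time lookup filters on the EffectiveFrom/EffectiveTo interval."],"metadata":{}},{"cell_type":"code","source":["# sketch: current product versions\nscd_df = spark.read.format(\"delta\").load(product_sink_path)\nscd_df.where(\"Current = true\").orderBy(\"ProductId\").show(10, False)\n\n# sketch: point-in-time lookup (example timestamp chosen for this demo data)\nscd_df\\\n.where(\"EffectiveFrom <= '2020-06-10 12:00:00' AND EffectiveTo > '2020-06-10 12:00:00'\")\\\n.orderBy(\"ProductId\")\\\n.show(10, False)"],"metadata":{},"outputs":[],"execution_count":null}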
"]}}],"execution_count":7}],"metadata":{"name":"DE or DIE demo. Tomak","notebookId":1017959710913628},"nbformat":4,"nbformat_minor":0}