{"cells":[{"cell_type":"code","source":["# DBFS paths\nbase_path = 'dbfs:/mnt/deltalake/temp/deordie3'\n\norder_checkpoints_path = '{0}/checkpoints/order'.format(base_path)\norder_source_path = '{0}/data/order_source'.format(base_path)\norder_sink_path = '{0}/data/order_sink'.format(base_path)\n\nproduct_checkpoints_path = '{0}/checkpoints/product'.format(base_path)\nproduct_increment_path = '{0}/data/product_increment'.format(base_path)\nproduct_sink_path = '{0}/data/product_sink'.format(base_path)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":1},{"cell_type":"markdown","source":["### Pipeline \nPrepare data: write source rows into delta table \nPreparation before processing: cleanup sink path and checkpoints path (to start stream from the begining) \nETL: read source data from delta as stream, transform and write stream into delta table "],"metadata":{}},{"cell_type":"code","source":["from pyspark.sql import Row, DataFrame\n\nProduct = Row(\"ProductId\", \"ProductName\", \"ProductPriсe\")\nOrderAcceptedEvent = Row(\"OrderId\", \"ClientId\", \"SaleDate\", \"Products\")\n\npepperoni_pizza = Product(1, \"Pepperoni\", 100)\nmeat_pizza = Product(2, \"Meat\", 150)\norder_events_seq = [ \n OrderAcceptedEvent(1, 2, \"2020-06-01 10:00:00\", [pepperoni_pizza]), \n OrderAcceptedEvent(1, 2, \"2020-06-01 10:00:00\", [pepperoni_pizza]), # duplicate row for test\n OrderAcceptedEvent(2, 2, \"2020-06-15 10:00:00\", [pepperoni_pizza,meat_pizza]) \n]\norder_events_df = spark.createDataFrame(order_events_seq)\n\ndbutils.fs.rm(order_source_path, True)\norder_events_df.write.format(\"delta\").mode(\"append\").save(order_source_path)\nspark.read.format(\"delta\").load(order_source_path).show(10, False)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n