Monday, April 7, 2025

Delta Lake - Time Travel

 Time Travel allows you to query, restore, or compare data from a previous version of a Delta table.

Delta Lake automatically keeps track of every version of the table using the transaction log (_delta_log).
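
As a quick illustration, each commit writes a numbered JSON file into the log, so listing _delta_log shows one file per table version. A minimal sketch, assuming a local filesystem path and the table created in the walkthrough below:

from os import listdir

# Each commit adds a zero-padded JSON file to the log:
# 00000000000000000000.json, 00000000000000000001.json, ...
print(sorted(listdir("/target/timetrvl/_delta_log")))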

Purpose:-

  • Undo accidental deletes/updates

  • Debug or audit historical data

  • Test data pipelines on past versions

  • Compare changes over time

Ways to use it:-

  • versionAsOf (version number)

  • timestampAsOf (specific timestamp; see the sketch after the walkthrough below)


from pyspark.sql import SparkSession
from delta import *

# configure_spark_with_delta_pip supplies the Delta package, so
# spark.jars.packages does not need to be set here
builder = SparkSession.builder.appName("DeltaLakeTimeTravel") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Sample DataFrame
data = [(1, "Virat", 10000), (2, "Rohit", 9000)]
df = spark.createDataFrame(data, ["id", "name", "Runs"])

# Write DataFrame as Delta Table
df.write.format("delta").mode("overwrite").save("/target/timetrvl")

# Read and show the Delta table
df_read = spark.read.format("delta").load("/target/timetrvl")
df_read.show()

# Delete a row (each change creates a new table version)
delta_table = DeltaTable.forPath(spark, "/target/timetrvl")
delta_table.delete("name = 'Virat'")

# Update rows matching the condition
delta_table.update(condition="name = 'Rohit'", set={"name": "'Sachin'"})

# Checking the history
spark.sql("DESCRIBE HISTORY delta.`/target/timetrvl`").show(truncate=False)

# Time Travel to an older version
df_older = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/target/timetrvl")

df_older.show()
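
The same read works with a timestamp, and the table itself can be rolled back. A minimal sketch; the timestamp below is a placeholder, so substitute a commit timestamp from the DESCRIBE HISTORY output:

# Time Travel using a timestamp (placeholder value)
df_at_time = spark.read.format("delta") \
    .option("timestampAsOf", "2025-04-07 10:00:00") \
    .load("/target/timetrvl")

df_at_time.show()

# Restore the table back to version 0, undoing the delete and update above
delta_table.restoreToVersion(0)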


[Screenshot: History of the Table]

[Screenshot: Time Travel Output]

Wednesday, April 2, 2025

Spark - Explode

 The explode function in PySpark is used to transform an array or map column into multiple rows. It is commonly used when dealing with nested data structures like JSON or arrays within a DataFrame.

1.) explode is used to convert array or map elements into separate rows.

2.) If the array is empty, that row is removed.

3.) When applied to a map, explode creates key-value pairs as rows.

4.) If you want to keep rows with empty or null arrays, use explode_outer; posexplode is a separate variant that also returns each element's position (index). Both are shown in the sketch after the code below.


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("sangam_test_explode").getOrCreate()

data = [
    (1, ["apple", "banana", "cherry"]),
    (2, ["grape", "orange"]),
    (3, [])
]

df = spark.createDataFrame(data, ["id", "fruits"])
df.show(truncate=False)

# explode drops the id=3 row because its array is empty
df_explode = df.withColumn("fruits", explode(col("fruits")))
df_explode.show(truncate=False)
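
A minimal sketch of the remaining variants from the list above: explode on a map column, explode_outer to keep the empty-array row, and posexplode to get the element position. The scores column is illustrative:

from pyspark.sql.functions import explode_outer, posexplode

# explode on a map column: one row per key-value pair
map_data = [(1, {"math": 90, "physics": 85}), (2, {"chemistry": 70})]
df_map = spark.createDataFrame(map_data, ["id", "scores"])
df_map.select("id", explode("scores").alias("subject", "marks")).show()

# explode_outer keeps the id=3 row as a null instead of dropping it
df.select("id", explode_outer("fruits").alias("fruit")).show()

# posexplode also returns each element's position within the array
df.select("id", posexplode("fruits").alias("pos", "fruit")).show()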





