Read Write Stream Data from Event Hub to Databricks Delta Lake

Meghana89
New Contributor II

I am trying to read streaming data from Event Hub, which is in JSON format. I can read the data into a DataFrame, but the body column comes through as binary. I have cast it to string and tried decoding it, but when implementing the write stream I keep hitting the following errors:

terminated with exception: Illegal base64 character 7b SQLSTATE: XXKST
and a second error: Input byte array has wrong 4-byte ending unit
It also does not accept a JSON array as input. I have tried both UTF-8 and base64 decoding logic, but neither worked.
 

SantoshJoshi
New Contributor III

Hi @Meghana89 ,

Can you please share your code snippet?

Meghana89
New Contributor II
@SantoshJoshi Thanks for the reply. Please find the code snippet below:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import base64

# Define the Event Hubs connection string
connectionString = "<endpoint>"  # replace with the endpoint from Event Hub
ehConf = {
    'eventhubs.connectionString': connectionString
}

# Function to decode base64 with proper error handling
def try_base64_decode(data):
    if data is None:
        return None
    try:
        # Replace URL-safe characters
        data = data.replace('-', '+').replace('_', '/')
        # Handle padding issues
        padding_needed = 4 - (len(data) % 4) if len(data) % 4 != 0 else 0
        data += '=' * padding_needed
        decoded_bytes = base64.b64decode(data)
        return decoded_bytes.decode('utf-8', errors='replace')
    except Exception as e:
        return f"Failed to decode: {str(e)}"

# Apply the UDF for base64 decoding
decode_udf = F.udf(try_base64_decode, StringType())

# Read raw data from Event Hubs
df = spark \
    .read \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Show the schema of the raw data to inspect the types
df.printSchema()

# Cast the body column to string and apply base64 decoding if necessary
df = df.withColumn("body", df["body"].cast("string"))  # Cast body to string
df = df.withColumn("decoded_body", decode_udf(F.col("body")))  # Apply the decoding UDF

# Show the results (first few rows of the raw and decoded body)
df.select("body", "decoded_body").show(truncate=False)
 
Error:
Input byte array has wrong 4-byte ending unit
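
A note on the first error message: 0x7b is the ASCII code for `{`, the opening character of a JSON object. That suggests whatever reached the base64 decoder was already plain JSON text rather than base64, in which case no base64 step is needed after casting the body to string. The sketch below reproduces this in plain Python, outside Spark. The payload and the helper names `decode_body` and `base64_attempt` are hypothetical and only for illustration; the real body and schema will differ.

```python
import base64
import binascii
import json

# Hypothetical Event Hub body: raw UTF-8 bytes of a JSON object (an assumption).
raw_body = b'{"deviceId": "sensor-1", "temperature": 21.5}'

# The first byte of a JSON object is '{', which is 0x7b in hex.
first_byte_hex = format(raw_body[0], "x")


def base64_attempt(body: bytes) -> str:
    """Try a strict base64 decode and report whether it succeeds."""
    try:
        base64.b64decode(body, validate=True)
        return "decoded"
    except binascii.Error as exc:
        # '{', '"', ':' and spaces are not in the base64 alphabet.
        return f"failed: {exc}"


def decode_body(body: bytes) -> dict:
    """Decode the bytes as UTF-8 and parse them as JSON, with no base64 step."""
    return json.loads(body.decode("utf-8"))


record = decode_body(raw_body)

print(first_byte_hex)
print(base64_attempt(raw_body))
print(record)
```

If the same holds for the real data, the Spark equivalent would likely be to keep the cast of body to string and parse it with `from_json` from `pyspark.sql.functions` using a schema that matches the events, instead of applying the base64 UDF. This is an assumption about the payload, so it is worth checking the output of `df.select("body").show(truncate=False)` first.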