@SantoshJoshi Thanks for the reply — please find the code snippet below:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import base64
# Define the Event Hubs connection string.
# NOTE: replace the placeholder below with the real endpoint/connection
# string copied from the Event Hub's "Shared access policies" blade.
connectionString = "<EVENT_HUB_CONNECTION_STRING>"

# Connector configuration consumed by spark.read.format("eventhubs")
ehConf = {
    'eventhubs.connectionString': connectionString
}
# Function to decode base64 with proper error handling
def try_base64_decode(data😞
if data is None:
return None
try:
# Replace URL-safe characters
data = data.replace('-', '+').replace('_', '/')
# Handle padding issues
padding_needed = 4 - (len(data) % 4) if len(data) % 4 != 0 else 0
data += '=' * padding_needed
decoded_bytes = base64.b64decode(data)
return decoded_bytes.decode('utf-8', errors='replace')
except Exception as e:
return f"Failed to decode: {str(e)}"
# Register the base64 decoder as a Spark SQL UDF returning a string column
decode_udf = F.udf(try_base64_decode, StringType())

# Load the raw Event Hubs records as a batch DataFrame
df = (
    spark.read
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Inspect the incoming schema to confirm the column types
df.printSchema()

# The payload arrives as binary: cast it to string, then base64-decode it
df = df.withColumn("body", F.col("body").cast("string"))
df = df.withColumn("decoded_body", decode_udf(F.col("body")))

# Display raw vs decoded payloads side by side, untruncated
df.select("body", "decoded_body").show(truncate=False)
Error observed when running the above:
Input byte array has wrong 4-byte ending unit