Read Write Stream Data from Event Hub to Databricks Delta Lake
Saturday
I am trying to read streaming data in JSON format from Event Hub. I can read the data into a DataFrame, but the body column arrives as binary. I cast it to string and tried to decode it, but while implementing the write stream I keep hitting the following error:
terminated with exception: Illegal base64 character 7b SQLSTATE: XXKST
and another error: Input byte array has wrong 4-byte ending unit
It also does not accept a JSON array as input. I have also tried UTF-8 and base64 decoding logic, but neither worked.
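For context, 0x7b is the ASCII code for `{`, so the first message suggests that something is base64-decoding text that is already plain JSON. Both messages match the wording used by Java's Base64 decoder, so they probably originate on the JVM side rather than in Python code. Below is a minimal sketch in plain Python that shows why a raw JSON payload is not valid base64. The sample payload and the helper name `is_valid_base64` are hypothetical, not taken from the original post.

```python
import base64
import binascii

# Hypothetical sample payload; real Event Hubs bodies will differ.
raw_body = b'{"deviceId": "sensor-1", "temperature": 21.5}'

# 0x7b is the ASCII code for '{', the first byte of a JSON object.
first_byte_hex = format(raw_body[0], "02x")


def is_valid_base64(data: bytes) -> bool:
    """Return True only if data decodes under strict base64 validation."""
    try:
        base64.b64decode(data, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False


print(first_byte_hex)             # 7b
print(is_valid_base64(raw_body))  # False
```

If the body already starts with `{` or `[`, it is likely plain JSON and needs no base64 step at all.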
Labels: Delta Lake, Spark
2 REPLIES
Saturday
Hi @Meghana89,
Can you please share your code snippet?
Monday - last edited Monday
@SantoshJoshi Thanks for the reply. Please find the code snippet below:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import base64

# Define the Event Hubs connection string
connectionString = "<endpoint>"  # replace with the endpoint from the Event Hub
ehConf = {
    'eventhubs.connectionString': connectionString
}

# Function to decode base64 with proper error handling
def try_base64_decode(data):
    if data is None:
        return None
    try:
        # Replace URL-safe characters
        data = data.replace('-', '+').replace('_', '/')
        # Handle padding issues
        padding_needed = 4 - (len(data) % 4) if len(data) % 4 != 0 else 0
        data += '=' * padding_needed
        decoded_bytes = base64.b64decode(data)
        return decoded_bytes.decode('utf-8', errors='replace')
    except Exception as e:
        return f"Failed to decode: {str(e)}"

# Register the function as a UDF for base64 decoding
decode_udf = F.udf(try_base64_decode, StringType())

# Read raw data from Event Hubs
df = spark \
    .read \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Show the schema of the raw data to inspect the types
df.printSchema()

# Cast the body column to string and apply base64 decoding if necessary
df = df.withColumn("body", df["body"].cast("string"))  # Cast body to string
df = df.withColumn("decoded_body", decode_udf(F.col("body")))  # Apply the decoding UDF

# Show the results (first few rows of the raw and decoded body)
df.select("body", "decoded_body").show(truncate=False)
Error:
Input byte array has wrong 4-byte ending unit
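One possible way to make the decoding step tolerant of both cases is to try parsing the body as JSON first and fall back to base64 only when that fails. The sketch below is plain Python and assumes the body is UTF-8 text. The function name `parse_event_body` is hypothetical, and this is an illustration of the idea rather than a confirmed fix for the error above.

```python
import base64
import binascii
import json
from typing import Any, Optional


def parse_event_body(body: Optional[bytes]) -> Any:
    """Parse an event body as JSON, trying base64 only if plain JSON fails.

    Returns the parsed object, or None if the body is missing or
    cannot be interpreted either way.
    """
    if body is None:
        return None
    text = body.decode("utf-8", errors="replace").strip()
    # First attempt: the body is already plain JSON (object or array).
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Second attempt: the body is base64-encoded JSON.
    try:
        decoded = base64.b64decode(text, validate=True).decode("utf-8")
        return json.loads(decoded)
    except (binascii.Error, ValueError):
        return None


print(parse_event_body(b'[{"a": 1}, {"a": 2}]'))
print(parse_event_body(base64.b64encode(b'{"a": 1}')))
```

In Spark itself, the analogous approach would presumably be to cast `body` to string and apply `pyspark.sql.functions.from_json` with an explicit schema, without a base64 UDF in between.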

