When AWS CloudWatch Logs is subscribed to deliver logs to an AWS Kinesis stream, each record arrives base64 encoded and the CloudWatch payload itself is GZIP compressed. The challenge we faced was how to handle that in PySpark so the data could actually be read.
We were able to create a UDF that decompresses the payload as it is read in with Spark Structured Streaming.
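Outside of Spark, the two layers look roughly like this: the base64 layer (how the Kinesis API presents the record) is undone first, then the GZIP layer that CloudWatch adds. A minimal, self-contained sketch using a made-up payload, not real CloudWatch data:

```python
import base64
import gzip
import zlib

# Simulate what CloudWatch Logs -> Kinesis produces: a GZIP-compressed JSON payload,
# which the Kinesis API then exposes as base64 text (hypothetical sample payload).
payload = b'{"messageType": "DATA_MESSAGE", "logEvents": [{"message": "hello"}]}'
record_b64 = base64.b64encode(gzip.compress(payload)).decode("ascii")

# Reverse the two layers: base64 first, then GZIP.
raw_bytes = base64.b64decode(record_b64)
print(zlib.decompress(raw_bytes, zlib.MAX_WBITS | 32).decode("utf-8"))
```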
## Create UDF to decompress data
```python
import zlib

from pyspark.sql.functions import udf

# Decompress a GZIP-compressed Kinesis record into a UTF-8 string
def decompress_func(x):
    try:
        return zlib.decompress(x, zlib.MAX_WBITS | 32).decode('utf-8')
    except Exception as e:
        return str(e)  # return the error message instead of None

# Register UDF
udf_decompress = udf(decompress_func)
```
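Before wiring the UDF into a stream, it can be sanity-checked locally against a GZIP-compressed sample (the payload below is a made-up stand-in for a CloudWatch Logs message):

```python
import gzip

# Hypothetical stand-in for a CloudWatch Logs subscription payload
sample_payload = '{"messageType": "DATA_MESSAGE", "logEvents": []}'
compressed = gzip.compress(sample_payload.encode("utf-8"))

assert decompress_func(compressed) == sample_payload
```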
## Create DataFrame from Kinesis
```python
# Read the Kinesis stream as a streaming DataFrame
kinesis = spark.readStream \
    .format("kinesis") \
    .option("streamName", stream_name) \
    .option("region", region) \
    .option("roleArn", role) \
    .option("initialPosition", "earliest") \
    .load()
```
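The columns dropped in the next step come from the Kinesis source itself. On the connector we used, the streaming DataFrame looks roughly like this (column names and types can vary slightly between connector versions, so treat this as orientation rather than a contract):

```python
kinesis.printSchema()
# root
#  |-- partitionKey: string (nullable = true)
#  |-- data: binary (nullable = true)
#  |-- stream: string (nullable = true)
#  |-- shardId: string (nullable = true)
#  |-- sequenceNumber: string (nullable = true)
#  |-- approximateArrivalTimestamp: timestamp (nullable = true)
```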
```python
from pyspark.sql.functions import col, to_date

# Apply decompression, drop the raw Kinesis columns, and derive a LOG_DATE column
kinesis = kinesis \
    .withColumn("uncompressed_data", udf_decompress(col("data"))) \
    .drop("partitionKey", "data", "shardId", "sequenceNumber") \
    .withColumn("LOG_DATE", to_date("approximateArrivalTimestamp"))
```
Please make sure to replace `stream_name`, `region`, and `role` in the code with the actual values for your AWS environment. The code reads data from AWS Kinesis using Spark Structured Streaming, decompresses the GZIP-compressed payload with the `decompress_func` UDF, and adds a new `uncompressed_data` column containing the decompressed text. The raw Kinesis columns that are no longer needed are dropped, and a `LOG_DATE` column is derived from the date portion of `approximateArrivalTimestamp`.
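From here the decompressed column can be parsed and the stream written out. A minimal sketch, assuming the payload is the usual CloudWatch Logs subscription JSON (`messageType`, `logGroup`, `logEvents`, ...) and placeholder output paths; both the schema and the paths are illustrative, not part of the original pipeline:

```python
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

# Illustrative schema for a CloudWatch Logs subscription payload
cw_schema = StructType([
    StructField("messageType", StringType()),
    StructField("logGroup", StringType()),
    StructField("logStream", StringType()),
    StructField("logEvents", ArrayType(StructType([
        StructField("id", StringType()),
        StructField("timestamp", LongType()),
        StructField("message", StringType()),
    ]))),
])

# Parse the JSON text and explode the individual log events
events = kinesis \
    .withColumn("payload", from_json(col("uncompressed_data"), cw_schema)) \
    .select("LOG_DATE", "payload.logGroup", explode("payload.logEvents").alias("event"))

# Write the parsed events out (paths are placeholders)
query = events.writeStream \
    .format("parquet") \
    .option("path", "/tmp/cloudwatch-logs") \
    .option("checkpointLocation", "/tmp/cloudwatch-logs-checkpoint") \
    .partitionBy("LOG_DATE") \
    .start()
```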