Data Engineering

Reading CloudWatch Logs from AWS Kinesis

Harrison
New Contributor II

If you have an AWS CloudWatch Logs subscription writing logs out to AWS Kinesis, the records on the Kinesis stream are base64 encoded and the CloudWatch log payloads are GZIP compressed. The challenge we faced was how to handle that in PySpark so we could read the data.

 

We were able to create a UDF that decompresses the data as it is read in using Spark Structured Streaming.

 

 

import zlib

from pyspark.sql.functions import col, to_date, udf

# UDF to decompress GZIP data; zlib.MAX_WBITS | 32 auto-detects the gzip header
def decompress_func(x):
    try:
        return zlib.decompress(x, zlib.MAX_WBITS | 32).decode('utf-8')
    except Exception as e:
        return str(e)  # return the error message instead of None

# Register the UDF (returns StringType by default)
udf_decompress = udf(decompress_func)
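
Before wiring the UDF into the stream, you can sanity-check it locally by round-tripping a payload with Python's gzip module (a minimal sketch; the sample payload is made up):

import gzip

sample = gzip.compress(b'{"messageType": "CONTROL_MESSAGE"}')
print(decompress_func(sample))  # prints the original JSON string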

Create the streaming DataFrame from Kinesis:

kinesis = (spark.readStream
    .format("kinesis")
    .option("streamName", stream_name)
    .option("region", region)
    .option("roleArn", role)
    .option("initialPosition", "earliest")
    .load())

# Apply decompression, drop the raw Kinesis columns, and derive a date column
kinesis = (kinesis
    .withColumn("uncompressed_data", udf_decompress(col("data")))
    .drop("partitionKey", "data", "shardId", "sequenceNumber")
    .withColumn("LOG_DATE", to_date("approximateArrivalTimestamp")))
 
Please make sure to replace stream_name, region, and role in the code with the actual values for your AWS environment. The code reads data from AWS Kinesis using Spark Structured Streaming, decompresses the GZIP-compressed payload with the decompress_func UDF into a new column named uncompressed_data, drops the raw Kinesis columns, and adds a LOG_DATE column by extracting the date from approximateArrivalTimestamp.
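
Once decompressed, each record is a CloudWatch Logs subscription payload: a JSON document with stream metadata and a logEvents array. If you want one row per log event, you can parse and explode it. A minimal sketch, assuming the standard CloudWatch subscription payload schema (adjust the fields if your payload differs):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

cw_schema = StructType([
    StructField("messageType", StringType()),
    StructField("owner", StringType()),
    StructField("logGroup", StringType()),
    StructField("logStream", StringType()),
    StructField("logEvents", ArrayType(StructType([
        StructField("id", StringType()),
        StructField("timestamp", LongType()),  # epoch milliseconds
        StructField("message", StringType()),
    ]))),
])

events = (kinesis
    .withColumn("payload", from_json(col("uncompressed_data"), cw_schema))
    .select("LOG_DATE", explode("payload.logEvents").alias("event"))
    .select("LOG_DATE", "event.timestamp", "event.message"))

From there, events can be written out with writeStream like any other streaming DataFrame.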