cancel
Showing results forĀ 
Search instead forĀ 
Did you mean:Ā 
Administration & Architecture
Explore discussions on Databricks administration, deployment strategies, and architectural best practices. Connect with administrators and architects to optimize your Databricks environment for performance, scalability, and security.
cancel
Showing results forĀ 
Search instead forĀ 
Did you mean:Ā 

Setting up observability for serverless Databricks

APJESK
New Contributor III

I’m looking for best practices and guidance on setting up observability for serverless Databricks. Specifically, I’d like to know:

  • How to capture and monitor system-level metrics (CPU, memory, network, disk) in a serverless setup.

  • How to configure and collect application metrics (e.g., using Spark listeners, StreamingQueryListener, QueryExecutionListener).

  • Best way to manage and forward logs (driver logs, executor logs, audit logs, event logs) in a serverless environment.

  • Recommended approaches for integrating with external monitoring tools like Amazon CloudWatch, Datadog, or SIEM platforms.

  • Suggestions for building dashboards, alerts, and anomaly detection to ensure end-to-end observability.

If anyone has implemented observability for serverless Databricks workloads, I’d really appreciate insights into the architecture, tools used, and lessons learned.

Thanks in advance for your help!

2 ACCEPTED SOLUTIONS

Accepted Solutions

nayan_wylde
Honored Contributor

Here are few recommended methods:

 

  1. How to capture and monitor system-level metrics (CPU, memory, network, disk) in a serverless setup.
    • In serverless you don’t have host access (no node agents, no Ganglia). Treat the workspace/platform as your ā€œsystemā€ and monitor via
      • Databricks system tables for platform & job health (enable once per workspace). These are first-party tables in system.* you can query from any workspace. Start here for account activity, jobs, and Spark events.
      • Audit logs (low latency delivery to S3 and/or system table system.access.audit) to track who did what, when, from where—great for availability, security, and change correlation.
      • You can also use API to collect the metrices.
import requests
import json

def get_cluster_metrics(workspace_url, token, cluster_id):
    headers = {"Authorization": f"Bearer {token}"}
    
    # Get cluster events
    events_url = f"{workspace_url}/api/2.0/clusters/events"
    events_response = requests.get(
        events_url, 
        headers=headers, 
        params={"cluster_id": cluster_id}
    )
    
    # Get cluster details including resource utilization
    details_url = f"{workspace_url}/api/2.0/clusters/get"
    details_response = requests.get(
        details_url, 
        headers=headers, 
        params={"cluster_id": cluster_id}
    )
    
    return {
        "events": events_response.json(),
        "details": details_response.json()
    }​
  • How to configure and collect application metrics (e.g., using Spark listeners, StreamingQueryListener, QueryExecutionListener).

    • Spark listeners (e.g., SparkListener, QueryExecutionListener) and StreamingQueryListener work in serverless because they’re application-level. Register them in your notebook/job and push metrics out (HTTP to a gateway, StatsD, or directly to a lakehouse table). Note that some runtime/version caveats exist by language; check your DBR version.

    • For structured streaming, attach a StreamingQueryListener to publish: input rows/sec, batch duration, state ops, watermark, last progress. Persist to Delta tables or ship to CloudWatch/Datadog via HTTPS from the driver.

    • For SQL workloads, use SQL warehouse built-in telemetry plus query history tables to compute p95 latency, scan bytes, error rates (if you also run serverless SQL).

    • Sample code for listener implementation.
from pyspark.sql.streaming import StreamingQueryListener
import json
import boto3

class CustomStreamingQueryListener(StreamingQueryListener):
    def __init__(self, cloudwatch_client):
        self.cloudwatch = cloudwatch_client
    
    def onQueryStarted(self, event):
        """Track query start events"""
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryStarted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'QueryId', 'Value': event.id},
                        {'Name': 'QueryName', 'Value': event.name or 'unnamed'}
                    ]
                }
            ]
        )
    
    def onQueryProgress(self, event):
        """Track query progress metrics"""
        progress = event.progress
        
        metrics = [
            {
                'MetricName': 'InputRowsPerSecond',
                'Value': progress.inputRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'ProcessedRowsPerSecond',
                'Value': progress.processedRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'BatchDuration',
                'Value': progress.batchDuration,
                'Unit': 'Milliseconds'
            }
        ]
        
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=metrics
        )
    
    def onQueryTerminated(self, event):
        """Track query termination"""
        status = "Success" if event.exception is None else "Failed"
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryCompleted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'Status', 'Value': status}
                    ]
                }
            ]
        )

# Register the listener
cloudwatch = boto3.client('cloudwatch')
listener = CustomStreamingQueryListener(cloudwatch)
spark.streams.addListener(listener)​
  • Query Execution Listener
from pyspark.sql.util import QueryExecutionListener
import time
import logging

class MetricsQueryExecutionListener(QueryExecutionListener):
    def __init__(self, metrics_client):
        self.metrics_client = metrics_client
        self.logger = logging.getLogger(__name__)
    
    def onSuccess(self, funcName, qe, durationNs):
        """Track successful query executions"""
        duration_ms = durationNs / 1000000
        
        # Extract query complexity metrics
        physical_plan = qe.executedPlan
        stages = len([node for node in physical_plan.children])
        
        metrics = {
            'query_duration_ms': duration_ms,
            'query_stages': stages,
            'function_name': funcName,
            'success': 1
        }
        
        self.metrics_client.send_metrics(metrics)
        
    def onFailure(self, funcName, qe, exception):
        """Track failed query executions"""
        metrics = {
            'function_name': funcName,
            'failure': 1,
            'error_type': type(exception).__name__
        }
        
        self.metrics_client.send_metrics(metrics)
        self.logger.error(f"Query failed: {funcName}, Error: {exception}")

# Register the listener
metrics_listener = MetricsQueryExecutionListener(your_metrics_client)
spark.listenerManager.register(metrics_listener)​
  • Best way to manage and forward logs (driver logs, executor logs, audit logs, event logs) in a serverless environment.

    1. Audit logs → authoritative security/ops timeline

      • Recommended path: use the audit log system table (system.access.audit) for querying; optionally configure S3 delivery (near-real-time JSON) for downstream tools and SIEMs.

  • Recommended approaches for integrating with external monitoring tools like Amazon CloudWatch, Datadog, or SIEM platforms.

    1. Datadog: Datadog’s Data Jobs Monitoring now supports serverless Databricks jobs and serverless SQL—it correlates job health, query issues, and cost. Pair with the Databricks integration and (for non-serverless) the Agent; for serverless, use API-based ingestion and system logs.
  • Suggestions for building dashboards, alerts, and anomaly detection to ensure end-to-end observability.

    1. Reliability

      • Job failure rate, mean time to recovery → system tables (jobs) + audit logs.

      • Streaming freshness (max event age, batch duration) → StreamingQueryListener + Delta table.

    2. Performance

      •  SQL query latency & scan size → system tables / SQL telemetry.

    3. Cost

      • DBUs by job/owner/workflow, $/successful run → system tables + your billing exports (or Datadog Cloud Cost Management).

    4. Security & governance

      • Admin actions, permission changes, token/credential events, external location changes → system.access.audit.

 

 

View solution in original post

Sharanya13
Contributor III

@APJESK Serverless is designed to relieve DevOps teams from monitoring these types of metrics. You should be able to track the cost and usage with system tables 

View solution in original post

5 REPLIES 5

APJESK
New Contributor III

Databricks deployed on AWS Platform

nayan_wylde
Honored Contributor

Here are few recommended methods:

 

  1. How to capture and monitor system-level metrics (CPU, memory, network, disk) in a serverless setup.
    • In serverless you don’t have host access (no node agents, no Ganglia). Treat the workspace/platform as your ā€œsystemā€ and monitor via
      • Databricks system tables for platform & job health (enable once per workspace). These are first-party tables in system.* you can query from any workspace. Start here for account activity, jobs, and Spark events.
      • Audit logs (low latency delivery to S3 and/or system table system.access.audit) to track who did what, when, from where—great for availability, security, and change correlation.
      • You can also use API to collect the metrices.
import requests
import json

def get_cluster_metrics(workspace_url, token, cluster_id):
    headers = {"Authorization": f"Bearer {token}"}
    
    # Get cluster events
    events_url = f"{workspace_url}/api/2.0/clusters/events"
    events_response = requests.get(
        events_url, 
        headers=headers, 
        params={"cluster_id": cluster_id}
    )
    
    # Get cluster details including resource utilization
    details_url = f"{workspace_url}/api/2.0/clusters/get"
    details_response = requests.get(
        details_url, 
        headers=headers, 
        params={"cluster_id": cluster_id}
    )
    
    return {
        "events": events_response.json(),
        "details": details_response.json()
    }​
  • How to configure and collect application metrics (e.g., using Spark listeners, StreamingQueryListener, QueryExecutionListener).

    • Spark listeners (e.g., SparkListener, QueryExecutionListener) and StreamingQueryListener work in serverless because they’re application-level. Register them in your notebook/job and push metrics out (HTTP to a gateway, StatsD, or directly to a lakehouse table). Note that some runtime/version caveats exist by language; check your DBR version.

    • For structured streaming, attach a StreamingQueryListener to publish: input rows/sec, batch duration, state ops, watermark, last progress. Persist to Delta tables or ship to CloudWatch/Datadog via HTTPS from the driver.

    • For SQL workloads, use SQL warehouse built-in telemetry plus query history tables to compute p95 latency, scan bytes, error rates (if you also run serverless SQL).

    • Sample code for listener implementation.
from pyspark.sql.streaming import StreamingQueryListener
import json
import boto3

class CustomStreamingQueryListener(StreamingQueryListener):
    def __init__(self, cloudwatch_client):
        self.cloudwatch = cloudwatch_client
    
    def onQueryStarted(self, event):
        """Track query start events"""
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryStarted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'QueryId', 'Value': event.id},
                        {'Name': 'QueryName', 'Value': event.name or 'unnamed'}
                    ]
                }
            ]
        )
    
    def onQueryProgress(self, event):
        """Track query progress metrics"""
        progress = event.progress
        
        metrics = [
            {
                'MetricName': 'InputRowsPerSecond',
                'Value': progress.inputRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'ProcessedRowsPerSecond',
                'Value': progress.processedRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'BatchDuration',
                'Value': progress.batchDuration,
                'Unit': 'Milliseconds'
            }
        ]
        
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=metrics
        )
    
    def onQueryTerminated(self, event):
        """Track query termination"""
        status = "Success" if event.exception is None else "Failed"
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryCompleted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'Status', 'Value': status}
                    ]
                }
            ]
        )

# Register the listener
cloudwatch = boto3.client('cloudwatch')
listener = CustomStreamingQueryListener(cloudwatch)
spark.streams.addListener(listener)​
  • Query Execution Listener
from pyspark.sql.util import QueryExecutionListener
import time
import logging

class MetricsQueryExecutionListener(QueryExecutionListener):
    def __init__(self, metrics_client):
        self.metrics_client = metrics_client
        self.logger = logging.getLogger(__name__)
    
    def onSuccess(self, funcName, qe, durationNs):
        """Track successful query executions"""
        duration_ms = durationNs / 1000000
        
        # Extract query complexity metrics
        physical_plan = qe.executedPlan
        stages = len([node for node in physical_plan.children])
        
        metrics = {
            'query_duration_ms': duration_ms,
            'query_stages': stages,
            'function_name': funcName,
            'success': 1
        }
        
        self.metrics_client.send_metrics(metrics)
        
    def onFailure(self, funcName, qe, exception):
        """Track failed query executions"""
        metrics = {
            'function_name': funcName,
            'failure': 1,
            'error_type': type(exception).__name__
        }
        
        self.metrics_client.send_metrics(metrics)
        self.logger.error(f"Query failed: {funcName}, Error: {exception}")

# Register the listener
metrics_listener = MetricsQueryExecutionListener(your_metrics_client)
spark.listenerManager.register(metrics_listener)​
  • Best way to manage and forward logs (driver logs, executor logs, audit logs, event logs) in a serverless environment.

    1. Audit logs → authoritative security/ops timeline

      • Recommended path: use the audit log system table (system.access.audit) for querying; optionally configure S3 delivery (near-real-time JSON) for downstream tools and SIEMs.

  • Recommended approaches for integrating with external monitoring tools like Amazon CloudWatch, Datadog, or SIEM platforms.

    1. Datadog: Datadog’s Data Jobs Monitoring now supports serverless Databricks jobs and serverless SQL—it correlates job health, query issues, and cost. Pair with the Databricks integration and (for non-serverless) the Agent; for serverless, use API-based ingestion and system logs.
  • Suggestions for building dashboards, alerts, and anomaly detection to ensure end-to-end observability.

    1. Reliability

      • Job failure rate, mean time to recovery → system tables (jobs) + audit logs.

      • Streaming freshness (max event age, batch duration) → StreamingQueryListener + Delta table.

    2. Performance

      •  SQL query latency & scan size → system tables / SQL telemetry.

    3. Cost

      • DBUs by job/owner/workflow, $/successful run → system tables + your billing exports (or Datadog Cloud Cost Management).

    4. Security & governance

      • Admin actions, permission changes, token/credential events, external location changes → system.access.audit.

 

 

APJESK
New Contributor III

Thank you very much, I will go through your solution and get back to you, If I have any doubts. 

Sharanya13
Contributor III

@APJESK Serverless is designed to relieve DevOps teams from monitoring these types of metrics. You should be able to track the cost and usage with system tables 

APJESK
New Contributor III

Ok, Got it I will start explore it and get back to you.

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now