
Write Spark DataFrame into OpenSearch

Hatter1337
New Contributor III

Hi Databricks Community,

I'm trying to read an index from OpenSearch or write a DataFrame into an OpenSearch index using the native Spark OpenSearch connector:

host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
index = "index_name"

def save_to_open_search(df):
    df.write.format("org.opensearch.spark.sql") \
        .option("opensearch.nodes", host) \
        .option("opensearch.port", port) \
        .option("opensearch.nodes.resolve.hostname", "false") \
        .option("opensearch.nodes.wan.only", "true") \
        .option("opensearch.net.ssl", "true") \
        .option("opensearch.aws.sigv4.enabled", "true") \
        .option("opensearch.aws.sigv4.region", "us-west-2") \
        .option("opensearch.batch.size.entries", "200") \
        .option("opensearch.mapping.id", "id") \
        .option("opensearch.write.operation", "upsert") \
        .option("opensearch.resource", index) \
        .mode("append") \
        .save()

However, I’m encountering the following error:

Py4JJavaError: An error occurred while calling o456.save.
org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - 
typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting

Environment Details

  • Databricks Workspace: Running on AWS
  • Databricks Runtime: 15.4 LTS (Apache Spark 3.5.0, Scala 2.12)
  • OpenSearch Spark Connector: org.opensearch.client:opensearch-spark-30_2.12:1.3.0
  • Cluster Type: Dedicated
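
For anyone reproducing this setup: the connector is not bundled with the Databricks runtime, so it has to be attached to the cluster first. A minimal sketch, assuming the Maven coordinate above is installed as a cluster library through the Clusters/Jobs API (attaching it via the cluster UI works equally well):

# Illustrative library spec for the Databricks Clusters/Jobs API.
libraries = [
    {"maven": {"coordinates": "org.opensearch.client:opensearch-spark-30_2.12:1.3.0"}}
]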

What I’ve Tried

I ruled out network access issues because I can successfully connect to OpenSearch from the same cluster using opensearch-py==2.8.0:

import boto3
from opensearchpy import (
    AWSV4SignerAuth,
    OpenSearch,
    RequestsHttpConnection,
)

def create_os_client(host, port, use_ssl=True, region="us-west-2") -> OpenSearch:
    """
    Function to create an OpenSearch client.
    """
    http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), region, "es")
    return OpenSearch(
        hosts=[{"host": host, "port": port, "use_ssl": use_ssl}],
        http_auth=http_auth,
        connection_class=RequestsHttpConnection,
        http_compress=True,
    )

os_client = create_os_client(
    host=dbutils.secrets.get(scope="opensearch", key="host"), 
    port=dbutils.secrets.get(scope="opensearch", key="port"),
)
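
As a quick sanity check, opensearch-py's info() call returns the cluster metadata, including the very version string the Spark connector fails to detect:

# If this succeeds, both network access and SigV4 auth are working.
cluster_info = os_client.info()
print(cluster_info["version"]["number"])  # OpenSearch version string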

Even though I can connect using opensearch-py, I specifically want to use the native Spark OpenSearch connector (org.opensearch.spark.sql).

Has anyone encountered a similar issue? Any suggestions on how to resolve this?
I’d really appreciate any insights or workarounds!


5 REPLIES

Hatter1337
New Contributor III

This syntax is probably more correct:

def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", host)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)

But I still have the same error:

Py4JJavaError: An error occurred while calling o615.save.
: org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'opensearch.nodes.wan.only'...

Hatter1337
New Contributor III

I was close 🙃
For opensearch.nodes, you need to include the https:// scheme:

# OpenSearch connection details
host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
os_url = f"https://{host}"
index = "index_name"

def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", os_url)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)
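
For completeness, the same settings should also cover the read path from my original question; a minimal read-back sketch under the same host/SigV4 assumptions:

# Read sketch reusing the connection options from the writer above.
df_read = (
    spark.read.format("opensearch")
    .option("opensearch.nodes", os_url)
    .option("opensearch.port", port)
    .option("opensearch.aws.sigv4.enabled", "true")
    .option("opensearch.aws.sigv4.region", "us-west-2")
    .option("opensearch.nodes.resolve.hostname", "false")
    .option("opensearch.nodes.wan.only", "true")
    .option("opensearch.net.ssl", "true")
    .load(index)
)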

Alena
New Contributor II

Thank you for posting this. What did you use to authenticate? I keep getting the error below. I tried passing environment variables, but that did not work.

EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),
WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
com.amazonaws.auth.profile.ProfileCredentialsProvider@6d1f111c: No AWS profile named 'default',
com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@1ff4a5e1: Failed to connect to service endpoint: ]); no other nodes left - aborting...

Hatter1337
New Contributor III

Actually, I use only host & port:

# Define OpenSearch connection parameters and create the OSClient
host = config["OS_HOST"]
port = config["OS_PORT"]

os_client = OSClient(host=host, port=port)

# Where:
import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection
from pyspark.sql import DataFrame


class OSClient:
    """
    A client for interacting with OpenSearch.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger
        # Create OpenSearch client
        self.client = self._create_os_client()

    @property
    def os_url(self):
        return f"https://{self.host}:{self.port}"

    def _create_os_client(self) -> OpenSearch:
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: DataFrame,
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.
        """

        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            .option("opensearch.net.ssl", "true")
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )

        writer.mode("append").save()

        if self._logger:
            self._logger.info(f"Successfully upserted records into OpenSearch index: {index_name}")

This should be executed on a dedicated cluster with an instance profile that has access to OpenSearch (for AWS SigV4 auth).
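
For example, a call against the class above looks like this (the DataFrame and index name are illustrative):

# Hypothetical usage of OSClient; my_df is any Spark DataFrame with an "id" column.
os_client = OSClient(host=host, port=port)
os_client.upsert_data(df=my_df, index_name="index_name", id_column="id")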

Alena
New Contributor II

Thank you so much for your help; it works! One thing I'm trying to do is authenticate opensearch-hadoop using a different role than the one my cluster is mapped to. Environment variables only seem to work if they're set in the cluster configuration. I'll post a solution here if I make any progress.
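
The route I'm experimenting with, sketched below with placeholder names: assume the target role with STS up front, then inject the temporary keys as cluster-level environment variables, since that's where the connector's default AWS credential chain looks:

import boto3

# Assume the target role (placeholder ARN) and obtain temporary credentials.
creds = boto3.client("sts").assume_role(
    RoleArn="arn:aws:iam::123456789012:role/opensearch-writer",  # placeholder
    RoleSessionName="opensearch-spark-write",
)["Credentials"]

# These would go into the cluster's spark_env_vars at creation time so the
# connector's default AWS credential chain can resolve them on every node.
spark_env_vars = {
    "AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
    "AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
    "AWS_SESSION_TOKEN": creds["SessionToken"],
}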
