<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Write Spark DataFrame into OpenSearch in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/126514#M47705</link>
    <description>&lt;P&gt;Actually, I use only host &amp;amp; port:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Define OpenSearch connection parameters and OSBackupManager client
host = config["OS_HOST"]
port = config["OS_PORT"]

os_client = OSClient(host=host, port=port)

# Where:
class OSClient:
    """
    A client for interacting with OpenSearch.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger
        # Create OpenSearch client
        self.client = self._create_os_client()

    @property
    def os_url(self):
        return f"https://{self.host}:{self.port}"

    def _create_os_client(self) -&amp;gt; OpenSearch:
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: DataFrame,
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.
        """

        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            .option("opensearch.net.ssl", "true")
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )

        writer.mode("append").save()

        self._logger.info(f"Successfully upserted records into OpenSearch index: {index_name}")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;And this should be executed on &lt;STRONG&gt;dedicated&lt;/STRONG&gt; cluster with&amp;nbsp;&lt;STRONG&gt;Instance profile&lt;/STRONG&gt; that has access to OpenSearch (aws.sigv4).&lt;/P&gt;</description>
    <pubDate>Fri, 25 Jul 2025 20:54:09 GMT</pubDate>
    <dc:creator>Hatter1337</dc:creator>
    <dc:date>2025-07-25T20:54:09Z</dc:date>
    <item>
      <title>Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113019#M44392</link>
      <description>&lt;P&gt;Hi &lt;STRONG&gt;Databricks Community&lt;/STRONG&gt;,&lt;/P&gt;&lt;P&gt;I'm trying to &lt;STRONG&gt;read an index from OpenSearch&lt;/STRONG&gt; or &lt;STRONG&gt;write a DataFrame into an OpenSearch index&lt;/STRONG&gt; using the native Spark OpenSearch connector:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
index = "index_name"

def save_to_open_search(df):
    df.write.format("org.opensearch.spark.sql") \
        .option("opensearch.nodes", host) \
        .option("opensearch.port", port) \
        .option("opensearch.nodes.resolve.hostname", "false") \
        .option("opensearch.nodes.wan.only", "true") \
        .option("opensearch.net.ssl", "true") \
        .option("opensearch.aws.sigv4.enabled", "true") \
        .option("opensearch.aws.sigv4.region", "us-west-2") \
        .option("opensearch.batch.size.entries", "200") \
        .option("opensearch.mapping.id", "id") \
        .option("opensearch.write.operation", "upsert") \
        .option("opensearch.resource", index) \
        .mode("append") \
        .save()&lt;/LI-CODE&gt;&lt;P&gt;However, I’m encountering the following error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Py4JJavaError: An error occurred while calling o456.save.
org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - 
typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting&lt;/LI-CODE&gt;&lt;H3&gt;&lt;STRONG&gt;Environment Details&lt;/STRONG&gt;&lt;/H3&gt;&lt;UL&gt;&lt;LI&gt;&lt;STRONG&gt;Databricks Workspace:&lt;/STRONG&gt; Running on AWS&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Databricks Runtime:&lt;/STRONG&gt; &lt;STRONG&gt;15.4 LTS&lt;/STRONG&gt; (Apache Spark 3.5.0, Scala 2.12)&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;OpenSearch Spark Connector:&lt;/STRONG&gt; org.opensearch.client:opensearch-spark-30_2.12:1.3.0&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Cluster Type:&lt;/STRONG&gt; Dedicated&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;STRONG&gt;What I’ve Tried&lt;/STRONG&gt;&lt;/H3&gt;&lt;P&gt;I &lt;STRONG&gt;ruled out network access issues&lt;/STRONG&gt;, because I am able to successfully connect to OpenSearch &lt;STRONG&gt;on the same cluster&lt;/STRONG&gt; using opensearch-py==2.8.0:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import boto3
from opensearchpy import (
    AWSV4SignerAuth,
    OpenSearch,
    RequestsHttpConnection,
)

def create_os_client(host, port, use_ssl=True, region="us-west-2") -&amp;gt; OpenSearch:
    """
    Function to create an OpenSearch client.
    """
    http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), region, "es")
    return OpenSearch(
        hosts=[{"host": host, "port": port, "use_ssl": use_ssl}],
        http_auth=http_auth,
        connection_class=RequestsHttpConnection,
        http_compress=True,
    )

os_client = create_os_client(
    host=dbutils.secrets.get(scope="opensearch", key="host"), 
    port=dbutils.secrets.get(scope="opensearch", key="port"),
)&lt;/LI-CODE&gt;&lt;P&gt;Even though I can connect using opensearch-py, I specifically want to &lt;STRONG&gt;use the native Spark OpenSearch connector&lt;/STRONG&gt; (org.opensearch.spark.sql).&lt;/P&gt;&lt;P&gt;Has anyone encountered a similar issue? Any suggestions on how to resolve this?&lt;BR /&gt;I’d really appreciate any insights or workarounds!&lt;/P&gt;</description>
      <pubDate>Wed, 19 Mar 2025 08:42:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113019#M44392</guid>
      <dc:creator>Hatter1337</dc:creator>
      <dc:date>2025-03-19T08:42:21Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113034#M44401</link>
      <description>&lt;P&gt;Probably, this syntax is more correct:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", host)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)&lt;/LI-CODE&gt;&lt;P&gt;But I still have the same error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Py4JJavaError: An error occurred while calling o615.save.
: org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'opensearch.nodes.wan.only'...&lt;/LI-CODE&gt;</description>
      <pubDate>Wed, 19 Mar 2025 10:23:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113034#M44401</guid>
      <dc:creator>Hatter1337</dc:creator>
      <dc:date>2025-03-19T10:23:10Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113040#M44404</link>
      <description>&lt;P&gt;I was close&amp;nbsp;&lt;span class="lia-unicode-emoji" title=":upside_down_face:"&gt;🙃&lt;/span&gt;&lt;BR /&gt;For opensearch.nodes, you should include https:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# OpenSearch connection details
host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
os_url = f"https://{host}"
index = "index_name"

def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", os_url)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)&lt;/LI-CODE&gt;</description>
      <pubDate>Wed, 19 Mar 2025 10:52:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/113040#M44404</guid>
      <dc:creator>Hatter1337</dc:creator>
      <dc:date>2025-03-19T10:52:20Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/126513#M47704</link>
      <description>&lt;P&gt;Thank you for posting this. What did you use to authenticate? I keep getting the error below. I tried passing env vars - did not work.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),&lt;BR /&gt;SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),&lt;BR /&gt;WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,&lt;BR /&gt;com.amazonaws.auth.profile.ProfileCredentialsProvider@6d1f111c: No AWS profile named 'default',&lt;BR /&gt;com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@1ff4a5e1: Failed to connect to service endpoint: ]); no other nodes left - aborting...&lt;/PRE&gt;&lt;/DIV&gt;</description>
      <pubDate>Fri, 25 Jul 2025 20:29:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/126513#M47704</guid>
      <dc:creator>Alena</dc:creator>
      <dc:date>2025-07-25T20:29:14Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/126514#M47705</link>
      <description>&lt;P&gt;Actually, I use only host &amp;amp; port:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Define OpenSearch connection parameters and OSBackupManager client
host = config["OS_HOST"]
port = config["OS_PORT"]

os_client = OSClient(host=host, port=port)

# Where:
class OSClient:
    """
    A client for interacting with OpenSearch.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger
        # Create OpenSearch client
        self.client = self._create_os_client()

    @property
    def os_url(self):
        return f"https://{self.host}:{self.port}"

    def _create_os_client(self) -&amp;gt; OpenSearch:
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: DataFrame,
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.
        """

        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            .option("opensearch.net.ssl", "true")
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )

        writer.mode("append").save()

        self._logger.info(f"Successfully upserted records into OpenSearch index: {index_name}")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;And this should be executed on &lt;STRONG&gt;dedicated&lt;/STRONG&gt; cluster with&amp;nbsp;&lt;STRONG&gt;Instance profile&lt;/STRONG&gt; that has access to OpenSearch (aws.sigv4).&lt;/P&gt;</description>
      <pubDate>Fri, 25 Jul 2025 20:54:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/126514#M47705</guid>
      <dc:creator>Hatter1337</dc:creator>
      <dc:date>2025-07-25T20:54:09Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/127476#M47981</link>
      <description>&lt;P&gt;Thank you so much for your help—it works! One thing I’m trying to do is authenticate hadoop-opensearch using a different role than the one my cluster is mapped to. Environment variables only seem to work if they’re set in the cluster configuration. I’ll post a solution here if I make any progress.&lt;/P&gt;</description>
      <pubDate>Tue, 05 Aug 2025 15:02:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/127476#M47981</guid>
      <dc:creator>Alena</dc:creator>
      <dc:date>2025-08-05T15:02:47Z</dc:date>
    </item>
    <item>
      <title>Re: Write Spark DataFrame into OpenSearch</title>
      <link>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/141043#M51611</link>
      <description>&lt;P&gt;Hi,&lt;BR /&gt;I am getting the same error and I also was able to connect using opensearch-py.&lt;BR /&gt;I also found in this doc&amp;nbsp;&lt;A href="https://github.com/opensearch-project/opensearch-hadoop/blob/main/README.md#requirements" target="_blank"&gt;https://github.com/opensearch-project/opensearch-hadoop/blob/main/README.md#requirements&lt;/A&gt;&amp;nbsp;that I need to have some jars; I already added them without any luck, and here it mentions another jar I already added&amp;nbsp;&lt;A href="https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#signing-requests-for-iam-authentication" target="_blank"&gt;https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#signing-requests-for-iam-authentication&lt;/A&gt;&lt;BR /&gt;&lt;BR /&gt;I am using service credentials to access OS, not an instance profile — does this make a difference? If someone can help, it will be highly appreciated.&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 03 Dec 2025 16:51:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/write-spark-dataframe-into-opensearch/m-p/141043#M51611</guid>
      <dc:creator>SayedAbdallah</dc:creator>
      <dc:date>2025-12-03T16:51:04Z</dc:date>
    </item>
  </channel>
</rss>

