Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Write Spark DataFrame into OpenSearch

Hatter1337
New Contributor

Hi Databricks Community,

I'm trying to read an index from OpenSearch or write a DataFrame into an OpenSearch index using the native Spark OpenSearch connector:

host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
index = "index_name"

def save_to_open_search(df):
    df.write.format("org.opensearch.spark.sql") \
        .option("opensearch.nodes", host) \
        .option("opensearch.port", port) \
        .option("opensearch.nodes.resolve.hostname", "false") \
        .option("opensearch.nodes.wan.only", "true") \
        .option("opensearch.net.ssl", "true") \
        .option("opensearch.aws.sigv4.enabled", "true") \
        .option("opensearch.aws.sigv4.region", "us-west-2") \
        .option("opensearch.batch.size.entries", "200") \
        .option("opensearch.mapping.id", "id") \
        .option("opensearch.write.operation", "upsert") \
        .option("opensearch.resource", index) \
        .mode("append") \
        .save()

However, I’m encountering the following error:

Py4JJavaError: An error occurred while calling o456.save.
org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - 
typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting
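For context on what "Cannot detect OpenSearch version" means: the connector probes the cluster root endpoint (GET /) and reads the version from the JSON it returns, so this error fires whenever that probe can't reach the cluster or can't parse a response. A minimal sketch of the extraction step, assuming a typical OpenSearch cluster-info payload (the sample JSON below is illustrative, not from this thread):

```python
import json

# Trimmed example of the cluster-info JSON returned by GET / on an
# OpenSearch endpoint - the connector reads the version from here.
SAMPLE_ROOT_RESPONSE = """
{
  "cluster_name": "my-domain",
  "version": {
    "distribution": "opensearch",
    "number": "2.11.0"
  }
}
"""

def parse_cluster_version(root_response_json: str) -> str:
    """Extract the version number the connector uses for detection."""
    info = json.loads(root_response_json)
    return info["version"]["number"]

print(parse_cluster_version(SAMPLE_ROOT_RESPONSE))  # 2.11.0
```

If this probe is rejected (for example, an unsigned request against a SigV4-protected domain) or never reaches the cluster, the connector surfaces the version-detection error above rather than the underlying cause.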

Environment Details

  • Databricks Workspace: Running on AWS
  • Databricks Runtime: 15.4 LTS (Apache Spark 3.5.0, Scala 2.12)
  • OpenSearch Spark Connector: org.opensearch.client:opensearch-spark-30_2.12:1.3.0
  • Cluster Type: Dedicated

What I’ve Tried

I ruled out network access issues because I can successfully connect to OpenSearch from the same cluster using opensearch-py==2.8.0:

import boto3
from opensearchpy import (
    AWSV4SignerAuth,
    OpenSearch,
    RequestsHttpConnection,
)

def create_os_client(host, port, use_ssl=True, region="us-west-2") -> OpenSearch:
    """
    Function to create an OpenSearch client.
    """
    http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), region, "es")
    return OpenSearch(
        hosts=[{"host": host, "port": port, "use_ssl": use_ssl}],
        http_auth=http_auth,
        connection_class=RequestsHttpConnection,
        http_compress=True,
    )

os_client = create_os_client(
    host=dbutils.secrets.get(scope="opensearch", key="host"), 
    port=dbutils.secrets.get(scope="opensearch", key="port"),
)

Even though I can connect using opensearch-py, I specifically want to use the native Spark OpenSearch connector (org.opensearch.spark.sql).

Has anyone encountered a similar issue? Any suggestions on how to resolve this?
I’d really appreciate any insights or workarounds!

Accepted Solution

Hatter1337
New Contributor

I was close 🙃
For opensearch.nodes, you should include the https:// scheme:

# OpenSearch connection details
host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
os_url = f"https://{host}"
index = "index_name"

def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", os_url)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)
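The same connection options apply when reading the index back with the native connector. A minimal sketch, assuming the same secrets; the build_os_options helper and read_from_open_search name are hypothetical, introduced here only so the read and write paths share one configuration:

```python
# Hypothetical helper: build the connector options once so the read and
# write paths cannot drift apart. Names are illustrative, not from the thread.
def build_os_options(host, port, region="us-west-2"):
    return {
        "opensearch.nodes": f"https://{host}",  # scheme must be included
        "opensearch.port": str(port),
        "opensearch.nodes.wan.only": "true",
        "opensearch.nodes.resolve.hostname": "false",
        "opensearch.net.ssl": "true",
        "opensearch.aws.sigv4.enabled": "true",
        "opensearch.aws.sigv4.region": region,
    }

def read_from_open_search(spark, host, port, index):
    # Apply the shared options to a DataFrame reader and load the index.
    reader = spark.read.format("opensearch")
    for key, value in build_os_options(host, port).items():
        reader = reader.option(key, value)
    return reader.load(index)
```

With a helper like this, the writer above could also be built from the same dictionary, so a fix such as adding the https:// scheme only has to be made in one place.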


Replies

Hatter1337
New Contributor

This syntax is probably more correct:

def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", host)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )

    writer.mode("append").save(index)

But I still have the same error:

Py4JJavaError: An error occurred while calling o615.save.
: org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'opensearch.nodes.wan.only'...

