03-19-2025 01:42 AM
Hi Databricks Community,
I'm trying to read an index from OpenSearch or write a DataFrame into an OpenSearch index using the native Spark OpenSearch connector:
host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
index = "index_name"
def save_to_open_search(df):
    df.write.format("org.opensearch.spark.sql") \
        .option("opensearch.nodes", host) \
        .option("opensearch.port", port) \
        .option("opensearch.nodes.resolve.hostname", "false") \
        .option("opensearch.nodes.wan.only", "true") \
        .option("opensearch.net.ssl", "true") \
        .option("opensearch.aws.sigv4.enabled", "true") \
        .option("opensearch.aws.sigv4.region", "us-west-2") \
        .option("opensearch.batch.size.entries", "200") \
        .option("opensearch.mapping.id", "id") \
        .option("opensearch.write.operation", "upsert") \
        .option("opensearch.resource", index) \
        .mode("append") \
        .save()
However, I’m encountering the following error:
Py4JJavaError: An error occurred while calling o456.save.
org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version -
typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting
I ruled out network access issues because I can successfully connect to OpenSearch from the same cluster using opensearch-py==2.8.0:
import boto3
from opensearchpy import (
    AWSV4SignerAuth,
    OpenSearch,
    RequestsHttpConnection,
)

def create_os_client(host, port, use_ssl=True, region="us-west-2") -> OpenSearch:
    """
    Function to create an OpenSearch client.
    """
    http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), region, "es")
    return OpenSearch(
        hosts=[{"host": host, "port": port, "use_ssl": use_ssl}],
        http_auth=http_auth,
        connection_class=RequestsHttpConnection,
        http_compress=True,
    )

os_client = create_os_client(
    host=dbutils.secrets.get(scope="opensearch", key="host"),
    port=dbutils.secrets.get(scope="opensearch", key="port"),
)
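As a quick sanity check, this client can ping the cluster and fetch its version info (standard opensearch-py calls):

# Verify connectivity and SigV4 auth with the client created above
print(os_client.ping())   # True if the cluster is reachable
print(os_client.info())   # Cluster name and OpenSearch version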
Even though I can connect using opensearch-py, I specifically want to use the native Spark OpenSearch connector (org.opensearch.spark.sql).
Has anyone encountered a similar issue? Any suggestions on how to resolve this?
I’d really appreciate any insights or workarounds!
03-19-2025 03:23 AM
This syntax is probably more correct:
def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", host)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )
    writer.mode("append").save(index)
But I still have the same error:
Py4JJavaError: An error occurred while calling o615.save.
: org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Cannot detect OpenSearch version - typically this happens if the network/OpenSearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'opensearch.nodes.wan.only'...
03-19-2025 03:52 AM
I was close 🙃
For opensearch.nodes, you should include the https:// scheme:
# OpenSearch connection details
host = dbutils.secrets.get(scope="opensearch", key="host")
port = dbutils.secrets.get(scope="opensearch", key="port")
os_url = f"https://{host}"
index = "index_name"
def save_to_open_search(df):
    writer = (
        df.write.format("opensearch")
        .option("opensearch.nodes", os_url)
        .option("opensearch.port", port)
        .option("pushdown", "true")
        .option("opensearch.batch.write.refresh", "false")
        .option("opensearch.mapping.id", "id")
        .option("opensearch.write.operation", "upsert")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-west-2")
        .option("opensearch.nodes.resolve.hostname", "false")
        .option("opensearch.nodes.wan.only", "true")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.batch.size.entries", "5000")
        .option("opensearch.batch.size.bytes", "10mb")
    )
    writer.mode("append").save(index)
07-25-2025 01:29 PM
Thank you for posting this. What did you use to authenticate? I keep getting the error below. I tried passing environment variables, but that did not work.
EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),
WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
com.amazonaws.auth.profile.ProfileCredentialsProvider@6d1f111c: No AWS profile named 'default',
com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@1ff4a5e1: Failed to connect to service endpoint: ]); no other nodes left - aborting...
07-25-2025 01:52 PM - edited 07-25-2025 01:54 PM
Actually, I use only host & port:
# Define OpenSearch connection parameters and OSBackupManager client
host = config["OS_HOST"]
port = config["OS_PORT"]
os_client = OSClient(host=host, port=port)

# Where:
class OSClient:
    """
    A client for interacting with OpenSearch.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger
        # Create OpenSearch client
        self.client = self._create_os_client()

    @property
    def os_url(self):
        return f"https://{self.host}:{self.port}"

    def _create_os_client(self) -> OpenSearch:
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: DataFrame,
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.
        """
        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            .option("opensearch.net.ssl", "true")
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )
        writer.mode("append").save()
        self.logger.info(f"Successfully upserted records into OpenSearch index: {index_name}")
And this should be executed on a dedicated cluster with an instance profile that has access to OpenSearch (AWS SigV4).
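If you want to confirm which AWS identity the cluster actually resolves to (e.g. that the instance profile is attached), a quick check from a notebook cell is (a minimal sketch; boto3 follows roughly the same default credential chain as the Java SDK the connector uses):

import boto3

# Print the ARN of the role/instance profile the cluster credentials resolve to
print(boto3.client("sts").get_caller_identity()["Arn"])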
4 weeks ago
Thank you so much for your help; it works! One thing I'm still trying to do is authenticate opensearch-hadoop with a different role than the one my cluster is mapped to. Environment variables only seem to work if they're set in the cluster configuration. I'll post a solution here if I make any progress.