Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-25-2025 01:52 PM - edited 07-25-2025 01:54 PM
Actually, I use only host & port:
# Read the OpenSearch endpoint from the config and build the OSClient wrapper
host, port = config["OS_HOST"], config["OS_PORT"]
os_client = OSClient(host=host, port=port)
# Where:
class OSClient:
    """
    A client for interacting with OpenSearch.

    Builds an OpenSearch client authenticated with AWS SigV4 and exposes a
    Spark-based bulk write (``upsert_data``) through the ``opensearch``
    DataFrame writer format.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger

        # Create OpenSearch client (resolves AWS credentials immediately)
        self.client = self._create_os_client()

    @property
    def os_url(self) -> str:
        """
        Base URL of the OpenSearch endpoint.

        The scheme follows ``use_ssl`` (``https`` when enabled, ``http``
        otherwise) so the URL agrees with the client connection settings.
        """
        scheme = "https" if self.use_ssl else "http"
        return f"{scheme}://{self.host}:{self.port}"

    def _create_os_client(self) -> "OpenSearch":
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Credentials come from the default ``boto3`` session and requests are
        signed for the ``es`` service in ``self.region``.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: "DataFrame",
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.

        Note:
            The write is SigV4-signed, so it should run on a dedicated cluster
            whose instance profile has access to OpenSearch.
        """
        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            # Follow the instance setting instead of forcing SSL on
            .option("opensearch.net.ssl", str(self.use_ssl).lower())
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )
        writer.mode("append").save()

        # __init__ stores the logger as ``_logger`` (possibly None). Prefer a
        # public ``logger`` accessor when the class defines one, otherwise
        # fall back to the stored instance; skip logging if neither exists.
        logger = getattr(self, "logger", None) or self._logger
        if logger is not None:
            logger.info("Successfully upserted records into OpenSearch index: %s", index_name)


# And this should be executed on a dedicated cluster with an instance profile that has access to OpenSearch (aws.sigv4).