Hatter1337
New Contributor III

Actually, I use only host & port:

# Define OpenSearch connection parameters and OSBackupManager client
host = config["OS_HOST"]
port = config["OS_PORT"]

os_client = OSClient(host=host, port=port)

# Where:
class OSClient:
    """
    A client for interacting with OpenSearch.
    """

    def __init__(
        self,
        host: str,
        port: int,
        region: str = "us-west-2",
        use_ssl: bool = True,
        logger=None,
    ):
        """
        Initializes the OSClient instance.

        Args:
            host (str): OpenSearch host.
            port (int): OpenSearch port.
            region (str, optional): AWS region for authentication.
            use_ssl (bool, optional): Whether to use SSL.
            logger (Logger, optional): Logger instance for logging.
        """
        self.host = host
        self.port = port
        self.region = region
        self.use_ssl = use_ssl
        self._logger = logger
        # Create OpenSearch client
        self.client = self._create_os_client()

    @property
    def os_url(self):
        return f"https://{self.host}:{self.port}"

    def _create_os_client(self) -> OpenSearch:
        """
        Creates and returns an OpenSearch client with AWS SigV4 authentication.

        Returns:
            OpenSearch: OpenSearch client instance.
        """
        http_auth = AWSV4SignerAuth(boto3.Session().get_credentials(), self.region, "es")
        return OpenSearch(
            hosts=[{"host": self.host, "port": self.port, "use_ssl": self.use_ssl}],
            http_auth=http_auth,
            connection_class=RequestsHttpConnection,
            http_compress=True,
        )

    # ...

    def upsert_data(
        self,
        df: DataFrame,
        index_name: str,
        id_column: str,
        parallelism: int = 20,
        batch_size: int = 5000,
        max_bytes_size: str = "100mb",
        upsert: bool = True,
        refresh: bool = False,
        auto_repartition: bool = True,
    ):
        """
        Performs upserts (insert or update) on OpenSearch for a given Spark DataFrame.

        Args:
            df (DataFrame): The Spark DataFrame containing the data to write.
            index_name (str): The OpenSearch index where data will be written.
            id_column (str): Column name to use as the unique identifier for upsert operations.
            parallelism (int, optional): Number of partitions for Spark processing.
            batch_size (int, optional): Number of documents per batch in each OpenSearch request.
            max_bytes_size (str, optional): Maximum size of each batch in bytes.
            upsert (bool, optional): Whether to perform upsert operations instead of index.
            refresh (bool, optional): Whether to refresh the index immediately after writing.
                - False (default): Faster writes, but new data may take ~1s to appear in search.
                - True: Ensures new documents are immediately searchable, but slows down ingestion.
            auto_repartition (bool, optional): Whether to repartition the DataFrame for parallelism.
        """

        if auto_repartition:
            # Ensure the DataFrame is properly partitioned for parallel processing
            df = df.repartition(parallelism)

        # Configure OpenSearch writer
        writer = (
            df.write.format("opensearch")
            .option("opensearch.nodes", self.os_url)
            .option("opensearch.port", self.port)
            .option("pushdown", "true")
            .option("opensearch.batch.write.refresh", str(refresh).lower())
            .option("opensearch.mapping.id", id_column)
            .option("opensearch.write.operation", "upsert" if upsert else "index")
            .option("opensearch.aws.sigv4.enabled", "true")
            .option("opensearch.aws.sigv4.region", self.region)
            .option("opensearch.nodes.resolve.hostname", "false")
            .option("opensearch.nodes.wan.only", "true")
            .option("opensearch.net.ssl", "true")
            .option("opensearch.batch.size.entries", str(batch_size))
            .option("opensearch.batch.size.bytes", max_bytes_size)
            .option("resource", index_name)
        )

        writer.mode("append").save()

        self.logger.info(f"Successfully upserted records into OpenSearch index: {index_name}")

 And this should be executed on dedicated cluster with Instance profile that has access to OpenSearch (aws.sigv4).