<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic foreachBatch in pyspark throwing OSError: [WinError 10022] An invalid argument was supplied in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/foreachbatch-in-pyspark-throwing-oserror-winerror-10022-an/m-p/26892#M18885</link>
    <description>&lt;P&gt;Hello Team,&lt;/P&gt;&lt;P&gt;For the last 3 weeks I have been trying to move my project from batch to Structured Streaming.&lt;/P&gt;&lt;P&gt;But every time I run my code I get the error below:&lt;/P&gt;&lt;PRE&gt;Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 67, in &amp;lt;module&amp;gt;
    jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start()
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\sql\streaming.py", line 1421, in foreachBatch
    ensure_callback_server_started(gw)
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\java_gateway.py", line 230, in ensure_callback_server_started
    cbport = gw._callback_server.server_socket.getsockname()[1]
OSError: [WinError 10022] An invalid argument was supplied

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 69, in &amp;lt;module&amp;gt;
    raise Exception("&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;", e)
Exception: ('&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;', OSError(10022, 'An invalid argument was supplied', None, 10022, None))&lt;/PRE&gt;&lt;P&gt;Please find below the code that I am using:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType, StructType


def foreach_batch_function(df, epoch_id):
    print("\nProcessing Batch : ", epoch_id)
# end of foreach_batch_function()


def get_input_schema():
    '''Copy-Quality Columns'''
    schema = (StructType()
              .add("fileName", StringType())
              .add("date", StringType())
              .add("brandName", StringType())
              .add("breadcrumb", ArrayType(StringType()))
              .add("subcategory", StringType())
              .add("scoring_subcategory", StringType())
              .add("bullet_features", StringType())
              .add("product_description", StringType())
              .add("rich_content_text", StringType())
              .add("currency_code", StringType()))
    return schema
# end of get_input_schema()


if __name__ == "__main__":
    spark = get_spark_session_for_streaming()
    lines_from_kafka = (spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
                        .option("subscribe", "first_topic")
                        .option("startingOffsets", "earliest")
                        .load())
    # Get schema for input
    required_schema = get_input_schema()
    # Read input JSON
    objFromKafka = lines_from_kafka.select(
        from_json(col("value").cast("string"), required_schema).alias("jsonObj"))
    jsonFromKafka = objFromKafka.selectExpr(
        "jsonObj.brandName as brand_name",
        "jsonObj.subcategory as subcategory",
        "jsonObj.scoring_subcategory as scoring_subcategory",
        "jsonObj.date as date",
        "jsonObj.breadcrumb as breadcrumb",
        "jsonObj.fileName as file_name",
        "jsonObj.bullet_features as bullet_features",
        "jsonObj.product_description as product_description",
        "jsonObj.rich_content_text as rich_content_text",
        "jsonObj.currency_code as currency_code")
    try:
        jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start()
    except Exception as e:
        raise Exception("&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;", e)
# end of main()&lt;/PRE&gt;&lt;P&gt;Please help me fix this issue. We have to move our batch product to Structured Streaming on GCP very shortly, but I am stuck here and unable to move ahead because of this.&lt;/P&gt;</description>
    <pubDate>Thu, 13 May 2021 15:26:17 GMT</pubDate>
    <dc:creator>Anonymous</dc:creator>
    <dc:date>2021-05-13T15:26:17Z</dc:date>
    <item>
      <title>foreachBatch in pyspark throwing OSError: [WinError 10022] An invalid argument was supplied</title>
      <link>https://community.databricks.com/t5/data-engineering/foreachbatch-in-pyspark-throwing-oserror-winerror-10022-an/m-p/26892#M18885</link>
      <description>&lt;P&gt;Hello Team,&lt;/P&gt;&lt;P&gt;For the last 3 weeks I have been trying to move my project from batch to Structured Streaming.&lt;/P&gt;&lt;P&gt;But every time I run my code I get the error below:&lt;/P&gt;&lt;PRE&gt;Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 67, in &amp;lt;module&amp;gt;
    jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start()
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\sql\streaming.py", line 1421, in foreachBatch
    ensure_callback_server_started(gw)
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\java_gateway.py", line 230, in ensure_callback_server_started
    cbport = gw._callback_server.server_socket.getsockname()[1]
OSError: [WinError 10022] An invalid argument was supplied

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 69, in &amp;lt;module&amp;gt;
    raise Exception("&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;", e)
Exception: ('&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;', OSError(10022, 'An invalid argument was supplied', None, 10022, None))&lt;/PRE&gt;&lt;P&gt;Please find below the code that I am using:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType, StructType


def foreach_batch_function(df, epoch_id):
    print("\nProcessing Batch : ", epoch_id)
# end of foreach_batch_function()


def get_input_schema():
    '''Copy-Quality Columns'''
    schema = (StructType()
              .add("fileName", StringType())
              .add("date", StringType())
              .add("brandName", StringType())
              .add("breadcrumb", ArrayType(StringType()))
              .add("subcategory", StringType())
              .add("scoring_subcategory", StringType())
              .add("bullet_features", StringType())
              .add("product_description", StringType())
              .add("rich_content_text", StringType())
              .add("currency_code", StringType()))
    return schema
# end of get_input_schema()


if __name__ == "__main__":
    spark = get_spark_session_for_streaming()
    lines_from_kafka = (spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
                        .option("subscribe", "first_topic")
                        .option("startingOffsets", "earliest")
                        .load())
    # Get schema for input
    required_schema = get_input_schema()
    # Read input JSON
    objFromKafka = lines_from_kafka.select(
        from_json(col("value").cast("string"), required_schema).alias("jsonObj"))
    jsonFromKafka = objFromKafka.selectExpr(
        "jsonObj.brandName as brand_name",
        "jsonObj.subcategory as subcategory",
        "jsonObj.scoring_subcategory as scoring_subcategory",
        "jsonObj.date as date",
        "jsonObj.breadcrumb as breadcrumb",
        "jsonObj.fileName as file_name",
        "jsonObj.bullet_features as bullet_features",
        "jsonObj.product_description as product_description",
        "jsonObj.rich_content_text as rich_content_text",
        "jsonObj.currency_code as currency_code")
    try:
        jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start()
    except Exception as e:
        raise Exception("&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;&amp;gt;", e)
# end of main()&lt;/PRE&gt;&lt;P&gt;Please help me fix this issue. We have to move our batch product to Structured Streaming on GCP very shortly, but I am stuck here and unable to move ahead because of this.&lt;/P&gt;</description>
      <pubDate>Thu, 13 May 2021 15:26:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/foreachbatch-in-pyspark-throwing-oserror-winerror-10022-an/m-p/26892#M18885</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2021-05-13T15:26:17Z</dc:date>
    </item>
  </channel>
</rss>