
foreachBatch in pyspark throwing OSError: [WinError 10022] An invalid argument was supplied

Anonymous
Not applicable

Hello Team,

For the last three weeks I have been trying to move my project from batch to Structured Streaming.

But every time I run my code, I get the error below:

Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 67, in <module>
    jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start()
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\sql\streaming.py", line 1421, in foreachBatch
    ensure_callback_server_started(gw)
  File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\java_gateway.py", line 230, in ensure_callback_server_started
    cbport = gw._callback_server.server_socket.getsockname()[1]
OSError: [WinError 10022] An invalid argument was supplied

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 69, in <module>
    raise Exception(">>>>>>>>>>>>>>>>>>>>", e)
Exception: ('>>>>>>>>>>>>>>>>>>>>', OSError(10022, 'An invalid argument was supplied', None, 10022, None))

Please find below the code I am using:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType, StructType

def foreach_batch_function(df, epoch_id):
    print("\nProcessing Batch:", epoch_id)
# end of foreach_batch_function()

def get_input_schema():
    '''Copy-Quality Columns'''
    schema = (StructType()
              .add("fileName", StringType()).add("date", StringType())
              .add("brandName", StringType()).add("breadcrumb", ArrayType(StringType()))
              .add("subcategory", StringType()).add("scoring_subcategory", StringType())
              .add("bullet_features", StringType()).add("product_description", StringType())
              .add("rich_content_text", StringType()).add("currency_code", StringType()))
    return schema
# end of get_input_schema()

if __name__ == "__main__":
    spark = get_spark_session_for_streaming()

    lines_from_kafka = (spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
                        .option("subscribe", "first_topic")
                        .option("startingOffsets", "earliest")  # the Kafka source option is camelCase
                        .load())

    # Get schema for the input
    required_schema = get_input_schema()

    # Parse the JSON payload from the Kafka value column
    obj_from_kafka = lines_from_kafka.select(
        from_json(col("value").cast("string"), required_schema).alias("jsonObj"))

    # Field names must match the schema above (snake_case, not camelCase)
    json_from_kafka = obj_from_kafka.selectExpr(
        "jsonObj.brandName as brand_name", "jsonObj.subcategory as subcategory",
        "jsonObj.scoring_subcategory as scoring_subcategory", "jsonObj.date as date",
        "jsonObj.breadcrumb as breadcrumb", "jsonObj.fileName as file_name",
        "jsonObj.bullet_features as bullet_features",
        "jsonObj.product_description as product_description",
        "jsonObj.rich_content_text as rich_content_text",
        "jsonObj.currency_code as currency_code")

    try:
        query = json_from_kafka.writeStream.foreachBatch(foreach_batch_function).start()
        query.awaitTermination()  # keep the streaming query running
    except Exception as e:
        raise Exception(">>>>>>>>>>>>>>>>>>>>", e)
# end of main()

Please help me fix this issue. We have to move our batch product to Structured Streaming on GCP very soon, but I am stuck and cannot move ahead because of this.

