cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

foreachBatch in pyspark throwing OSError: [WinError 10022] An invalid argument was supplied

Anonymous
Not applicable

Hello Team,

Since last 3 weeks I am trying to move my project from batch to structure streaming.

But every time I am running my code I am getting below error:

Traceback (most recent call last): File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 67, in <module> jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start(); File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\sql\streaming.py", line 1421, in "foreachBatch" ensure_callback_server_started(gw)

File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\java_gateway.py", line 230, in ensure_callback_server_started cbport = gw._callback_server.server_socket.getsockname()[1] OSError: [WinError 10022] An invalid argument was supplied During handling of the above exception, another exception occurred: Traceback (most recent call last):

File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 69, in <module> raise

Exception(">>>>>>>>>>>>>>>>>>>>", e); Exception: ('>>>>>>>>>>>>>>>>>>>>', OSError(10022, 'An invalid argument was supplied', None, 10022, None))

PFB my code that I am using

def foreach_batch_function(df, epoch_id):

print("\nProcesing Batch : ", epoch_id);

pass

# end of foreach_batch_function()

def get_input_schema():

'''Copy-Quality Columns'''

schema = StructType().add("fileName", StringType()).add("date", StringType()).add("brandName", StringType()).add("breadcrumb", ArrayType(StringType())).add("subcategory", StringType()).add("scoring_subcategory", StringType()).add("bullet_features", StringType()).add("product_description", StringType()).add("rich_content_text", StringType()).add("currency_code", StringType());

return schema;

# end of get_input_schema()

if name == "main":

spark = get_spark_session_for_streaming();

lines_from_kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092, localhost:9093").option("subscribe", "first_topic").option("starting_offsets", "earliest").load();

'''Get Schema For Input'''

required_schema = get_input_schema();

'''Read Input JSON'''

obj_from_kafka = lines_from_kafka.select(from_json(col("value").cast("string"),

required_schema).alias("jsonObj"));

json_from_kafka = objFromKafka.selectExpr("jsonObj.brandName as brand_name", "jsonObj.subcategory as subcategory", "jsonObj.scoringSubcategory as scoring_subcategory", "jsonObj.date as date", "jsonObj.breadcrumb as breadcrumb", "jsonObj.fileName as FileName", "jsonObj.bulletFeatures as bullet_features", "jsonObj.productDescription as product_description", "jsonObj.richContentText as rich_content_text", "jsonObj.currencyCode as currencyCode")

try:

jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start();

except Exception as e:

raise Exception(">>>>>>>>>>>>>>>>>>>>", e);

# end of main()

It is requested to please help me fix this issue. We have to move our Batch product to structured streaming on GCP very shortly, but I am stuck here, not able to move ahead because of this.

0 REPLIES 0
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.