Hello Team,
Since last 3 weeks I am trying to move my project from batch to structure streaming.
But every time I am running my code I am getting below error:
Traceback (most recent call last): File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 67, in <module> jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start(); File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\sql\streaming.py", line 1421, in "foreachBatch" ensure_callback_server_started(gw)
File "C:\Users\avisriva1\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\java_gateway.py", line 230, in ensure_callback_server_started cbport = gw._callback_server.server_socket.getsockname()[1] OSError: [WinError 10022] An invalid argument was supplied During handling of the above exception, another exception occurred: Traceback (most recent call last):
File "C:\Users\avisriva1\git_4May2021\comint-ml-scores\src\scores_common_functions\ml_nlp_streams.py", line 69, in <module> raise
Exception(">>>>>>>>>>>>>>>>>>>>", e); Exception: ('>>>>>>>>>>>>>>>>>>>>', OSError(10022, 'An invalid argument was supplied', None, 10022, None))
PFB my code that I am using
def foreach_batch_function(df, epoch_id):
print("\nProcesing Batch : ", epoch_id);
pass
# end of foreach_batch_function()
def get_input_schema():
'''Copy-Quality Columns'''
schema = StructType().add("fileName", StringType()).add("date", StringType()).add("brandName", StringType()).add("breadcrumb", ArrayType(StringType())).add("subcategory", StringType()).add("scoring_subcategory", StringType()).add("bullet_features", StringType()).add("product_description", StringType()).add("rich_content_text", StringType()).add("currency_code", StringType());
return schema;
# end of get_input_schema()
if name == "main":
spark = get_spark_session_for_streaming();
lines_from_kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092, localhost:9093").option("subscribe", "first_topic").option("starting_offsets", "earliest").load();
'''Get Schema For Input'''
required_schema = get_input_schema();
'''Read Input JSON'''
obj_from_kafka = lines_from_kafka.select(from_json(col("value").cast("string"),
required_schema).alias("jsonObj"));
json_from_kafka = objFromKafka.selectExpr("jsonObj.brandName as brand_name", "jsonObj.subcategory as subcategory", "jsonObj.scoringSubcategory as scoring_subcategory", "jsonObj.date as date", "jsonObj.breadcrumb as breadcrumb", "jsonObj.fileName as FileName", "jsonObj.bulletFeatures as bullet_features", "jsonObj.productDescription as product_description", "jsonObj.richContentText as rich_content_text", "jsonObj.currencyCode as currencyCode")
try:
jsonFromKafka.writeStream.foreachBatch(foreach_batch_function).start();
except Exception as e:
raise Exception(">>>>>>>>>>>>>>>>>>>>", e);
# end of main()
It is requested to please help me fix this issue. We have to move our Batch product to structured streaming on GCP very shortly, but I am stuck here, not able to move ahead because of this.