<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: A streaming job going into infinite looping in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/a-streaming-job-going-into-infinite-looping/m-p/4401#M1141</link>
    <description>&lt;P&gt;Hi Saswata,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Can you remove the filter and see if it is printing output to console?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks and Regards&lt;/P&gt;&lt;P&gt;Swetha Nandajan&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 14 Jun 2023 17:35:00 GMT</pubDate>
    <dc:creator>swethaNandan</dc:creator>
    <dc:date>2023-06-14T17:35:00Z</dc:date>
    <item>
      <title>A streaming job going into infinite looping</title>
      <link>https://community.databricks.com/t5/data-engineering/a-streaming-job-going-into-infinite-looping/m-p/4400#M1140</link>
      <description>&lt;P&gt;Hi&lt;/P&gt;&lt;P&gt;Below I am trying to read data from Kafka, determine whether it's fraud or not, and then I need to write it back to MongoDB.&lt;/P&gt;&lt;P&gt;Below is my code, read_kafka.py&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from os.path import abspath
from geo_map import GEO_Map
from calculate_speed import *
&amp;nbsp;
warehouse_location = abspath('spark-warehouse')
spark = (
        SparkSession.builder.appName("Kafka mongo integration")
        .master("local[*]")
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .config("spark.mongodb.read.connection.uri", "mongodb://172.31.85.183:27017/capstone.lookup_transaction?readPreference=primaryPreferred")\
        .config("spark.mongodb.write.connection.uri", "mongodb://172.31.85.183:27017/capstone.cardTransaction")\
        .enableHiveSupport() \
        .getOrCreate()
    )
&amp;nbsp;
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "18.211.252.152:9092") \
    .option("subscribe", "transactions-topic-verified") \
    .option("startingOffsets", "earliest") \
    .option("group_id","my-group")\
    .load()
&amp;nbsp;
&amp;nbsp;
&amp;nbsp;
&amp;nbsp;
kafka_schema = StructType([
    StructField("card_id", LongType(), True),
    StructField("member_id", LongType(), True),
    StructField("amount", DoubleType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("pos_id", LongType(), True),
    StructField("transaction_dt", StringType(), True),
])
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
&amp;nbsp;
kafka_df = kafka_df.select(from_json(kafka_df.value, kafka_schema).alias("json"))
&amp;nbsp;
kafka_df = kafka_df.select("json.*")
&amp;nbsp;
card_transaction_df=spark.read.format("mongodb").load().select("card_id","postcode","transaction_dt","UCL","score")
&amp;nbsp;
card_transaction_df.show(10)
&amp;nbsp;
schema = StructType() \
      .add("postcode",IntegerType(),True) \
      .add("lat",DoubleType(),True) \
      .add("lon",DoubleType(),True) \
      .add("city",StringType(),True) \
      .add("state",StringType(),True)
&amp;nbsp;
us_zip_df = spark.read.format("csv")\
                .options(inferSchema='True',delimiter=',') \
                .schema(schema)\
                .load("file:///home/hadoop/uszipsv.csv")\
                .select("postcode","lat","lon")
&amp;nbsp;
us_zip_df.show()
us_zip_df.printSchema()
&amp;nbsp;
&amp;nbsp;
card_transaction_df2 = card_transaction_df.join(us_zip_df,"postcode","left")\
                            .select("card_id",col("postcode").alias("postcode_old"),col("transaction_dt").alias("transaction_dt_old"),"UCL","score",col("lat").alias("lat_old"),col("lon").alias("lon_old"))
&amp;nbsp;
&amp;nbsp;
card_transaction_df2.printSchema()
&amp;nbsp;
kafka_df1 = kafka_df.join(card_transaction_df2,"card_id","left")
&amp;nbsp;
kafka_df1.printSchema()
&amp;nbsp;
&amp;nbsp;
&amp;nbsp;
kafka_df2 = kafka_df1.join(us_zip_df,"postcode","left")
&amp;nbsp;
ZIPCodeAnalysisUDF = udf(lambda w,x,y,z:ZIPCodeAnalysis(w,x,y,z),StringType())
&amp;nbsp;
kafka_df3=kafka_df2.withColumn("distance",ZIPCodeAnalysisUDF(col('postcode'),col('postcode_old'),to_timestamp(col('transaction_dt'),'dd-MM-yyyy HH:mm:ss'),col('transaction_dt_old')))
&amp;nbsp;
kafka_df3.printSchema()
&amp;nbsp;
kafka_df4=kafka_df3.filter(kafka_df3.card_id.isNotNull())\
                .withColumn("status",when(kafka_df3.amount&amp;gt;=kafka_df3.UCL,"FRAUD")\
                                    .when(kafka_df3.score&amp;lt;=200,"FRAUD")\
                                    .when(kafka_df3.distance == "GENUIN","FRAUD")\
                                    .otherwise("GENUINE")
                           )
&amp;nbsp;
kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")
&amp;nbsp;
&amp;nbsp;
&amp;nbsp;
kafka_query = kafka_df5 \
    .writeStream \
    .trigger(once=True)\
    .format("console") \
    .option("truncate", "false") \
    .start()\
    .awaitTermination()()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The issue I am facing is if I add code in line# 90 i.e.  .when(kafka_df3.distance == "GENUIN","FRAUD")\&lt;/P&gt;&lt;P&gt;Code is going through an infinite loop and no output is coming to console&lt;/P&gt;&lt;P&gt;However if I remove that one line, output is coming to console&lt;/P&gt;&lt;P&gt;There are two Python programs that I am calling from the above PySpark code&lt;/P&gt;&lt;P&gt;geo_map.py&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import math
import pandas as pd
&amp;nbsp;
&amp;nbsp;
class GEO_Map():
    """
    It hold the  map for zip code and its latitute and longitute
    """
    __instance = None
&amp;nbsp;
    @staticmethod
    def get_instance():
        """ Static access method. """
        if GEO_Map.__instance == None:
            GEO_Map()
        return GEO_Map.__instance
&amp;nbsp;
    def __init__(self):
        """ Virtually private constructor. """
        if GEO_Map.__instance != None:
            raise Exception("This class is a singleton!")
        else:
            GEO_Map.__instance = self
            self.map = pd.read_csv("uszipsv.csv", header=None, names=['A',"B",'C','D','E'])
            self.map['A'] =  self.map['A'].astype(str)
&amp;nbsp;
    def get_lat(self, pos_id):
        return self.map[self.map.A == pos_id ].B
&amp;nbsp;
    def get_long(self, pos_id):
        return self.map[self.map.A == pos_id ].C
&amp;nbsp;
    def distance(self, lat1, long1, lat2, long2):
        theta = long1 - long2
        dist = math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2)) + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
        dist = math.acos(dist)
        dist = self.rad2deg(dist)
        dist = dist * 60 * 1.1515 * 1.609344
        return dist
&amp;nbsp;
    def rad2deg(self, rad):
        return rad * 180.0 / math.pi
&amp;nbsp;
    def deg2rad(self, deg):
        return deg * math.pi / 180.0
&amp;nbsp;&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;calculate_speed.py&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import pandas as pd  
import numpy as np  
from geo_map import GEO_Map
from datetime import datetime
&amp;nbsp;
&amp;nbsp;
format_string = "%d-%m-%Y %H:%M:%S"
geo_obj = GEO_Map()
def getUCLScore(last_trans_list): 
    if len(last_trans_list)==0: 
        return 0 
    final_trans_df = pd.DataFrame(last_trans_list) 
    moving_average = final_trans_df['amount'].rolling(window=10).mean().iloc[-1]
    standard_deviation = final_trans_df['amount'].std()
    UCL_Score = moving_average+3*standard_deviation 
    return UCL_Score 
&amp;nbsp;
def ZIPCodeAnalysis(currposid,pastposid,currtransdt,pasttransdt): 
    currposid = str(currposid) 
    pastposid = str(pastposid)
    curr_lat = geo_obj.get_lat(currposid).to_frame()["B"]
    curr_long = geo_obj.get_long(currposid).to_frame()["C"]
    past_lat = geo_obj.get_lat(pastposid).to_frame()["B"]
    past_long = geo_obj.get_long(pastposid).to_frame()["C"]
    
    if len(curr_lat) and len(curr_long) and len(past_lat) and len(past_long): 
        curr_lat = curr_lat.iloc[-1] 
        curr_long = curr_long.iloc[-1] 
        past_lat = past_lat.iloc[-1] 
        past_long = past_long.iloc[-1] 
    else: 
        return False 
    print("*******************************************************")
&amp;nbsp;
    # print("type is",type(curr_lat))
    print(curr_lat)
    print("*******************************************************")
    
    total_dist = geo_obj.distance(curr_lat,curr_long,past_lat,past_long) 
    print(total_dist)
    time_diff = currtransdt - pasttransdt 
    print(currtransdt)
    print(pasttransdt)
    
    sec_diff = time_diff.total_seconds() 
    if sec_diff == 0:
        print("seconds difference is 0") 
        return False 
    total_speed = total_dist/(sec_diff/3600)
    
    if total_speed&amp;gt; 1152: 
        return False 
    else: 
        return True 
    # The fastest airplane for business or medical travel is currently the Cessna Citation X+ which has a top speed of Mach 0.935 or 717 mph (1,152 km/h). This aircraft is popular for business and medical travel due to its speed, range, and luxury amenities. Other fast options for business and medical travel include the Gulfstream G650 and the Bombardier Global 7500.&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Can anyone please help here?&lt;/P&gt;&lt;P&gt;Thanks in advance&lt;/P&gt;&lt;P&gt;Saswata&lt;/P&gt;</description>
      <pubDate>Mon, 15 May 2023 06:12:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/a-streaming-job-going-into-infinite-looping/m-p/4400#M1140</guid>
      <dc:creator>Sas</dc:creator>
      <dc:date>2023-05-15T06:12:32Z</dc:date>
    </item>
    <item>
      <title>Re: A streaming job going into infinite looping</title>
      <link>https://community.databricks.com/t5/data-engineering/a-streaming-job-going-into-infinite-looping/m-p/4401#M1141</link>
      <description>&lt;P&gt;Hi Saswata,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Can you remove the filter and see if it is printing output to console?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks and Regards&lt;/P&gt;&lt;P&gt;Swetha Nandajan&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 14 Jun 2023 17:35:00 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/a-streaming-job-going-into-infinite-looping/m-p/4401#M1141</guid>
      <dc:creator>swethaNandan</dc:creator>
      <dc:date>2023-06-14T17:35:00Z</dc:date>
    </item>
  </channel>
</rss>

