Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Showing results for 
Search instead for 
Did you mean: 

A streaming job going into infinite looping

New Contributor II


Below I am trying to read data from Kafka, determine whether it's fraudulent or not, and then write it back to MongoDB.

below is my code

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from os.path import abspath
from geo_map import GEO_Map
from calculate_speed import *
warehouse_location = abspath('spark-warehouse')
spark = (
        SparkSession.builder.appName("Kafka mongo integration")
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .config("", "mongodb://")\
        .config("spark.mongodb.write.connection.uri", "mongodb://")\
        .enableHiveSupport() \
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "") \
    .option("subscribe", "transactions-topic-verified") \
    .option("startingOffsets", "earliest") \
kafka_schema = StructType([
    StructField("card_id", LongType(), True),
    StructField("member_id", LongType(), True),
    StructField("amount", DoubleType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("pos_id", LongType(), True),
    StructField("transaction_dt", StringType(), True),
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
kafka_df =, kafka_schema).alias("json"))
kafka_df ="json.*")"mongodb").load().select("card_id","postcode","transaction_dt","UCL","score")
schema = StructType() \
      .add("postcode",IntegerType(),True) \
      .add("lat",DoubleType(),True) \
      .add("lon",DoubleType(),True) \
      .add("city",StringType(),True) \
us_zip_df ="csv")\
                .options(inferSchema='True',delimiter=',') \
card_transaction_df2 = card_transaction_df.join(us_zip_df,"postcode","left")\
kafka_df1 = kafka_df.join(card_transaction_df2,"card_id","left")
kafka_df2 = kafka_df1.join(us_zip_df,"postcode","left")
ZIPCodeAnalysisUDF = udf(lambda w,x,y,z:ZIPCodeAnalysis(w,x,y,z),StringType())
kafka_df3=kafka_df2.withColumn("distance",ZIPCodeAnalysisUDF(col('postcode'),col('postcode_old'),to_timestamp(col('transaction_dt'),'dd-MM-yyyy HH:mm:ss'),col('transaction_dt_old')))
                                    .when(kafka_df3.distance == "GENUIN","FRAUD")\
kafka_query = kafka_df5 \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \

The issue i am facing is if i add code in line# 90 i.e. .when(kafka_df3.distance == "GENUIN","FRAUD")\

The code goes into an infinite loop and no output appears on the console.

However if i remove that one line, output is coming to console

There are two python program that i am calling from above pyspark code

import math
import pandas as pd
class GEO_Map():
    It hold the  map for zip code and its latitute and longitute
    __instance = None
    def get_instance():
        """ Static access method. """
        if GEO_Map.__instance == None:
        return GEO_Map.__instance
    def __init__(self):
        """ Virtually private constructor. """
        if GEO_Map.__instance != None:
            raise Exception("This class is a singleton!")
            GEO_Map.__instance = self
   = pd.read_csv("uszipsv.csv", header=None, names=['A',"B",'C','D','E'])
  ['A'] =['A'].astype(str)
    def get_lat(self, pos_id):
        return[ == pos_id ].B
    def get_long(self, pos_id):
        return[ == pos_id ].C
    def distance(self, lat1, long1, lat2, long2):
        theta = long1 - long2
        dist = math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2)) + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
        dist = math.acos(dist)
        dist = self.rad2deg(dist)
        dist = dist * 60 * 1.1515 * 1.609344
        return dist
    def rad2deg(self, rad):
        return rad * 180.0 / math.pi
    def deg2rad(self, deg):
        return deg * math.pi / 180.0

import pandas as pd  
import numpy as np  
from geo_map import GEO_Map
from datetime import datetime
# strptime/strftime layout of transaction_dt strings, e.g. "31-12-2023 23:59:59".
format_string = "%d-%m-%Y %H:%M:%S"
# Module-level geo lookup, created once at import time.
# NOTE(review): calls the constructor directly instead of get_instance();
# works only because this is the first instantiation — verify no other
# module constructs GEO_Map before importing this one.
geo_obj = GEO_Map()
def getUCLScore(last_trans_list):
    """Upper Control Limit for a card: 10-transaction moving average of
    `amount` plus three standard deviations.

    Returns 0 for an empty history.
    NOTE(review): with fewer than 10 records the rolling mean is NaN, so
    the score is NaN — confirm callers always supply a full window.
    """
    if not last_trans_list:
        return 0
    history = pd.DataFrame(last_trans_list)
    rolling_mean = history['amount'].rolling(window=10).mean().iloc[-1]
    sigma = history['amount'].std()
    return rolling_mean + 3 * sigma
def ZIPCodeAnalysis(currposid, pastposid, currtransdt, pasttransdt):
    """
    Check whether travelling from the previous transaction's postcode to the
    current one in the elapsed time is physically plausible.

    Returns True when the implied speed is <= 1152 km/h, False when it is
    faster, when either postcode is unknown, or when the timestamps coincide.
    NOTE(review): both timestamps must be datetime-like — the subtraction
    below relies on `.total_seconds()`; verify the caller converts both.
    """
    currposid = str(currposid)
    pastposid = str(pastposid)
    curr_lat = geo_obj.get_lat(currposid).to_frame()["B"]
    curr_long = geo_obj.get_long(currposid).to_frame()["C"]
    past_lat = geo_obj.get_lat(pastposid).to_frame()["B"]
    past_long = geo_obj.get_long(pastposid).to_frame()["C"]
    if len(curr_lat) and len(curr_long) and len(past_lat) and len(past_long):
        curr_lat = curr_lat.iloc[-1]
        curr_long = curr_long.iloc[-1]
        past_lat = past_lat.iloc[-1]
        past_long = past_long.iloc[-1]
    else:
        # Unknown postcode: cannot score this hop.
        # (The pasted code had this `return False` merged into the branch
        # above, which made the success path below unreachable.)
        return False
    total_dist = geo_obj.distance(curr_lat, curr_long, past_lat, past_long)
    time_diff = currtransdt - pasttransdt
    sec_diff = time_diff.total_seconds()
    if sec_diff == 0:
        print("seconds difference is 0")
        return False
    total_speed = total_dist / (sec_diff / 3600)
    # 1152 km/h: top speed of the Cessna Citation X+, the fastest aircraft
    # realistically available for travel — anything faster is implausible.
    if total_speed > 1152:
        return False
    return True

Can anyone please help here?

Thanks in advance



New Contributor III
New Contributor III

Hi Saswata,

Can you remove the filter and see if it is printing output to console?


Thanks and Regards

Swetha Nandajan

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!