Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

A streaming job going into an infinite loop

Sas
New Contributor II

Hi,

I am trying to read data from Kafka, determine whether each transaction is fraudulent or not, and then write the result back to MongoDB.

Below is my code, read_kafka.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from os.path import abspath
from geo_map import GEO_Map
from calculate_speed import *
 
warehouse_location = abspath('spark-warehouse')
spark = (
    SparkSession.builder.appName("Kafka mongo integration")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("spark.mongodb.read.connection.uri", "mongodb://172.31.85.183:27017/capstone.lookup_transaction?readPreference=primaryPreferred")
    .config("spark.mongodb.write.connection.uri", "mongodb://172.31.85.183:27017/capstone.cardTransaction")
    .enableHiveSupport()
    .getOrCreate()
)
 
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "18.211.252.152:9092") \
    .option("subscribe", "transactions-topic-verified") \
    .option("startingOffsets", "earliest") \
    .option("group_id","my-group")\
    .load()
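# Note: the Kafka source only honors options it recognizes; raw Kafka consumer
# properties need the "kafka." prefix (e.g. "kafka.group.id", Spark 3.0+), so
# the "group_id" option above is most likely ignored.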
 
 
 
 
kafka_schema = StructType([
    StructField("card_id", LongType(), True),
    StructField("member_id", LongType(), True),
    StructField("amount", DoubleType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("pos_id", LongType(), True),
    StructField("transaction_dt", StringType(), True),
])
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
 
kafka_df = kafka_df.select(from_json(kafka_df.value, kafka_schema).alias("json"))
 
kafka_df = kafka_df.select("json.*")
 
card_transaction_df=spark.read.format("mongodb").load().select("card_id","postcode","transaction_dt","UCL","score")
 
card_transaction_df.show(10)
 
schema = StructType() \
      .add("postcode",IntegerType(),True) \
      .add("lat",DoubleType(),True) \
      .add("lon",DoubleType(),True) \
      .add("city",StringType(),True) \
      .add("state",StringType(),True)
 
us_zip_df = spark.read.format("csv")\
                .options(inferSchema='True',delimiter=',') \
                .schema(schema)\
                .load("file:///home/hadoop/uszipsv.csv")\
                .select("postcode","lat","lon")
 
us_zip_df.show()
us_zip_df.printSchema()
 
 
card_transaction_df2 = card_transaction_df.join(us_zip_df,"postcode","left")\
                            .select("card_id",col("postcode").alias("postcode_old"),col("transaction_dt").alias("transaction_dt_old"),"UCL","score",col("lat").alias("lat_old"),col("lon").alias("lon_old"))
 
 
card_transaction_df2.printSchema()
 
kafka_df1 = kafka_df.join(card_transaction_df2,"card_id","left")
 
kafka_df1.printSchema()
 
 
 
kafka_df2 = kafka_df1.join(us_zip_df,"postcode","left")
 
ZIPCodeAnalysisUDF = udf(ZIPCodeAnalysis, StringType())
 
kafka_df3=kafka_df2.withColumn("distance",ZIPCodeAnalysisUDF(col('postcode'),col('postcode_old'),to_timestamp(col('transaction_dt'),'dd-MM-yyyy HH:mm:ss'),col('transaction_dt_old')))
 
kafka_df3.printSchema()
 
kafka_df4=kafka_df3.filter(kafka_df3.card_id.isNotNull())\
                .withColumn("status",when(kafka_df3.amount>=kafka_df3.UCL,"FRAUD")\
                                    .when(kafka_df3.score<=200,"FRAUD")\
                                    .when(kafka_df3.distance == "GENUIN","FRAUD")\
                                    .otherwise("GENUINE")
                           )
 
kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")
 
 
 
kafka_query = kafka_df5 \
    .writeStream \
    .trigger(once=True)\
    .format("console") \
    .option("truncate", "false") \
    .start()\
    .awaitTermination()
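Eventually I want to replace the console sink with the MongoDB sink. A rough sketch of what I have in mind (assuming the MongoDB Spark connector 10.x, which supports streaming writes and picks up the write URI configured on the session; the checkpoint path is just a placeholder):

kafka_query = kafka_df5 \
    .writeStream \
    .format("mongodb") \
    .option("checkpointLocation", "/tmp/fraud-checkpoint") \
    .outputMode("append") \
    .start()
kafka_query.awaitTermination()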

The issue I am facing: if I include the line .when(kafka_df3.distance == "GENUIN","FRAUD") in the status expression above, the code goes into an infinite loop and no output reaches the console.

However, if I remove that one line, output appears on the console.
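For debugging, I suppose I could also write just the UDF output to the console to see what values the distance column actually holds, something like:

kafka_df3.select("card_id", "distance") \
    .writeStream \
    .trigger(once=True) \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()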

There are two Python programs that I call from the above PySpark code:

geo_map.py

import math
import pandas as pd
 
 
class GEO_Map():
    """
    It holds the mapping from zip code to latitude and longitude.
    """
    __instance = None
 
    @staticmethod
    def get_instance():
        """ Static access method. """
        if GEO_Map.__instance is None:
            GEO_Map()
        return GEO_Map.__instance
 
    def __init__(self):
        """ Virtually private constructor. """
        if GEO_Map.__instance is not None:
            raise Exception("This class is a singleton!")
        else:
            GEO_Map.__instance = self
            self.map = pd.read_csv("uszipsv.csv", header=None, names=['A',"B",'C','D','E'])
            self.map['A'] =  self.map['A'].astype(str)
 
    def get_lat(self, pos_id):
        return self.map[self.map.A == pos_id ].B
 
    def get_long(self, pos_id):
        return self.map[self.map.A == pos_id ].C
 
    def distance(self, lat1, long1, lat2, long2):
        theta = long1 - long2
        dist = math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2)) + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
        dist = math.acos(dist)
        dist = self.rad2deg(dist)
        dist = dist * 60 * 1.1515 * 1.609344
        return dist
 
    def rad2deg(self, rad):
        return rad * 180.0 / math.pi
 
    def deg2rad(self, deg):
        return deg * math.pi / 180.0
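For reference, a quick sanity check of the distance method (it implements the spherical law of cosines and returns kilometres; the coordinates are illustrative, and get_instance() assumes uszipsv.csv is in the working directory):

geo = GEO_Map.get_instance()
# New York (40.71, -74.01) to Los Angeles (34.05, -118.24): roughly 3,940 km
print(geo.distance(40.71, -74.01, 34.05, -118.24))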
 

calculate_speed.py

import pandas as pd  
import numpy as np  
from geo_map import GEO_Map
from datetime import datetime
 
 
format_string = "%d-%m-%Y %H:%M:%S"
geo_obj = GEO_Map()
def getUCLScore(last_trans_list): 
    if len(last_trans_list)==0: 
        return 0 
    final_trans_df = pd.DataFrame(last_trans_list) 
    moving_average = final_trans_df['amount'].rolling(window=10).mean().iloc[-1]
    standard_deviation = final_trans_df['amount'].std()
    UCL_Score = moving_average+3*standard_deviation 
    return UCL_Score 
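# Usage note (illustrative): getUCLScore expects a list of dicts with an
# 'amount' key, e.g. getUCLScore([{"amount": 120.5}, {"amount": 89.0}, ...]).
# With fewer than 10 rows the 10-row rolling mean is NaN, so the returned
# UCL is NaN as well.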
 
def ZIPCodeAnalysis(currposid,pastposid,currtransdt,pasttransdt): 
    currposid = str(currposid) 
    pastposid = str(pastposid)
    curr_lat = geo_obj.get_lat(currposid).to_frame()["B"]
    curr_long = geo_obj.get_long(currposid).to_frame()["C"]
    past_lat = geo_obj.get_lat(pastposid).to_frame()["B"]
    past_long = geo_obj.get_long(pastposid).to_frame()["C"]
    
    if len(curr_lat) and len(curr_long) and len(past_lat) and len(past_long): 
        curr_lat = curr_lat.iloc[-1] 
        curr_long = curr_long.iloc[-1] 
        past_lat = past_lat.iloc[-1] 
        past_long = past_long.iloc[-1] 
    else: 
        return False 
    print("*******************************************************")
 
    # print("type is",type(curr_lat))
    print(curr_lat)
    print("*******************************************************")
    
    total_dist = geo_obj.distance(curr_lat,curr_long,past_lat,past_long) 
    print(total_dist)
    time_diff = currtransdt - pasttransdt 
    print(currtransdt)
    print(pasttransdt)
    
    sec_diff = time_diff.total_seconds() 
    if sec_diff == 0:
        print("seconds difference is 0") 
        return False 
    total_speed = total_dist/(sec_diff/3600)
    
    if total_speed> 1152: 
        return False 
    else: 
        return True 
    # 1,152 km/h is the top speed of the Cessna Citation X+, currently the
    # fastest civilian business jet, so any implied travel speed above it is
    # treated as physically implausible.

Can anyone please help here?

Thanks in advance.

Saswata

1 REPLY

swethaNandan
Databricks Employee

Hi Saswata,

Can you remove the filter below and see if the query prints output to the console?

kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")
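You could also log per-batch row counts to confirm the stream is actually making progress; a rough sketch using foreachBatch:

def log_batch(batch_df, batch_id):
    # A count of 0 means the stream runs but the filters drop every row
    print("batch {}: {} rows".format(batch_id, batch_df.count()))

kafka_df4.writeStream \
    .trigger(once=True) \
    .foreachBatch(log_batch) \
    .start() \
    .awaitTermination()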

Thanks and Regards

Swetha Nandajan
