cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

A streaming job going into infinite looping

Sas
New Contributor II

Hi

Below i am trying to read data from kafka, determine whether its fraud or not and then i need to write it back to mongodb

below is my code read_kafka.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from os.path import abspath
from geo_map import GEO_Map
from calculate_speed import *
 
warehouse_location = abspath('spark-warehouse')
spark = (
        SparkSession.builder.appName("Kafka mongo integration")
        .master("local[*]")
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .config("spark.mongodb.read.connection.uri", "mongodb://172.31.85.183:27017/capstone.lookup_transaction?readPreference=primaryPreferred")\
        .config("spark.mongodb.write.connection.uri", "mongodb://172.31.85.183:27017/capstone.cardTransaction")\
        .enableHiveSupport() \
        .getOrCreate()
    )
 
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "18.211.252.152:9092") \
    .option("subscribe", "transactions-topic-verified") \
    .option("startingOffsets", "earliest") \
    .option("group_id","my-group")\
    .load()
 
 
 
 
kafka_schema = StructType([
    StructField("card_id", LongType(), True),
    StructField("member_id", LongType(), True),
    StructField("amount", DoubleType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("pos_id", LongType(), True),
    StructField("transaction_dt", StringType(), True),
])
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
 
kafka_df = kafka_df.select(from_json(kafka_df.value, kafka_schema).alias("json"))
 
kafka_df = kafka_df.select("json.*")
 
card_transaction_df=spark.read.format("mongodb").load().select("card_id","postcode","transaction_dt","UCL","score")
 
card_transaction_df.show(10)
 
schema = StructType() \
      .add("postcode",IntegerType(),True) \
      .add("lat",DoubleType(),True) \
      .add("lon",DoubleType(),True) \
      .add("city",StringType(),True) \
      .add("state",StringType(),True)
 
us_zip_df = spark.read.format("csv")\
                .options(inferSchema='True',delimiter=',') \
                .schema(schema)\
                .load("file:///home/hadoop/uszipsv.csv")\
                .select("postcode","lat","lon")
 
us_zip_df.show()
us_zip_df.printSchema()
 
 
card_transaction_df2 = card_transaction_df.join(us_zip_df,"postcode","left")\
                            .select("card_id",col("postcode").alias("postcode_old"),col("transaction_dt").alias("transaction_dt_old"),"UCL","score",col("lat").alias("lat_old"),col("lon").alias("lon_old"))
 
 
card_transaction_df2.printSchema()
 
kafka_df1 = kafka_df.join(card_transaction_df2,"card_id","left")
 
kafka_df1.printSchema()
 
 
 
kafka_df2 = kafka_df1.join(us_zip_df,"postcode","left")
 
ZIPCodeAnalysisUDF = udf(lambda w,x,y,z:ZIPCodeAnalysis(w,x,y,z),StringType())
 
kafka_df3=kafka_df2.withColumn("distance",ZIPCodeAnalysisUDF(col('postcode'),col('postcode_old'),to_timestamp(col('transaction_dt'),'dd-MM-yyyy HH:mm:ss'),col('transaction_dt_old')))
 
kafka_df3.printSchema()
 
kafka_df4=kafka_df3.filter(kafka_df3.card_id.isNotNull())\
                .withColumn("status",when(kafka_df3.amount>=kafka_df3.UCL,"FRAUD")\
                                    .when(kafka_df3.score<=200,"FRAUD")\
                                    .when(kafka_df3.distance == "GENUIN","FRAUD")\
                                    .otherwise("GENUINE")
                           )
 
kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")
 
 
 
kafka_query = kafka_df5 \
    .writeStream \
    .trigger(once=True)\
    .format("console") \
    .option("truncate", "false") \
    .start()\
    .awaitTermination()()

The issue i am facing is if i add code in line# 90 i.e. .when(kafka_df3.distance == "GENUIN","FRAUD")\

Code is going through an infine loop and no output is coming to console

However if i remove that one line, output is coming to console

There are two python program that i am calling from above pyspark code

geo_map.py

import math
import pandas as pd
 
 
class GEO_Map():
    """
    It hold the  map for zip code and its latitute and longitute
    """
    __instance = None
 
    @staticmethod
    def get_instance():
        """ Static access method. """
        if GEO_Map.__instance == None:
            GEO_Map()
        return GEO_Map.__instance
 
    def __init__(self):
        """ Virtually private constructor. """
        if GEO_Map.__instance != None:
            raise Exception("This class is a singleton!")
        else:
            GEO_Map.__instance = self
            self.map = pd.read_csv("uszipsv.csv", header=None, names=['A',"B",'C','D','E'])
            self.map['A'] =  self.map['A'].astype(str)
 
    def get_lat(self, pos_id):
        return self.map[self.map.A == pos_id ].B
 
    def get_long(self, pos_id):
        return self.map[self.map.A == pos_id ].C
 
    def distance(self, lat1, long1, lat2, long2):
        theta = long1 - long2
        dist = math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2)) + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
        dist = math.acos(dist)
        dist = self.rad2deg(dist)
        dist = dist * 60 * 1.1515 * 1.609344
        return dist
 
    def rad2deg(self, rad):
        return rad * 180.0 / math.pi
 
    def deg2rad(self, deg):
        return deg * math.pi / 180.0
 

calculate_speed.py

import pandas as pd  
import numpy as np  
from geo_map import GEO_Map
from datetime import datetime
 
 
format_string = "%d-%m-%Y %H:%M:%S"
geo_obj = GEO_Map()
def getUCLScore(last_trans_list): 
    if len(last_trans_list)==0: 
        return 0 
    final_trans_df = pd.DataFrame(last_trans_list) 
    moving_average = final_trans_df['amount'].rolling(window=10).mean().iloc[-1]
    standard_deviation = final_trans_df['amount'].std()
    UCL_Score = moving_average+3*standard_deviation 
    return UCL_Score 
 
def ZIPCodeAnalysis(currposid,pastposid,currtransdt,pasttransdt): 
    currposid = str(currposid) 
    pastposid = str(pastposid)
    curr_lat = geo_obj.get_lat(currposid).to_frame()["B"]
    curr_long = geo_obj.get_long(currposid).to_frame()["C"]
    past_lat = geo_obj.get_lat(pastposid).to_frame()["B"]
    past_long = geo_obj.get_long(pastposid).to_frame()["C"]
    
    if len(curr_lat) and len(curr_long) and len(past_lat) and len(past_long): 
        curr_lat = curr_lat.iloc[-1] 
        curr_long = curr_long.iloc[-1] 
        past_lat = past_lat.iloc[-1] 
        past_long = past_long.iloc[-1] 
    else: 
        return False 
    print("*******************************************************")
 
    # print("type is",type(curr_lat))
    print(curr_lat)
    print("*******************************************************")
    
    total_dist = geo_obj.distance(curr_lat,curr_long,past_lat,past_long) 
    print(total_dist)
    time_diff = currtransdt - pasttransdt 
    print(currtransdt)
    print(pasttransdt)
    
    sec_diff = time_diff.total_seconds() 
    if sec_diff == 0:
        print("seconds difference is 0") 
        return False 
    total_speed = total_dist/(sec_diff/3600)
    
    if total_speed> 1152: 
        return False 
    else: 
        return True 
    # The fastest airplane for business or medical travel is currently the Cessna Citation X+ which has a top speed of Mach 0.935 or 717 mph (1,152 km/h). This aircraft is popular for business and medical travel due to its speed, range, and luxury amenities. Other fast options for business and medical travel include the Gulfstream G650 and the Bombardier Global 7500.

Can anyone please help here?

Thanks in adavnce

Saswata

1 REPLY 1

swethaNandan
New Contributor III
New Contributor III

Hi Saswata,

Can you remove the filter and see if it is printing output to console?

kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")

Thanks and Regards

Swetha Nandajan

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.