Hi
Below i am trying to read data from kafka, determine whether its fraud or not and then i need to write it back to mongodb
below is my code read_kafka.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from os.path import abspath
from geo_map import GEO_Map
from calculate_speed import *
warehouse_location = abspath('spark-warehouse')
spark = (
SparkSession.builder.appName("Kafka mongo integration")
.master("local[*]")
.config("spark.sql.warehouse.dir", warehouse_location)\
.config("spark.mongodb.read.connection.uri", "mongodb://172.31.85.183:27017/capstone.lookup_transaction?readPreference=primaryPreferred")\
.config("spark.mongodb.write.connection.uri", "mongodb://172.31.85.183:27017/capstone.cardTransaction")\
.enableHiveSupport() \
.getOrCreate()
)
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "18.211.252.152:9092") \
.option("subscribe", "transactions-topic-verified") \
.option("startingOffsets", "earliest") \
.option("group_id","my-group")\
.load()
kafka_schema = StructType([
StructField("card_id", LongType(), True),
StructField("member_id", LongType(), True),
StructField("amount", DoubleType(), True),
StructField("postcode", IntegerType(), True),
StructField("pos_id", LongType(), True),
StructField("transaction_dt", StringType(), True),
])
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
kafka_df = kafka_df.select(from_json(kafka_df.value, kafka_schema).alias("json"))
kafka_df = kafka_df.select("json.*")
card_transaction_df=spark.read.format("mongodb").load().select("card_id","postcode","transaction_dt","UCL","score")
card_transaction_df.show(10)
schema = StructType() \
.add("postcode",IntegerType(),True) \
.add("lat",DoubleType(),True) \
.add("lon",DoubleType(),True) \
.add("city",StringType(),True) \
.add("state",StringType(),True)
us_zip_df = spark.read.format("csv")\
.options(inferSchema='True',delimiter=',') \
.schema(schema)\
.load("file:///home/hadoop/uszipsv.csv")\
.select("postcode","lat","lon")
us_zip_df.show()
us_zip_df.printSchema()
card_transaction_df2 = card_transaction_df.join(us_zip_df,"postcode","left")\
.select("card_id",col("postcode").alias("postcode_old"),col("transaction_dt").alias("transaction_dt_old"),"UCL","score",col("lat").alias("lat_old"),col("lon").alias("lon_old"))
card_transaction_df2.printSchema()
kafka_df1 = kafka_df.join(card_transaction_df2,"card_id","left")
kafka_df1.printSchema()
kafka_df2 = kafka_df1.join(us_zip_df,"postcode","left")
ZIPCodeAnalysisUDF = udf(lambda w,x,y,z:ZIPCodeAnalysis(w,x,y,z),StringType())
kafka_df3=kafka_df2.withColumn("distance",ZIPCodeAnalysisUDF(col('postcode'),col('postcode_old'),to_timestamp(col('transaction_dt'),'dd-MM-yyyy HH:mm:ss'),col('transaction_dt_old')))
kafka_df3.printSchema()
kafka_df4=kafka_df3.filter(kafka_df3.card_id.isNotNull())\
.withColumn("status",when(kafka_df3.amount>=kafka_df3.UCL,"FRAUD")\
.when(kafka_df3.score<=200,"FRAUD")\
.when(kafka_df3.distance == "GENUIN","FRAUD")\
.otherwise("GENUINE")
)
kafka_df5=kafka_df4.filter(kafka_df4.status=="FRAUD")
kafka_query = kafka_df5 \
.writeStream \
.trigger(once=True)\
.format("console") \
.option("truncate", "false") \
.start()\
.awaitTermination()()
The issue i am facing is if i add code in line# 90 i.e. .when(kafka_df3.distance == "GENUIN","FRAUD")\
Code is going through an infine loop and no output is coming to console
However if i remove that one line, output is coming to console
There are two python program that i am calling from above pyspark code
geo_map.py
import math
import pandas as pd
class GEO_Map():
"""
It hold the map for zip code and its latitute and longitute
"""
__instance = None
@staticmethod
def get_instance():
""" Static access method. """
if GEO_Map.__instance == None:
GEO_Map()
return GEO_Map.__instance
def __init__(self):
""" Virtually private constructor. """
if GEO_Map.__instance != None:
raise Exception("This class is a singleton!")
else:
GEO_Map.__instance = self
self.map = pd.read_csv("uszipsv.csv", header=None, names=['A',"B",'C','D','E'])
self.map['A'] = self.map['A'].astype(str)
def get_lat(self, pos_id):
return self.map[self.map.A == pos_id ].B
def get_long(self, pos_id):
return self.map[self.map.A == pos_id ].C
def distance(self, lat1, long1, lat2, long2):
theta = long1 - long2
dist = math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2)) + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
dist = math.acos(dist)
dist = self.rad2deg(dist)
dist = dist * 60 * 1.1515 * 1.609344
return dist
def rad2deg(self, rad):
return rad * 180.0 / math.pi
def deg2rad(self, deg):
return deg * math.pi / 180.0
calculate_speed.py
import pandas as pd
import numpy as np
from geo_map import GEO_Map
from datetime import datetime
format_string = "%d-%m-%Y %H:%M:%S"
geo_obj = GEO_Map()
def getUCLScore(last_trans_list):
if len(last_trans_list)==0:
return 0
final_trans_df = pd.DataFrame(last_trans_list)
moving_average = final_trans_df['amount'].rolling(window=10).mean().iloc[-1]
standard_deviation = final_trans_df['amount'].std()
UCL_Score = moving_average+3*standard_deviation
return UCL_Score
def ZIPCodeAnalysis(currposid,pastposid,currtransdt,pasttransdt):
currposid = str(currposid)
pastposid = str(pastposid)
curr_lat = geo_obj.get_lat(currposid).to_frame()["B"]
curr_long = geo_obj.get_long(currposid).to_frame()["C"]
past_lat = geo_obj.get_lat(pastposid).to_frame()["B"]
past_long = geo_obj.get_long(pastposid).to_frame()["C"]
if len(curr_lat) and len(curr_long) and len(past_lat) and len(past_long):
curr_lat = curr_lat.iloc[-1]
curr_long = curr_long.iloc[-1]
past_lat = past_lat.iloc[-1]
past_long = past_long.iloc[-1]
else:
return False
print("*******************************************************")
# print("type is",type(curr_lat))
print(curr_lat)
print("*******************************************************")
total_dist = geo_obj.distance(curr_lat,curr_long,past_lat,past_long)
print(total_dist)
time_diff = currtransdt - pasttransdt
print(currtransdt)
print(pasttransdt)
sec_diff = time_diff.total_seconds()
if sec_diff == 0:
print("seconds difference is 0")
return False
total_speed = total_dist/(sec_diff/3600)
if total_speed> 1152:
return False
else:
return True
# The fastest airplane for business or medical travel is currently the Cessna Citation X+ which has a top speed of Mach 0.935 or 717 mph (1,152 km/h). This aircraft is popular for business and medical travel due to its speed, range, and luxury amenities. Other fast options for business and medical travel include the Gulfstream G650 and the Bombardier Global 7500.
Can anyone please help here?
Thanks in adavnce
Saswata