Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta sharing json predicate doesn't work

drag7ter
Contributor

I'm trying to push predicates via the python delta_sharing package: https://github.com/delta-io/delta-sharing/tree/main/python/delta_sharing

and the delta sharing protocol: https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering
But I don't get a filtered result - all rows come back. As far as I understand, my delta table consists of 8 parquet files, which are listed in the JSON delta log together with all the needed statistics.
Of the 8 files, only 1 contains the data that I'm filtering for in the predicate:

Min Values: {'PropertyID': '1072897', 'RespondentID': 60481680, 'RespondentName': '', 'RespondentEmail': '', 'DateResponded': '2021-02-02T10:30:21.000Z', 'Type': 'Design', 'QuestionID': 839, 'QuestionType': 'Categorical', 'ResponseValue': '0', 'VerbatimText': ''}
Max Values: {'PropertyID': '1203209', 'RespondentID': 68543688, 'RespondentName': 'QWERT', 'RespondentEmail': 'qqq@qqq.com', 'DateResponded': '2021-11-03T12:17:57.000Z', 'Type': 'Work Order', 'QuestionID': 12383, 'QuestionType': 'Text', 'ResponseValue': '95', 'VerbatimText': 'wouldnt really respond to us '}

Here is my code with json predicate:

import delta_sharing
import json

date_responded = json.dumps([{
        "op": "equal",
        "children": [
            {"op": "column", "name": "DateResponded", "valueType": "timestamp"},
            {"op": "literal", "value": "2021-08-05T19:20:02.000Z", "valueType": "timestamp"}
        ]
    }])

config_file = r"D:\config.share"  # raw string, so the backslash isn't treated as an escape
client = delta_sharing.SharingClient(config_file)

shares = client.list_shares()
share = shares[0]
schemas = client.list_schemas(share)
schema = schemas[0]
tables = client.list_tables(schema)
table = tables[0]
table_url = f"{config_file}#{share.name}.{schema.name}.{table.name}"

df = delta_sharing.load_as_pandas(table_url, use_delta_format=True, jsonPredicateHints=date_responded)

print(f"Table count: {len(df.index)}")

I also tried passing it as in the example: https://github.com/delta-io/delta-sharing

    date_responded = '''{
      "op": "equal",
      "children": [
        {"op": "column", "name":"DateResponded", "valueType":"timestamp"},
        {"op":"literal","value":"2021-08-05T19:20:02.000Z","valueType":"timestamp"}
      ]
    }'''

But the result is always the same - the total count printed is:
Table count: 275744 rows - the total count of the whole table across all 8 parquet files.
It should be 11 rows; or, if the delta sharing server returned just the 1 file that contains my data, at most the 33354 records in that file:

File: part-00001-3762ca0c-0967-438b-9af2-58acdc1d35ca-c000.snappy.parquet
   ▪️ Num Records: 33354

I also ran in debug mode, but I don't see the request body with my json predicate, just the POST request to the delta sharing server url:
DEBUG:urllib3.connectionpool:https://ireland.cloud.databricks.com:443 "POST /api/2.0/delta-sharing/metastores/f7391b87-5c08-478b-97ec-5328e578iu89/shares/my_test_share/schemas/deltashares/tables/my_test_table/query HTTP/1.1" 200 None

What am I doing wrong, and how does this work internally when predicates are passed?
Is there another way to avoid loading the whole table's data into a pandas df?

2 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @drag7ter ,

PredicateHints are just hints, not enforced filters. They do not guarantee that the returned data will be filtered.
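Because the server may legitimately return all files, the only way to guarantee a correct result is to apply the same predicate again on the client after loading. A minimal sketch of that pattern (dummy rows stand in for the shared table here; in your case the DataFrame would come from `delta_sharing.load_as_pandas(table_url, jsonPredicateHints=date_responded, use_delta_format=True)`):

```python
import json
import pandas as pd

# The predicate sent to the server is only a pruning *hint* ("DateResponded"
# is the column name from your table above).
date_responded = json.dumps({
    "op": "equal",
    "children": [
        {"op": "column", "name": "DateResponded", "valueType": "timestamp"},
        {"op": "literal", "value": "2021-08-05T19:20:02.000Z", "valueType": "timestamp"},
    ],
})

# Whatever subset of files the server decides to return (1 file or all 8),
# re-apply the filter client-side so the row count is correct either way.
df = pd.DataFrame({
    "DateResponded": ["2021-08-05T19:20:02.000Z", "2021-02-02T10:30:21.000Z"],
    "ResponseValue": ["95", "0"],
})
filtered = df[df["DateResponded"] == "2021-08-05T19:20:02.000Z"]
print(f"Table count: {len(filtered.index)}")
```

So the hint can save network transfer when the server honors it, but correctness must never depend on it.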

Check below thread for detailed discussion. Also check if your delta server has evaluate predicate hints flag set to true

https://community.databricks.com/t5/data-governance/filtering-partitioned-data-in-databricks-delta-s...
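For reference, that flag applies to the self-hosted open-source reference server, where it is set in the server's YAML config; roughly like this (share, schema, table names and the location below are placeholders):

```yaml
# delta-sharing-server config (self-hosted reference server only;
# the Databricks-managed sharing server does not expose this flag)
version: 1
shares:
  - name: "my_test_share"
    schemas:
      - name: "deltashares"
        tables:
          - name: "my_test_table"
            location: "s3a://my-bucket/path/to/table"  # placeholder
evaluatePredicateHints: true
```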

Thanks for the explanation, but I still don't understand why we need jsonPredicateHints if there is no guarantee that the delta sharing server returns fewer files - it feels like a random decision made by the server. In my case I want to reduce the number of files transferred over the network, but this doesn't work on a huge table, and the pandas df fails with an OOM error.

Also, I'm using the internal delta sharing server in my Databricks account, not one I deployed myself. As far as I understand, its configuration isn't visible to me, so I'm not able to set the evaluatePredicateHints option to true in the Databricks UI?
