cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
SergeRielau
Databricks Employee
Databricks Employee

execute_immediate.png

Motivation

In Databricks, you have many means to compose and execute queries. You can:

  • Incrementally build a query and execute it using the DataFrame API
  • Use Python, Scala, or some supported other language to glue together a SQL string and use spark.sql() to compile and execute the SQL
  • In a variation of the above, you can also protect against SQL injection by using spark.sql() to pass different values to a parameterized SQL statement string.

All these solutions, however, require you to use a language outside of SQL to build the query.

If you prefer Python or Scala, this may be fine, but if you are into SQL you’re probably looking for a native solution. EXECUTE IMMEDIATE allows you to do just that.

Note: You can find a notebook with all the SQL here.

A concrete problem: Finding PRIMARY KEY violations

Note: PRIMARY KEY is Delta Tables feature, and the INFORMATION SCHEMA is a Unity Catalog feature.

Given a set of Delta tables in Unity Catalog with primary keys, and given only the name of the table as input:

Generate and execute a query that returns all the duplicate keys in a table.

What do we need to pull this of?

  1. We need to run queries against the Information Schema to find the list of columns composing the primary key
  2. We need to collect the list of columns, so we can use it in a GROUP BY
  3. We need to compose a query that then groups by the key columns and selects only those with a count greater one.
  4. We finally need to execute this query.

But can we do all this without leaving SQL?
Using session variables as glue we certainly can!

Let’s get started.

Binding in the "input"

First, we need a variable to hold the table name. Remember that a table name has three parts:

DECLARE tableId STRUCT<catalog STRING, schema STRING, name STRING>;

Here, for simplicity, I’m going to hardcode the name, but of course you can bind it in using widgets in a notebook.

SET VAR table = named_struct('catalog', current_catalog(),
'schema' , current_schema() ,
'name' , 'persons' );

Collecting the primary key columns

Now we have hunt for the primary key columns. Two tables are in play here:

  • TABLE_CONSTRAINTS
    tells us what constraints exist on the table and what their types are.
    So we need to find the name of the constraint that relates to the primary key.
  • CONSTRAINT_COLUMN_USAGE
    tells us which columns a given constraint depends on.

That’s all we need. We don’t need to know the column types, the order of the key columns, or anything else more detailed. 

DECLARE pkColumns ARRAY<STRING>;
SET VAR pkColumns =
(SELECT array_agg(ccu.column_name)
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
NATURAL JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu
WHERE tableId.catalog = tc.table_catalog
AND tableId.schema = tc.table_schema
AND tableId.name = tc.table_name
AND tc.constraint_type = 'PRIMARY KEY');

Let’s be nice and throw in some error handling:

SELECT CASE WHEN array_size(pkColumns) == 0
THEN raise_error('No primary key found for: `' || tableId.catalog || '`.`' || tableId.schema || '`.`' || tableId.name || '`')
END;

Building the query

Now it’s time to compose the query. We need to turn the primary key column array into a comma separated list. Lambda functions are great for this sort of work.

DECLARE queryStr STRING;
SET VAR queryStr =
'SELECT ' || aggregate(pkColumns, '', (list, col) -> list || col || ', ') || ' count(1) AS num_dups '
' FROM `' || tableId.catalog || '`. `' || tableId.schema || '`. `' || tableId.name || '` '
' GROUP BY ALL HAVING COUNT(1) > 1';

Run it!

Now all that’s left is to execute the query. We do this by using the new EXECUTE IMMEDIATE statement in DBR 14.3.
EXECUTE IMMEDIATE takes a STRING literal or a variable, treats it as a query to compile and runs it.

EXECUTE IMMEDIATE queryStr;
[USER_RAISED_EXCEPTION] No primary key found for: `main`.`srielau`.`t` SQLSTATE: P0001

Oh, well, I guess we need a table:

SET CATALOG main;
SET SCHEMA srielau;

CREATE TABLE persons(firstname STRING NOT NULL, lastname STRING NOT NULL, location STRING);
ALTER TABLE persons ADD CONSTRAINT persons_pk PRIMARY KEY (firstname, lastname);
INSERT INTO persons VALUES
('Tom' , 'Sawyer' , 'St. Petersburg' ),
('Benjamin' , 'Bluemchen' , 'Neustadt' ),
('Benjamin' , 'Bluemchen' , 'Neustaedter Zoo' ),
('Emil' , 'Svensson' , 'Loenneberga' ),
('Pippi' , 'Longstocking', 'Villa Villekulla' ),
('Pippi' , 'Longstocking', 'Kurrekurredutt Island');

Next try:

EXECUTE IMMEDIATE queryStr;
firstname lastname num_dups
--------- ------------ --------
Benjamin Bluemchen 2
Pippi Longstocking 2

Nice!

Let's look into EXECUTE IMMEDIATE in more detail.

EXECUTE IMMEDIATE tear down

Having hopefully sufficiently motivated the use of EXECUTE IMMEDIATE let's dive deeper into what it can do.

The syntax is pretty straight forward:

EXECUTE IMMEDIATE sql_string
[ INTO var_name [, ...] ]
[ USING { arg_expr [ AS ] [alias] } [, ...] ]

What statements can be run?

sql_string must be a string literal or a session variable of a string type. There are no limitations about which SQL statement you can run, except that you cannot execute an EXECUTE IMMEDIATE. So you can run a DML statement:

SET VAR queryStr = 'INSERT INTO persons'
' VALUES (\'Josefine\', \'Mausekind\', \'Sprotten vor dem Wind\')';
EXECUTE IMMEDIATE queryStr;
num_affected_rows num_inserted_rows
----------------- -----------------
1 1

SET VAR queryStr = 'UPDATE persons SET location = \'Leuchtturm Josefine\''
' WHERE firstname =\'Josefine\' AND lastname =\'Mausekind\'';
EXECUTE IMMEDIATE queryStr;
num_affected_rows
-----------------
1

EXECUTE IMMEDIATE 'DELETE FROM persons WHERE location = \'Leuchtturm Josefine\'';
num_affected_rows
-----------------
1

Or you can do DDL:

EXECUTE IMMEDIATE 'ALTER TABLE persons ADD COLUMN dob DATE';

You can do grants and revokes:

EXECUTE IMMEDIATE 'GRANT MODIFY ON TABLE persons TO `alf@melmak.et`';

and of course queries as shows in the example above.

Some users store a their SQL statements in tables, look them up some form of statement ID and execute them using EXECUTE IMMEDIATE.

Statement Parameters

Obviously you can compose any SQL statement using the sql_string. However it can be advantageous to parameterize the string just like you want to parameterized SQL statements in Scala or Python when executing spark.sql(). The purpose is to provide values, often coming directly from the end user without risking SQL injection.

EXECUTE IMMEDIATE supports both named and unnamed parameters:

EXECUTE IMMEDIATE 'SELECT location FROM persons WHERE firstname = ? AND lastname = ?'
USING 'Tom', 'Sawyer';
location
--------------
St. Petersburg

EXECUTE IMMEDIATE 'SELECT location FROM persons WHERE firstname = :first AND lastname = :last'
USING 'Tom' AS first, 'Sawyer' AS last;
location
--------------
St. Petersburg 

 Of course you can use session variables:

DECLARE queryStr = 'SELECT location FROM persons WHERE firstname = ? AND lastname = ?';
DECLARE first = 'Tom';
DECLARE last = 'Sawyer';

EXECUTE IMMEDIATE queryStr USING first AS first, last AS last;
location
--------------
St. Petersburg 

SET VAR
queryStr = 'SELECT location FROM persons WHERE firstname = :first AND lastname = :last';
EXECUTE IMMEDIATE queryStr USING last AS last, first AS first;
location
--------------
St. Petersburg 

You can also use variables directly instead of using formal parameter passing. This may be considered rather sloppy.

SET VAR queryStr = 'SELECT location FROM persons WHERE firstname = first AND lastname = last';
EXECUTE IMMEDIATE queryStr;
location
--------------
St. Petersburg  

Where do the results go?

By default EXECUTE IMMEDIATE will return the same results as the SQL Statement passed to it. For queries that is the query result. For INSERT statement it is the meta data:

EXECUTE IMMEDIATE 'INSERT INTO persons(firstname, lastname, location)  (?, ?, ?)'
USING 'Bibi', 'Blocksberg', 'Neustadt';
num_affected_rows num_inserted_rows
----------------- -----------------
1 1

But for queries only there is another options. If the query returns at most one row you can instruct EXECUTE IMMEDIATE to assign the result to a list of session variables.

DECLARE location STRING;
SET VAR first = 'Emil';
EXECUTE IMMEDIATE 'SELECT lastname, location FROM persons WHERE firstname = ?'
INTO last, location
USING first;
SELECT last, location;
last location
-------- -----------
Svensson Loenneberga

If the query returns no rows NULL is assigned:

EXECUTE IMMEDIATE 'SELECT lastname, location FROM persons WHERE firstname = ?'
INTO last, location
USING 'Huckleberry';
last location
---- --------
null null

If there is more than one row EXECUTE IMMEDIATE returns an error:

EXECUTE IMMEDIATE 'SELECT lastname, location FROM persons WHERE firstname = ?'
INTO last, location
USING 'Benjamin';
[ROW_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as a row. SQLSTATE: 21000

Conclusion

EXECUTE IMMEDIATE is a powerful new statement introduced in Databricks Runtime 14.3. It allows you to compose SQL out of SQL operations and pass session state via SQL variables. This allows for linear scripting in SQL which otherwise would have required you to utilize a host language such as Python.

Related

 

5 Comments