09-04-2021 12:46 AM
Hi team,
I'm getting a weird error in one of my jobs when connecting to Snowflake. All my other jobs (I have plenty) work fine.
The current one also works fine when it has only one coding step (apart from installing the needed libraries in the very first step).
The error appears when I have more steps in the same job:
1) install needed libraries (this works fine in both cases)
pip install numpy "xgboost>=1.3.2" category-encoders feature-engine==0.6.1 snowflake.sqlalchemy eventlet pyarrow==0.17.0 asn1crypto==1.4.0 auditwheel==3.1.1 azure-common==1.1.25 azure-core==1.8.2 azure-storage-blob==12.5.0 boto3==1.15.18 botocore==1.18.18 certifi==2020.4.5.1 cffi==1.14.3 chardet==3.0.4 cryptography==2.9.2 idna==2.10 isodate==0.6.0 jmespath==0.10.0 msrest==0.6.19 oauthlib==3.1.0 oscrypto pycparser==2.20 pycryptodomex==3.9.8 pyelftools==0.26 PyJWT==1.7.1 pyOpenSSL==19.1.0 python-dateutil==2.8.1 pytz==2020.1 requests==2.23.0 requests-oauthlib==1.3.0 s3transfer==0.3.3 six==1.15.0 urllib3==1.25.11 snowflake-connector-python==2.3.10
(My guess is that I can install those libraries once in the configuration so that I don't waste time on each job run, but this is a secondary question here.)
2) fetch data from postgres into pandas dataframe
#!/usr/bin/env python3
import psycopg2 as pg
import pandas.io.sql as psql
pass1 = dbutils.secrets.get("monolith", "prod_pass")
conn1 = pg.connect("host=host1 dbname=db1 user=user1 password=" + pass1)
query1 = """ select column1, column2 from table1 """
df1 = psql.read_sql(query1, conn1)
print("df1: ")
print(df1)
(works fine in both cases)
3) store pandas dataframe into Snowflake table using python connector
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
engine = create_engine(URL(
    account='my_account_name',
    user=dbutils.secrets.get("snowflake", "snowflakeusr"),
    password=dbutils.secrets.get("snowflake", "snowflakepwd"),
    database='database1',
    schema='schema1',
    warehouse='warehouse1',
    role='role1',
    login_timeout=120,
    network_timeout=120
))
df1.to_sql('tmp_df1', con=engine, index=False, if_exists='replace')
(also works fine in both cases)
4) loop through the Snowflake table (this step fails if I have steps (2) and (3) in the same job, but works fine if steps (2) and (3) are absent)
import requests
import eventlet
import time
import snowflake.connector
import json
time.sleep(2)
eventlet.monkey_patch()
try:
    ctx = snowflake.connector.connect(
        user=dbutils.secrets.get("snowflake", "snowflakeusr"),
        password=dbutils.secrets.get("snowflake", "snowflakepwd"),
        account="account1",
        warehouse="warehouse1",
        database="database1",
        schema="schema1",
        role="role1",
        login_timeout=120,
        network_timeout=120
    )
    cs = ctx.cursor()
    ...
except Exception as error:
    raise Exception("Error fetching data from Snowflake: " + str(error))
finally:
    cs.close()
    ctx.close()
It fails on line 10 with this error:
487 try:
--> 488 cnx.do_handshake()
489 except OpenSSL.SSL.WantReadError:
...
---> 98 poll_obj = select.poll()
AttributeError: module 'select' has no attribute 'poll'
Unfortunately, I can't post the full error stack because your forum complains that my post is too long.
Please see full details in pastebin:
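One reading that is consistent with the trace, offered as an assumption and not a confirmed diagnosis: step (4) calls eventlet.monkey_patch(), which appears to remove select.poll, while an earlier HTTPS connection in steps (2) and (3) may already have led the networking layer to settle on a poll-based wait strategy. The sketch below only illustrates that ordering effect with stand-ins. It does not import eventlet, urllib3, or the Snowflake connector, and make_waiter, fake_select, fake_monkey_patch, and run_job are hypothetical names invented for the illustration.

```python
import types


def make_waiter(select_module):
    """Return a wait() that picks its strategy on first use and then caches it."""
    cache = {}

    def poll_wait():
        # Looks up poll() on every call, so it breaks if poll is removed later.
        select_module.poll()
        return "poll"

    def select_wait():
        select_module.select()
        return "select"

    def wait():
        if "impl" not in cache:
            # The choice is made once, based on what exists at that moment.
            has_poll = hasattr(select_module, "poll")
            cache["impl"] = poll_wait if has_poll else select_wait
        return cache["impl"]()

    return wait


def fake_select():
    """Hypothetical stand-in for the select module (assumption, not the real one)."""
    return types.SimpleNamespace(poll=lambda: None, select=lambda: None)


def fake_monkey_patch(select_module):
    """Stand-in for the patch: drop poll, as the error message suggests happened."""
    if hasattr(select_module, "poll"):
        del select_module.poll


def run_job(connect_before_patch):
    """Simulate one job: optionally wait once before patching, then wait again."""
    mod = fake_select()
    wait = make_waiter(mod)
    if connect_before_patch:
        wait()  # like steps (2) and (3): the poll strategy gets cached
    fake_monkey_patch(mod)  # like step (4)
    try:
        return wait()
    except AttributeError as error:
        return "AttributeError: " + str(error)


print(run_job(connect_before_patch=False))  # patch first: falls back to select
print(run_job(connect_before_patch=True))   # wait first: cached poll path fails
```

If this reading is right, calling eventlet.monkey_patch() at the very start of the job, before any other step opens a connection, or dropping it from step (4) if nothing there depends on it, might let all four steps run together. Both options are untested guesses.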
As a workaround, I just run those in two separate jobs:
- (1), (2), (3) steps
- (1), (4) steps
But I hope that I can combine those with your help.
Please advise.
- Labels: Python