How to Load Big Data from Snowflake Into Python
We at Saturn Cloud are dedicated to fast and scalable data science with Python. Often this looks like querying data that resides in cloud storage or a data warehouse, then performing analysis, feature engineering, and machine learning with Python. Snowflake is a scalable cloud data warehouse that is used across many organizations. It provides a central place to house and query data, big and small.
If your organization uses Snowflake, there’s a good chance you have had to pull data out of it to build and train machine learning models. Snowflake’s snowflake-connector-python package makes it fast and easy to write a Snowflake query and pull the result into a pandas DataFrame. There is just one challenge: your big Snowflake table probably doesn’t fit into pandas! What now?
Enter: Dask!
Dask is a Python-native parallel computing library that makes it easy to process large datasets. It has a low-level parallelization API as well as high-level objects for parallel DataFrames, arrays, and machine learning. These let you, the data scientist, write code like you would with pandas, numpy, or scikit-learn, except it now runs on big data across a compute cluster!
We’ll use a combination of Dask’s low-level and DataFrame APIs to pull large data from Snowflake. Essentially, we tell Dask to load chunks of the full data we want, then it will organize that into a DataFrame that we can work with the same way we would with a pandas DataFrame.
The Basics
First, some basics. This is the standard way to load Snowflake data into pandas:
import snowflake.connector
import pandas as pd

ctx = snowflake.connector.connect(
    user='YOUR_USER',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT'
)
query = "SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER"
pd.read_sql(query, ctx)
Snowflake recently introduced two much faster methods for this operation, fetch_pandas_all() and fetch_pandas_batches(), which leverage Arrow.
cur = ctx.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
fetch_pandas_batches() returns an iterator, but since we’re going to focus on loading this into a distributed DataFrame (pulling from multiple machines), we’re going to set up our query to shard the data and use fetch_pandas_all() on our workers.
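For completeness, here is a minimal sketch of how the iterator form can be consumed on a single machine. It assumes cur is a cursor that has already executed a query, as in the snippet above; the helper name total_rows_in_batches is hypothetical and not part of the connector.

```python
def total_rows_in_batches(cur):
    """Count rows by walking fetch_pandas_batches() one chunk at a time.

    `cur` is assumed to be a Snowflake cursor that has already executed a
    query. Each batch is a pandas DataFrame, so only one chunk needs to be
    held in memory at once.
    """
    total = 0
    for batch in cur.fetch_pandas_batches():
        total += len(batch)
    return total

# Usage, assuming the `ctx` and `query` objects defined earlier:
# cur = ctx.cursor()
# cur.execute(query)
# print(total_rows_in_batches(cur))
```

This keeps memory bounded on one machine, but the batches still arrive through a single connection, which is why the rest of this article shards the query across workers instead.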
When to use Snowflake or Dask?
It can be very tempting to rip all the data out of Snowflake so that you can work with it in Dask. That works, but Snowflake is going to be much faster at applying SQL-like operations to the data. Snowflake stores the data and has highly optimized routines to get every ounce of performance out of your query. The examples here load an entire table into Dask for simplicity. In your case, you will probably have a SQL query that performs the SQL-like transformations you care about, and you will load its result set into Dask for the things Dask is good at (such as some types of feature engineering, and machine learning).
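As a rough illustration of that split, compare a query that pulls everything with one that lets Snowflake project and filter first. The column selection and the market-segment filter below are arbitrary choices for the sample CUSTOMER table, not a recommendation.

```python
# Pulls every row and every column; all filtering would happen later in Python.
raw_query = "SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER"

# Lets Snowflake drop unneeded columns and rows before anything is transferred.
# The columns and the 'BUILDING' filter are illustrative assumptions.
reduced_query = """
SELECT c_custkey, c_nationkey, c_mktsegment, c_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
WHERE c_mktsegment = 'BUILDING'
"""
```

Either string can be passed to cursor.execute(); the second simply moves less data out of the warehouse.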
How does Dask load data?
You can think about a Dask DataFrame as a giant pandas DataFrame, that has been chopped up and scattered across a bunch of computers. When we are loading data from Snowflake (assuming that the data is large), it’s not efficient to load all the data on one machine, and then scatter that out to your cluster. We are going to focus on having all machines in your Dask cluster load a partition (a small slice) of your data.
Load it!
We aren’t going to use read_sql_table() from the Dask library here. To have more control over how we load the data from Snowflake, we want to call fetch_pandas_all(), which is a Snowflake-specific function and therefore not something read_sql_table() can use.
We need to set up a query template containing a binding that will result in Dask issuing multiple queries that each extract a slice of the data based on a partitioning column. These slices will become the partitions in a Dask DataFrame.
query = """
SELECT *
FROM customer
WHERE c_custkey BETWEEN %s AND %s
"""
It’s important to pick a column that evenly divides your data, like a row ID or a uniformly distributed timestamp. Otherwise one query may take much longer to execute than the others. We then use a dask.delayed function to execute this query multiple times in parallel for each partition. Note that we put our Snowflake connection information in a dict called conn_info to be able to reference it multiple times.
import snowflake.connector
import dask

conn_info = {
    "account": 'YOUR_ACCOUNT',
    "user": 'YOUR_USER',
    "password": 'YOUR_PASSWORD',
    "database": 'SNOWFLAKE_SAMPLE_DATA',
    "schema": 'TPCH_SF1',
}

@dask.delayed
def load(conn_info, query, start, end):
    with snowflake.connector.connect(**conn_info) as conn:
        cur = conn.cursor().execute(query, (start, end))
        return cur.fetch_pandas_all()
@dask.delayed is a decorator that turns a Python function into a function suitable for running on the Dask cluster. When you call it, instead of executing, it returns a Delayed result that represents what the return value of the function will be. The from_delayed() function takes a list of these Delayed objects, and concatenates them into a giant DataFrame. For more information about @dask.delayed and the concept of lazy evaluation, see this article.
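The lazy behavior is easiest to see with a toy function that has nothing to do with Snowflake. This is a minimal sketch; square is a made-up function used only for illustration.

```python
import dask

@dask.delayed
def square(x):
    return x * x

# Calling the decorated function does not run it; each call returns a Delayed object.
lazy_values = [square(i) for i in range(4)]

# Build one more lazy step on top of the others, then trigger the whole graph.
lazy_total = dask.delayed(sum)(lazy_values)
result = lazy_total.compute()
print(type(lazy_values[0]).__name__, result)
```

Nothing is computed until compute() is called, which is what lets Dask schedule the individual calls in parallel.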
We can now call this load() function multiple times and convert the results into a Dask DataFrame using dask.dataframe.from_delayed().
import dask.dataframe as dd

results = [
    load(conn_info, query, 0, 10000),
    load(conn_info, query, 10001, 20000),
    load(conn_info, query, 20001, 30000),
]
ddf = dd.from_delayed(results)
The start and end values were hard-coded for the above example, but you would normally write a query to determine what the partitions will look like based on your data. For example, with our customer table, we know that the c_custkey column is an auto-incrementing, non-null ID column (the cardinality of the column is equal to the number of rows in the table). We can write a function that will determine the appropriate start and end values given a desired number of partitions, then use those results to create the Dask DataFrame:
def get_partitions(table, id_col, num_partitions=100):
    with snowflake.connector.connect(**conn_info) as conn:
        part_query = f"SELECT MAX({id_col}) FROM {table}"
        part_max = conn.cursor().execute(part_query).fetchall()[0][0]

    # Guard against a zero step when the table has fewer rows than partitions.
    inc = max(1, part_max // num_partitions)
    # Stop at part_max + 1 so the row with the largest ID is not dropped.
    parts = [(i, i + inc - 1) for i in range(0, part_max + 1, inc)]
    return parts

parts = get_partitions('customer', 'c_custkey')

ddf = dd.from_delayed(
    [load(conn_info, query, part[0], part[1]) for part in parts]
)
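The bounds arithmetic is easy to get subtly wrong at the edges (the largest key, or a table with fewer rows than partitions), so it can help to isolate it in a pure function that is testable without a Snowflake connection. A minimal sketch, assuming an integer ID column with known minimum and maximum values; partition_bounds is a hypothetical helper, not part of the original code.

```python
def partition_bounds(min_id, max_id, num_partitions):
    """Split the inclusive range [min_id, max_id] into contiguous,
    non-overlapping (start, end) pairs suitable for BETWEEN %s AND %s."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if max_id < min_id:
        return []
    total = max_id - min_id + 1
    # Ceiling division, so at most num_partitions slices are produced.
    step = -(-total // num_partitions)
    return [
        (start, min(start + step - 1, max_id))
        for start in range(min_id, max_id + 1, step)
    ]

print(partition_bounds(1, 10, 3))
```

Because BETWEEN is inclusive on both ends, each slice ends one below the start of the next, and the final slice is clamped to the maximum ID.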
As long as the full dataset fits into the memory of your cluster, you can persist the DataFrame to ensure the Snowflake queries only execute once. Otherwise, they will execute each time you trigger computation on the DataFrame.
ddf = ddf.persist()
A note about data clustering
As seen above, Dask is going to query Snowflake for many partitions of your table based on a specific partition column. One query would be something like:
select * from mytable where id between 10000 and 20000
With very large tables, it’s important that data clustering is properly optimized on the Snowflake side. If it’s not set right, there’s a chance that every query is going to trigger a full table scan. For moderately sized tables this isn’t a problem, but it can be expensive for very large tables. For more information about Snowflake’s table structures and when it’s necessary to set up data clustering, see this Snowflake docs page.
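One way to get a rough read on how well a table is clustered on the partition column is Snowflake's SYSTEM$CLUSTERING_INFORMATION function, which returns a JSON string. The sketch below is an assumption-laden illustration: clustering_depth is a hypothetical helper, cur is assumed to be an open Snowflake cursor, and the average_depth field name is taken from the documented output format, so verify it against your account before relying on it.

```python
import json

def clustering_depth(cur, table, column):
    """Return the average clustering depth Snowflake reports for `column`.

    Table and column names are interpolated directly into the SQL text,
    so only pass trusted identifiers.
    """
    sql = f"SELECT SYSTEM$CLUSTERING_INFORMATION('{table}', '({column})')"
    row = cur.execute(sql).fetchone()
    info = json.loads(row[0])
    # A lower average depth generally means range filters prune more micro-partitions.
    return info.get("average_depth")

# Usage, assuming an open connection `conn`:
# print(clustering_depth(conn.cursor(), "customer", "c_custkey"))
```

A high value on a very large table is a hint that each BETWEEN query may be scanning far more data than the slice it returns.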
Putting it all together
This is what the code would look like for this example table. You will likely want to do many more transformations in the Snowflake query as you can leverage the power of Snowflake’s data warehouse there. The partition column you use will also be different depending on how your data is organized.
import snowflake.connector
import dask
import dask.dataframe as dd

conn_info = {
    "account": 'YOUR_ACCOUNT',
    "user": 'YOUR_USER',
    "password": 'YOUR_PASSWORD',
    "database": 'SNOWFLAKE_SAMPLE_DATA',
    "schema": 'TPCH_SF1',
}

query = """
SELECT *
FROM customer
WHERE c_custkey BETWEEN %s AND %s
"""

@dask.delayed
def load(conn_info, query, start, end):
    with snowflake.connector.connect(**conn_info) as conn:
        cur = conn.cursor().execute(query, (start, end))
        return cur.fetch_pandas_all()

def get_partitions(table, id_col, num_partitions=100):
    with snowflake.connector.connect(**conn_info) as conn:
        part_query = f"SELECT MAX({id_col}) FROM {table}"
        part_max = conn.cursor().execute(part_query).fetchall()[0][0]

    # Guard against a zero step when the table has fewer rows than partitions.
    inc = max(1, part_max // num_partitions)
    # Stop at part_max + 1 so the row with the largest ID is not dropped.
    parts = [(i, i + inc - 1) for i in range(0, part_max + 1, inc)]
    return parts

parts = get_partitions('customer', 'c_custkey')

ddf = dd.from_delayed(
    [load(conn_info, query, part[0], part[1]) for part in parts]
)
ddf = ddf.persist()
How do I get a Dask cluster?
The example shown is a great way to process large data pulled from Snowflake in Python. The last piece: where do you get your Dask cluster? While you can run Dask on your laptop, it will run a lot faster with a cluster of many machines. Saturn Cloud provides a low-friction way for you and your team to get going with Dask. It takes just two minutes to sign up for a free trial and spin up a cluster with access to Snowflake!