From Pandas to Dask - Part 2

How to migrate from Pandas to Dask DataFrames in Python – group-by, data persistence, SQL database, testing

January 30th, 2022

In the previous post, From Pandas to Dask - Part 1, we discussed some challenges that may arise when moving from Pandas to Dask and how to tackle them. In this post, we look at additional challenges.

Let us start with aggregation operations on groups.

Aggregation Operations on Groups

At the time of writing, certain aggregation operations, like the median or the quantile, are not implemented for grouped DataFrames (or Series) in Dask. However, we can compute them using a workaround involving Pandas, which supports several operations on grouped DataFrames. To show this workaround, we consider the case of calculating the median over groups.

Let us start by defining a Pandas DataFrame (DF) with two columns, A and B. Column A contains duplicated values a and b, while column B contains integer numbers, from 0 to 5.

import dask.dataframe as dd
import numpy as np
import pandas as pd

df = pd.DataFrame(
    data={
        "A": list("ababab"),
        "B": np.arange(6),
    }
)
print(df)
OUT:

   A  B
0  a  0
1  b  1
2  a  2
3  b  3
4  a  4
5  b  5

In Pandas, after grouping by column A, we can use the median method to calculate the median of column B for each group.

medians = df.groupby("A")[["B"]].median()
print(medians)
OUT:

     B
A
a  2.0
b  3.0

Now, we turn to Dask and define a Dask DataFrame (DDF) with two partitions from the above DF.

ddf = dd.from_pandas(df, npartitions=2)
print_ddf(ddf)
OUT:

   A  B
------- 1st partition
0  a  0
1  b  1
2  a  2
------- 2nd partition
3  b  3
4  a  4
5  b  5

To calculate the median of each group, a and b, first, we isolate the two values by setting the index of the DDF to column A. As we saw in section Setting the Index and Sorting of Part 1, this operation puts all the rows with the same value of column A in the same partition.

ddf = ddf.set_index("A")
print_ddf(ddf)
OUT:

   B
A
---- 1st partition
---- 2nd partition
a  0
a  2
a  4
b  1
b  3
b  5

With the DDF in this form, we apply a group-by operation to each partition using the map_partitions method. This method maps a function to each partition, which is treated as a DF, meaning that we can use the Pandas median method.

medians = ddf.map_partitions(
    lambda partition_df: partition_df.groupby("A").median()
)
medians = medians.compute()
print(medians)
OUT:

     B
A
a  2.0
b  3.0

This is how we can calculate the median over groups of a DDF. However, note that this procedure requires a complete shuffling of the index of the DDF, which, as we saw in section Changing the Index of Part 1, may be very slow.

Disk Cache

In section Changing the Index of Part 1, we saw how, every time the compute method is called on a DDF, Dask re-computes all the operations associated with that DDF. To see this more clearly, let us create a DDF and apply some operations to it.

df = pd.DataFrame(
    data={"A": np.arange(6), "B": np.array(list("xxyzyz"))},
    index=np.array(list("abcdef")),
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.categorize(columns=["B"], index=True)
ddf["C"] = ddf["A"] ** 2

print_ddf(ddf, end="\n\n")
print(ddf)
OUT:

   A  B   C
----------- 1st partition
a  0  x  0
b  1  x  1
c  2  y  4
----------- 2nd partition
d  3  z   9
e  4  y  16
f  5  z  25


Dask DataFrame Structure:
                   A                B      C
npartitions=2
a              int64  category[known]  int64
d                ...              ...    ...
f                ...              ...    ...
Dask Name: assign, 10 tasks

The first operation casts column B to a categorical data type. The second operation squares column A and stores the result in a new column, C. Printing the DDF after these operations shows that there are 10 tasks associated with it.

If we use ddf in multiple places, every call to the compute method re-runs all 10 tasks. This is because Dask evaluates lazily: the result of a computation is not kept in memory once it has been returned, so it is re-computed every time it is needed. Therefore, it is up to us to decide when to persist the result of a Dask computation.

Since DDFs are typically too large to be stored in memory, the result of a computation may instead be stored on disk. Dask suggests persisting DataFrames to disk using the Parquet format because this format allows for efficient reading and writing operations. So, let us save the above DDF to disk and reload it immediately from there.

from pathlib import Path

data_path = Path("data.parquet")
ddf.to_parquet(data_path)
ddf_cached = dd.read_parquet(data_path)

print_ddf(ddf_cached, end="\n\n")
print(ddf_cached)
OUT:

   A  B   C
----------- 1st partition
a  0  x  0
b  1  x  1
c  2  y  4
----------- 2nd partition
d  3  z   9
e  4  y  16
f  5  z  25


Dask DataFrame Structure:
                   A                  B      C
npartitions=2
               int64  category[unknown]  int64
                 ...                ...    ...
                 ...                ...    ...
Dask Name: read-parquet, 2 tasks

ddf_cached is the same DDF that we had before but has 2 tasks, instead of 10. The 2 tasks correspond to reading the 2 partitions from disk. Hence, every time the compute method is called, the 2 partitions are read from disk. More generally, this means that persisting the data to disk is effective only when loading the partitions from disk is faster than computing the tasks.

To manage the persistence of DDFs to disk more easily, you may also use the utility class, DaskDiskCache, defined in this file. Here is an example usage:

dask_disk_cache = DaskDiskCache()
ddf = dask_disk_cache.persist(ddf, cache_name="data")

print(f"Cache names: {dask_disk_cache.cache_names}", end="\n\n")

print_ddf(ddf, end="\n\n")
print(ddf, end="\n\n")

dask_disk_cache.remove(cache_name="data")
dask_disk_cache.cleanup()
OUT:

Cache names: ['data']

   A  B   C
----------- 1st partition
a  0  x  0
b  1  x  1
c  2  y  4
----------- 2nd partition
d  3  z   9
e  4  y  16
f  5  z  25


Dask DataFrame Structure:
                   A                B      C
npartitions=2
               int64  category[known]  int64
                 ...              ...    ...
                 ...              ...    ...
Dask Name: categorize_block, 4 tasks

Note that, when a Dask DataFrame with categorical columns is persisted to disk, the categorical information is not shared across the partitions. So, when loading the data, we have to re-categorise it with the categorize method to synchronise the categories across the partitions. The DaskDiskCache utility class above already does this. Re-categorising column B is why its data type here is category[known], rather than the category[unknown] we saw after reading the Parquet file directly. It also adds a step after loading each partition, which is why there are 4 tasks rather than 2.

Debugging

While developing or debugging a data processing pipeline, it is convenient to restrict the data to a small subset, so that we can quickly run the data through the pipeline. However, sometimes, when debugging, we have to use a large subset of the data or perhaps the whole data. In this case, running the entire pipeline from the start multiple times is slow and inconvenient.

As a possible workaround, we may cache to disk the result of intermediate steps of the pipeline during its first run. In the following runs, instead of re-computing the intermediate steps, we just load the cached results from disk.

The debug_cache module, defined in this file, allows caching the output of functions easily (see footnote 1). As an example, let us define a function that creates a DDF and applies some operations to it. For simplicity, we re-use the operations defined in the first code block of the Disk Cache section. To cache the output of the function, we import the debug_cache module and decorate the function using the cache decorator.

import debug_cache

@debug_cache.cache
def create_ddf() -> dd.DataFrame:
    print("Creating DDF")

    df = pd.DataFrame(
        data={"A": np.arange(6), "B": np.array(list("xxyzyz"))},
        index=np.array(list("abcdef")),
    )
    ddf = dd.from_pandas(df, npartitions=2)
    ddf = ddf.categorize(columns=["B"], index=True)
    ddf["C"] = ddf["A"] ** 2

    return ddf

The cache decorator persists the output of the function to disk and loads it from disk when the function is called again in subsequent runs. To see this, we enable the debug cache and call the function.

from pathlib import Path

debug_cache.enable(data_dir=Path("debug_cache_dir"))

ddf = create_ddf()
print(f"Number of tasks: {len(ddf.dask)}\n")
OUT:

Creating DDF
Number of tasks: 4

The first time we run this code, the function creates the output, as you can see from the presence of "Creating DDF" in the terminal output. The second time we run the code, we get the following terminal output.

OUT:

Number of tasks: 4

The absence of "Creating DDF" tells us that the function is not run and that its output is loaded from the cache.

Now, let us see how the debug_cache module becomes useful in a data processing pipeline. Let us assume that the data processing pipeline is the following chain of functions, each one decorated with the cache decorator.

from pathlib import Path

debug_cache.enable(data_dir=Path("debug_cache_dir"))

ddf = create_dask_dataframe()
ddf = engineer_features(ddf)
ddf = preprocess_for_modelling(ddf)

After running the data processing pipeline once, the results of the functions are cached. Now, let us say that we want to debug the last function, preprocess_for_modelling. When we launch the pipeline, the first two functions are not run, because their output is loaded from the cache. Furthermore, unless the compute method is called, Dask does not actually load the data, as we saw in section Changing the Index of Part 1. This means that the first two functions are essentially skipped. In this way, debugging the last function becomes quicker and more convenient.

Finally, in production, we can disable the debug cache simply by not calling debug_cache.enable.

Loading Data from SQL Database

While Pandas can read data from an SQL database using an SQL query string passed to the read_sql_query function, Dask cannot. Instead, in the Dask version used for this post, we express the query with SQLAlchemy and pass it to the read_sql_table function (more recent Dask releases provide a separate read_sql_query function that takes an SQLAlchemy selectable). As I could not find any easy example showing how to do this, I use this post to write a reference example of how to load data from an SQL database table into a DDF.

import sqlalchemy

sql_connection_string = "dialect+driver://username:password@host:port/database"

sql_connection = sqlalchemy.create_engine(sql_connection_string)
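# Named sql_metadata to avoid a clash with the DDF metadata defined below.
sql_metadata = sqlalchemy.MetaData(schema="schema_name")
# autoload_with alone triggers reflection of the table definition.
table = sqlalchemy.Table(
    "table_name", sql_metadata, autoload_with=sql_connection
)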

query = (
    sqlalchemy.select(
        [
            table.c.id,  # Type: BIGINT
            table.c.a,  # Type: INT, no nulls
            sqlalchemy.cast(table.c.b, sqlalchemy.Float()).label(
                "b"
            ),  # Type: INT, with nulls
            table.c.c,  # Type: VARCHAR
            table.c.d,  # Type: DATETIME
        ]
    )
    .where(
        sqlalchemy.and_(
            table.c.a > 0,
            table.c.d.isnot(None),  # Drop null values
        )
    )
    .limit(10_000)
    .alias("my_data")  # Any alias name would work
)

dtypes = {
    "id": "int64",
    "a": "int32",
    "b": "float32",
    "c": "object",
    "d": "datetime64[ns]",
}
metadata = (
    pd.DataFrame(columns=dtypes.keys())
    .astype(dtypes)
    .set_index("id", drop=True)
)

# Query
dask_partition_size = 268_435_456  # Bytes == 256 MiB
ddf = dd.read_sql_table(
    table=query,
    uri=sql_connection_string,
    index_col="id",
    bytes_per_chunk=dask_partition_size,
    meta=metadata,
)

First, we define an SQLAlchemy connection string, sql_connection_string, which we use to create the SQL connection, sql_connection. Using this connection, we create an interface to the SQL table, table. Next, we define a query, query, using the SQLAlchemy query API. For the sake of this example, the query selects columns of different types from the table. Column b is of integer type with null values, which the default integer type in Dask (or Pandas) cannot hold; so, we cast it to a float because the floating type supports null values. After selecting the columns, we apply two filters and limit the result to 10,000 rows. Then, we define the Dask data types, dtypes, corresponding to each SQL column and create the DDF metadata, metadata. Finally, we use the query and metadata to fetch the data from the SQL table into a DDF.

Testing

When using a DDF in a data processing pipeline, it is important to make sure that the output does not depend on how the data is partitioned. This is normally the case, but there are exceptions. For example, if, in section Repartitioning with String Index of Part 1, we used the map_partitions method after repartitioning the DDF without resetting the index, the result of the aggregation would have been wrong and dependent on the number of partitions.

To make sure that the output of operations over a DDF is independent of the number of partitions, we may use a unit test. For example, let us say that we have a function, mse, that evaluates the mean squared error between two columns of a DDF.

def mse(ddf, column_a, column_b):
    return ((ddf[column_a] - ddf[column_b]) ** 2).mean()

Then, a unit test for this function would check that the output of the function does not change if we change the number of partitions, as shown in the following example.

import unittest

class TestMse(unittest.TestCase):
    def test(self):
        df = pd.DataFrame(
            data={
                "A": [2.0, 5.0, 2.0],
                "B": [1.0, 5.0, 4.0],
            }
        )

        expected_output = 1.6666666666666667

        for npartitions in [1, 2, 3]:
            with self.subTest(npartitions=npartitions):
                ddf = dd.from_pandas(df, npartitions=npartitions)
                output = mse(ddf, "A", "B").compute()
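                # Floating-point results may differ in the last digits
                # depending on how partial sums are combined.
                self.assertAlmostEqual(output, expected_output)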

In this example, the output of our function is a floating-point number. Instead, if the output of a function is a DF or a DDF, you may use the pandas.testing.assert_frame_equal or dask.dataframe.utils.assert_eq functions, respectively.

Conclusions

In this post, we saw some more challenges that arise when moving from Pandas to Dask and how to deal with them:

  • Certain aggregation operations on groups should be done after each group is isolated into a single Dask DataFrame partition.
  • Persisting the data to disk is a useful way to avoid running the same operations multiple times and may be useful when debugging, as well.
  • Loading data from an SQL database in Dask, unlike in Pandas, requires SQLAlchemy.
  • In unit tests involving operations on Dask DataFrames, it is useful to check that the output is independent of the number of partitions.

Finally, as a piece of advice, for a first project in Dask, to avoid unexpected behaviours, I recommend using a test-driven approach.



  1. The function output may be a DDF, but does not have to be a DDF.
© Filippo Bovo 2022