From Pandas to Dask - Part 1
How to migrate from Pandas to Dask DataFrames in Python – index operations, sorting and repartitioning
November 25th, 2021
In a project that my team and I completed at Cazana, we engineered a data processing pipeline handling hundreds of gigabytes of data. Since this pipeline was a component of a larger machine learning pipeline built in Python, we handled the data using Dask.
Dask is a library for parallel, larger-than-memory and distributed computing that extends Pandas, a library familiar to the machine learning community. Leveraging this familiarity, Dask mimics the Pandas API for an easier learning experience. However, using Dask may still prove quite challenging in certain situations because, despite the syntactical similarities, at its core Dask is different from Pandas. In this post, I will explain what I learnt about the transition from Pandas to Dask.
This post assumes that you are familiar with Pandas and the Dask introduction from the official website. The results in this post were obtained using a MacBook Pro (13-inch, 2018, i5) with Python 3.8.5, Pandas 1.2.2 and Dask 2021.2.0. For more details about the code, refer to the companion GitHub repository.
Before diving into the transition from Pandas to Dask, let us start with an overview of Dask DataFrames, which are the analogue of Pandas DataFrames.
Dask DataFrames
Imagine splitting a Pandas DataFrame (DF) into a collection of smaller DFs. Having a collection of smaller DFs instead of a single one is convenient if a single DF does not fit in memory. In this case, we may split the initial DF into smaller DFs, save them to disk and load them into memory one at a time to carry out a computation. Having a collection of DFs is also useful to leverage multiple CPUs. In this case, we may assign a different DF to each CPU and have them carry out a computation in parallel. More generally, if we express the computation through a computation graph, we may distribute the computation more efficiently across different CPUs. Generalising further, a computation may be distributed across multiple machines working in parallel, each one with its own CPU, memory and disk storage. These ways of achieving parallel, larger-than-memory and distributed computation hint at how Dask operates. With this in mind, a collection of DFs managed by Dask defines a Dask DataFrame (DDF).
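To make this concrete, here is a rough mental model in plain Pandas: a hypothetical two-chunk split with a map-then-combine sum, purely illustrative of the idea rather than of how Dask is implemented internally.

import pandas as pd

# Rough mental model: a "collection of smaller DFs" is just a list of chunks.
df = pd.DataFrame({"x": range(10)})
partitions = [df.iloc[:5], df.iloc[5:]]  # two smaller DFs

# A computation becomes a map over the chunks plus a combine step;
# each chunk could live on disk or on a different CPU or machine.
partial_sums = [part["x"].sum() for part in partitions]
print(sum(partial_sums))  # 45, the same as df["x"].sum()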
Dask, being built on top of Pandas, adds a layer of complexity to it. The great thing about Dask is that you do not have to choose between Dask and Pandas. Indeed, as Dask interoperates with Pandas, you may use a mix of both. This means that, in your data processing pipeline, you may turn to DDFs only when you need to and then revert to DFs, limiting the amount of additional complexity on top of Pandas.
Aside from limiting complexity, the following are additional situations where Pandas is better than Dask:
- Dask is slower than Pandas on data that fits into the memory of an average laptop or desktop computer, as we will see in a benchmark below.
- Some Pandas operations are not possible in Dask because of the different way that Dask handles data. For example, aggregations that need to see the whole data at once, such as computing an exact quantile, are not directly possible in Dask.
- Since Dask is a much younger project than Pandas, some features from Pandas are not yet implemented in Dask. For example, multi-index operations are still limited in Dask.1
Now that you have a better idea of the advantages and disadvantages of DDFs, let us see how to use them effectively.
Changing the Index
A Dask DataFrame is a collection of Pandas DataFrames, called partitions, together with operations acting on those partitions. Dask uses the index of the data to create the partitions, whose number or memory size is defined by the user.
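For instance, you can inspect how a DDF is partitioned through its npartitions and divisions attributes. A minimal sketch; the exact boundary values shown in the comment are an assumption about how from_pandas splits a 100-row frame.

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"x": range(100)})
ddf = dd.from_pandas(df, npartitions=4)

print(ddf.npartitions)  # 4
# divisions holds the index values at the partition boundaries,
# e.g. (0, 25, 50, 75, 99) for four equal chunks of a 100-row frame.
print(ddf.divisions)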
The most important thing to keep in mind is that operations that shuffle the index of the DDF are computationally expensive because they move data between partitions. When partitions are stored on disk, these operations are even slower than when the partitions live in memory. Hence, avoid shuffling the index whenever possible or, put simply, do not mess with the index.
If you are using Dask in just a few parts of the data pipeline in place of Pandas, it is easy to limit operations involving the index. However, if you are using Dask in most of your data pipeline, you are better off architecting the pipeline in a way that reduces the number of index operations.
A common operation that shuffles the index is changing the number of partitions using the repartition method. Changing the number of partitions means that part of the data from one partition has to be moved to other partitions, leading to a data shuffle. However, repartitioning only causes a partial data shuffle, which may not be that bad.
Another common operation on DFs is setting the index. In Dask, setting the index leads to a full data shuffle because Dask partitions the data using the index, and changing the index shuffles all the data in all the partitions. Hence, setting the index is an expensive operation. However, sometimes it has to be used. So, let us have a more detailed look at the operation of setting the index in Dask.
First, we import the libraries that will be used throughout the article.
from time import time

import numpy as np
import pandas as pd
import dask.dataframe as dd
Then, we create a Pandas DataFrame df with 3 columns, each with 100,000 random values.
df = pd.DataFrame(
    data={
        "a": np.random.permutation(100_000),
        "b": np.random.random(100_000),
        "c": np.random.random(100_000),
    }
)
print(df.head())
OUT:
a b c
0 48680 0.740794 0.486614
1 38283 0.640355 0.003733
2 86664 0.075191 0.997144
3 75611 0.638546 0.056039
4 83474 0.217691 0.971367
Column a is a random permutation of the current index, while columns b and c have random numbers between 0 and 1.
From the Pandas DataFrame df, we create the DDF ddf with 20 partitions.
ddf = dd.from_pandas(df, npartitions=20)
print(ddf)
OUT:
a b c
npartitions=20
0 int64 float64 float64
5000 ... ... ...
... ... ... ...
95000 ... ... ...
99999 ... ... ...
Dask Name: from_pandas, 20 tasks
The last line specifies that there are 20 tasks. In Dask, each task is a unit of computation and multiple tasks form a computational graph, which can be thought of as a network of tasks. Expressing the tasks using a network allows Dask to distribute the computation across different CPUs or machines.
In our case, the DDF uses a computational graph made up of 20 tasks, each task corresponding to the creation of one partition. Every time this DDF computes something, it runs the tasks defined in the computational graph. The input data "flows" through the network to produce the output. This flow of operations is lazy, meaning that Dask runs the 20 tasks only when, and every time, the result of the computation is explicitly required. The laziness of the computation is the reason for the absence of values in the columns of the DDF printed above: the print function does not trigger the computation.
As an example of explicitly requiring the above computation, we take the head of the DDF, which triggers the computation and returns the same result that we had for Pandas, which we then print.
print(ddf.head())
OUT:
a b c
0 48680 0.740794 0.486614
1 38283 0.640355 0.003733
2 86664 0.075191 0.997144
3 75611 0.638546 0.056039
4 83474 0.217691 0.971367
Operations on DataFrames that fit in memory are slower in Dask than in Pandas. To show this with an example, we print the time it takes to sum columns b and c, first with Pandas and then with Dask.
start_time = time()
df["b"] + df["c"]
end_time = time()
print(f"Pandas DataFrame: {(end_time - start_time) * 1000:7.3f} ms")
start_time = time()
(ddf["b"] + ddf["c"]).compute()
end_time = time()
print(f"Dask DataFrame: {(end_time - start_time) * 1000:7.3f} ms")
OUT:
Pandas DataFrame: 1.733 ms
Dask DataFrame: 32.211 ms
Dask is about one order of magnitude slower than Pandas.2 This is due to the overhead of operations in Dask, like creating the computational graph, applying the addition to each partition and joining the partial results into a single output.
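You can get a feel for this overhead by inspecting the task graph behind the lazy result. A small sketch; the task count in the comment is an estimate, as the exact number varies across Dask versions.

lazy = ddf["b"] + ddf["c"]  # builds the graph, computes nothing yet
# Roughly one task per partition for each step (create partition, select "b",
# select "c", add), so on the order of 80 tasks for 20 partitions.
print(len(lazy.dask))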
Operations that shuffle the data increase the computation gap between Dask and Pandas even more, as we see in the following example, where we set column a as the new index, first with Pandas and then with Dask.
start_time = time()
df.set_index("a")
end_time = time()
print(f"Pandas DataFrame: {(end_time - start_time) * 1000:7.3f} ms")
start_time = time()
ddf.set_index("a").compute()
end_time = time()
print(f"Dask DataFrame: {(end_time - start_time) * 1000:7.3f} ms")
OUT:
Set index:
Pandas DataFrame: 2.492 ms
Dask DataFrame: 972.854 ms
Dask is now almost two orders of magnitude slower than Pandas.2
While operations that benefit from parallel processing may give Dask an advantage over Pandas on large datasets, shuffling the index only gets worse as the data grows.
Setting the Index and Sorting
Unlike Pandas, Dask does not have an explicit method to sort a DataFrame by a column. To achieve a similar result, we have to set the index of the DDF to the column we want to sort by. Let us check this with an example and start by creating a DDF with 2 integer columns and 3 string columns split into 2 partitions.
df = pd.DataFrame(
    data={
        "A": np.arange(6),
        "B": np.random.permutation(6),
        "C": np.array(list("abcdef")),
        "D": np.array(list("fbacde")),
        "E": np.array(list("aabaab")),
    }
)
ddf = dd.from_pandas(df, npartitions=2)
print_ddf(ddf)
OUT:
A B C D E
---------------- 1st partition
0 0 2 a f a
1 1 3 b b a
2 2 0 c a b
---------------- 2nd partition
3 3 4 d c a
4 4 1 e d a
5 5 5 f e b
Column A goes from 0 to 5, following the index. Column B is a permutation of column A. Column C has the first 6 letters of the alphabet in order. Column D is a permutation of column C. Column E has unsorted duplicate letters. print_ddf is a custom function that prints a DDF and shows how it is partitioned.
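The actual helper is in the companion repository; a minimal sketch that produces similar output could look like this.

def print_ddf(ddf):
    """Print each partition of a Dask DataFrame, separated by a marker."""
    for i in range(ddf.npartitions):
        print(f"---------------- partition {i + 1}")
        # get_partition returns a single-partition DDF; compute turns it into a DF.
        print(ddf.get_partition(i).compute())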
First, let us check the effect of resetting the index of the DataFrame.
ddf = dd.from_pandas(df, npartitions=2).reset_index()
print_ddf(ddf)
OUT:
index A B C D E
----------------------- 1st partition
0 0 0 2 a f a
1 1 1 3 b b a
2 2 2 0 c a b
----------------------- 2nd partition
0 3 3 4 d c a
1 4 4 1 e d a
2 5 5 5 f e b
The partitions remain the same, but the index has been reset independently in each partition. Indeed, the index in each partition starts from 0. The old index is stored in a column of the DataFrame called index, like in Pandas.
Let us check what happens if we set column B, which is unsorted, as the index.
ddf = dd.from_pandas(df, npartitions=2).set_index("B")
print_ddf(ddf)
OUT:
A C D E
B
------------- 1st partition
0 2 c a b
1 4 e d a
------------- 2nd partition
2 0 a f a
3 1 b b a
4 3 d c a
5 5 f e b
The first change that we notice is the different sizes of the partitions. The first partition has two rows, while the second partition has four. Hence, Dask does not preserve the partition size when setting the index using an unsorted column.
The second change is that the values of column B, which now defines the index, have been sorted in ascending order. Indeed, setting the index is how you sort a Dask DataFrame by a column: unlike a DF, a DDF does not have a sort_values method.
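For comparison, here is the same intent expressed in both libraries; a minimal sketch, noting that more recent Dask releases have since gained a sort_values method.

# Pandas: sort explicitly by a column.
df_sorted = df.sort_values("B")

# Dask (at the version used in this post): sort by making the column the index.
ddf_sorted = ddf.set_index("B")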
Column C in the original DF has letters sorted in alphabetical order. Let us see what happens when we set it as the index in the DDF.
ddf = dd.from_pandas(df, npartitions=2).set_index("C")
print_ddf(ddf)
OUT:
A B D E
C
------------- 1st partition
a 0 2 f a
b 1 3 b a
c 2 0 a b
------------- 2nd partition
d 3 4 c a
e 4 1 d a
f 5 5 e b
When setting column C as the index, since the letters are already sorted alphabetically, there is no change in the order of the rows.
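As an aside, when a column is already known to be sorted, set_index accepts a sorted=True flag that lets Dask skip the expensive shuffle.

# Telling Dask the column is pre-sorted avoids the full data shuffle.
# Caution: sorted=True on a column that is not actually sorted gives wrong results.
ddf = dd.from_pandas(df, npartitions=2).set_index("C", sorted=True)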
Column D in the original DF is a permutation of column C, meaning that it has no duplicated values but is unsorted. Let us see what happens when we set it as the index in the DDF.
ddf = dd.from_pandas(df, npartitions=2).set_index("D")
print_ddf(ddf)
OUT:
A B C E
D
------------- 1st partition
a 2 0 c b
b 1 3 b a
------------- 2nd partition
c 3 4 d a
d 4 1 e a
e 5 5 f b
f 0 2 a a
When setting column D as the index, the rows are sorted alphabetically and the size of the partitions changes, similarly to what happened with the unsorted numerical values of column B.
The values of column E in the original DF are unsorted, duplicated letters. Setting column E as the index in the DDF leads to the following partitions.
ddf = dd.from_pandas(df, npartitions=2).set_index("E")
print_ddf(ddf)
OUT:
A B C D
E
------------- 1st partition
------------- 2nd partition
a 3 4 d c
a 4 1 e d
a 0 2 a f
a 1 3 b b
b 5 5 f e
b 2 0 c a
The first partition is now empty and all the rows are in the second partition. Indeed, even if Dask tries to balance the size of the partitions, it does not preserve it. This behaviour is emphasised for very small DataFrames, like this one. For larger DataFrames, the partitions should be more or less even, unless there are too many duplicate values in the index.
However, as you can see from the above operations, when setting the index the number of partitions remains the same, unlike the partition size, as the quick check below confirms.
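A quick way to run this check is to count the partitions and their sizes with map_partitions; the row counts in the comment match the output above.

ddf = dd.from_pandas(df, npartitions=2).set_index("E")
print(ddf.npartitions)                    # still 2 partitions
print(ddf.map_partitions(len).compute())  # rows per partition: 0 and 6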
Besides sorting the index, when setting a string column as the index, Dask isolates each duplicated value in a single partition. Indeed, all the rows with index a are in the same partition, and similarly for b. This can be seen more clearly if we use a DDF with 3 partitions.
ddf = dd.from_pandas(df, npartitions=3).set_index("E")
print_ddf(ddf)
OUT:
A B C D
E
------------- 1st partition
------------- 2nd partition
a 0 2 a f
a 1 3 b b
a 4 1 e d
a 3 4 d c
------------- 3rd partition
b 5 5 f e
b 2 0 c a
In this case, all the rows with index a are in the second partition and all the rows with index b are in the third partition.
The isolation of rows with the same index to the same partition is very useful for certain group-by operations, as we will see in From Pandas to Dask - Part 2.
Repartitioning with a String Index
As we saw in the previous section, setting the index using a string column isolates its duplicate values into single partitions. However, unlike setting the index, repartitioning does not preserve this isolation. Let us see this with an example.
First, we create a DDF with a string index containing duplicates and a single partition.
df = pd.DataFrame(
    data={"A": np.arange(6)},
    index=pd.Index(np.array(list("ababab")), name="index"),
)
ddf = dd.from_pandas(df, npartitions=1)
print_ddf(ddf)
OUT:
A
index
-------- 1st partition
a 0
a 2
a 4
b 1
b 3
b 5
As we have a single partition, the distinct values of the index are isolated in that single partition. If we repartition the DataFrame into 3 partitions, we get the following result.
ddf = dd.from_pandas(df, npartitions=1).repartition(npartitions=3)
print_ddf(ddf)
OUT:
A
index
-------- 1st partition
a 0
a 2
-------- 2nd partition
a 4
b 1
-------- 3rd partition
b 3
b 5
Value a is present both in the first and in the second partitions, and value b is present both in the second and in the third partitions. So, we lost the isolation of the distinct values into single partitions. The reason is that the repartition and set_index methods allocate the data to the partitions in different ways: set_index preserves the isolation, while repartition does not. To confirm this, we repartition the data using set_index by resetting the index and setting it again with 3 partitions.3
ddf = (
    dd.from_pandas(df, npartitions=1)
    .reset_index()
    .set_index("index", npartitions=3)
)
print_ddf(ddf)
OUT:
A
index
-------- 1st partition
-------- 2nd partition
a 0
a 2
a 4
-------- 3rd partition
b 1
b 3
b 5
Now, all the rows with index value a are only in the second partition and all the rows with index value b are only in the third partition. (If you have an older version of Dask and this method does not work, see the Appendix.)
Conclusions
In this post, we saw how to tackle the following challenges that arise when moving from Pandas to Dask:
- Operations that shuffle the index of a DataFrame are computationally much more expensive in Dask than in Pandas and should be avoided wherever possible.
- Sorting a Dask DataFrame by a column is done by setting that column as the new index, but this involves shuffling the index, which is expensive.
- Repartitioning a Dask DataFrame, unlike setting the index, does not isolate its duplicate values into single partitions.
In the next post, we will look at aggregation operations over groups, persisting data to disk, loading data into a Dask DataFrame directly from an SQL database, and unit testing with Dask DataFrames.
- From Pandas to Dask - Part 2 – Coming next month!
- Companion GitHub repository with the code
Appendix
In older versions of Dask, specifying the number of partitions in the set_index method may not work. In this case, we can use the following alternative.
ddf = (
    dd.from_pandas(df, npartitions=1)
    .reset_index()
    .repartition(npartitions=3)
    .set_index("index")
)
print_ddf(ddf)
OUT:
A
index
-------- 1st partition
-------- 2nd partition
a 4
a 0
a 2
-------- 3rd partition
b 1
b 3
b 5
This last way of repartitioning may also be useful when specifying a partition size in the set_index method does not change the number of partitions.
- To get a quick idea of what DDFs may be missing from DFs, you may search for "Not implemented" or "Not supported" in the Dask DataFrame API page.↩
- For a more accurate comparison, the operation should be run multiple times and averaged. However, here we are only interested in the order of magnitude.↩
- Note that the reindex method is not implemented in Dask.↩