# daft-dev
p
Hey folks, I am experiencing unexpected behavior performance-wise. I have a wide dataframe of 518 numerical columns and 106574 rows. I am applying z-score standardization to each column, since I am working on a nearest-neighbor search algorithm and want to eliminate bias caused by different vector magnitudes. On screenshot #1 I materialize results via `to_pandas` after first limiting the dataframe to the top 5 rows via `limit(5)`. This takes 24.0 seconds to execute, as VSCode indicates. On screenshot #2 I materialize results via `to_pandas` without any limit. This takes 23.2 seconds to execute. I would have expected the `limit(5)` approach to perform significantly better; instead, it performs slightly worse. Do you have an idea why this might be the case? Maybe the query optimizer does not consider the presence of `limit` when the `with_column` statements include UDFs?

Also a note on the use of a UDF for the z-score standardization: I had to use it as a last resort, as I could not find a Daft-native implementation of variance or standard deviation. Any suggestions on optimizing things there?
j
Hi @Peter! Could you print your plan for us to take a look? `df.explain(True)`
My guess is that the scan is probably taking most of the time, and limiting the number of rows doesn't actually limit the scan enough (depending on the file format).
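A minimal sketch of that kind of check, with a placeholder file path (not from this thread):
```python
import daft

# Placeholder path; the point is only to read lazily rather than from memory.
df = daft.read_csv("data.csv")

# Compare the unoptimized and optimized plans: if the limit does not end up
# next to the scan, limiting rows won't reduce the time spent reading.
df.limit(5).explain(show_all=True)
```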
p
Hi @jay, of course, please see the attached logs. The query plan was produced by:
```python
from functools import reduce

import daft
import numpy as np


@daft.udf(return_dtype=daft.DataType.float32())
def standardize(series: daft.Series) -> daft.Series:
    # z-score standardization: subtract the column mean, divide by its std
    arr: np.ndarray = series.to_arrow().to_numpy()
    standardized = (arr - arr.mean()) / arr.std()
    return daft.Series.from_numpy(standardized)


feature_names = [c for c in features.column_names if c != "track_id"]
features = reduce(
    lambda df, c: df.with_column(c, standardize(df[c])), feature_names, features
)
features.limit(5).explain(show_all=True)
```
j
Hmm very odd. I don’t see a scan, so your input dataframe is already collected and in-memory? I think we might need to run some more profiling here to get a better sense of the overhead. I wonder if it’s something dumb like serialization of the functions you’re running 😓
If you run a simpler transform without UDFs (e.g. `+ 1`), do you see similar behavior?
p
That seems to be the case even with the simple expression `daft.col(c) + 1`. I attached screenshot #1 as a minimal reproduction. I believe the issue I am facing here comes from the way I instantiate the Daft DataFrame: I use `daft.from_pandas(features_df)`. So yes, it looks like the input dataframe is already in-memory. For some reason I expected some magical Daft lazification to happen after `daft.from_pandas(features_df)`, though. Unfortunately, I am forced to read the CSV with pandas initially, because it has hierarchical headers (screenshot #2, also attached a tiny subset of the file) and `daft.read_csv` lacks the advanced parsing options that `pd.read_csv` has. Reading such a CSV with Daft just raises an error, as screenshot #3 shows.
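For reference, a rough sketch of the setup described above; the file name, header depth, and column-flattening scheme are placeholders for illustration, not the real values:
```python
from functools import reduce

import daft
import pandas as pd

# The hierarchical header forces the initial read through pandas.
# header=[0, 1, 2] and the flattening below are assumptions.
features_df = pd.read_csv("features.csv", header=[0, 1, 2])
features_df.columns = ["_".join(map(str, c)).strip("_") for c in features_df.columns]

# The dataframe handed to Daft is already fully materialized in memory,
# so there is no lazy scan left for a limit to cut short.
features = daft.from_pandas(features_df)

# Minimal reproduction with a trivial expression instead of the UDF:
feature_names = [c for c in features.column_names if c != "track_id"]
features = reduce(lambda df, c: df.with_column(c, daft.col(c) + 1), feature_names, features)
features.limit(5).to_pandas()
```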
j
I’m going to take a closer look today, will provide some updates when I have them!
p
many thanks!
j
No updates yet, still investigating! 😢
p
No worries at all, thanks for looking into it. Don't hesitate to let me know if I can help by providing some more context around the issue.
k
Hey Péter! We think we've identified the issue: we are doing unnecessary copies of the schema whenever something like `with_column` is called, which makes it costly for a dataframe with a lot of columns. I'm working on fixing it right now, but a temporary workaround would be to call `features.select` once with all of the desired columns specified, instead of using reduce. Something like this:
```python
from daft import col

# Keep track_id and replace every feature column with its standardized
# version, building the whole projection in a single select.
features = features.select(*([col("track_id")] + [standardize(col(name)) for name in feature_names]))
```
p
Hey @Kevin Wang, thank you for the suggestion!
k
Hey @Peter, we're still working on the fix, but in the meantime, did you get a chance to try using `select` instead? If so, how was the performance?
p
Hey @Kevin Wang, yes, I just finished some benchmarking. Things work really nicely when using `select` instead of the reduction, as the screenshots indicate. Interestingly, it is the `reduce(lambda df, c: df.with_column(c, standardize(df[c])), feature_names, features)` line that takes really long, not the expression where I explicitly materialize via `.limit(5).to_pandas()`.
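A sketch of the kind of timing split meant here (names mirror the earlier snippets; the measurement itself is just `time.perf_counter`):
```python
import time
from functools import reduce

# Building the lazy plan: this is where the per-column with_column overhead shows up.
start = time.perf_counter()
transformed = reduce(
    lambda df, c: df.with_column(c, standardize(df[c])), feature_names, features
)
print(f"plan construction: {time.perf_counter() - start:.2f}s")

# Materializing the first 5 rows, timed separately.
start = time.perf_counter()
transformed.limit(5).to_pandas()
print(f"materialization: {time.perf_counter() - start:.2f}s")
```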
k
Great! Thanks for the update. This confirms what we suspected.
Hey @Peter! We just merged in PR #2167, which significantly improves the performance of `with_column`, and also adds an additional `with_columns` (plural) DataFrame operation that you might find useful. These changes will be included in our next release!
p
Hey @Kevin Wang, many thanks for letting me know. `with_columns` will be super useful. Out of curiosity, how do the semantics and performance of `with_columns` compare to those of `select`?
k
If you are planning on selecting every existing column, `with_columns` will be more efficient than `select`. As for semantics, it takes in a dictionary of name: expression pairs instead of a list of columns like `select` does.
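A rough sketch of what that could look like for the standardization above, assuming the dictionary form described here (the exact argument shape may differ in the released API):
```python
from daft import col

# Replace every feature column with its standardized version in a single
# plan operation; columns not mentioned (e.g. track_id) are kept as-is.
features = features.with_columns(
    {name: standardize(col(name)) for name in feature_names}
)
features.limit(5).to_pandas()
```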