…etterSet for improved performance and clarity; add comprehensive tests for unique_set functionality
Pull request overview
This pull request introduces improvements to epoch assignment, file category labeling, and data sanitization in the dftracer analyzer. However, the implementation contains critical issues with duplicate method definitions that must be addressed before merging.
Key Changes:
- Refactored epoch assignment to use pid-based matching with time intervals instead of simple binning
- Added file category enrichment with purpose-based and filesystem-based suffixes
- Implemented size and offset sanitization to replace zero values with NaN
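To make the pid-based interval matching concrete, here is a minimal sketch of the idea. The function name `assign_epochs` and the sample data are hypothetical; the column names (`pid`, `time_start`, `time_end`, `epoch`) follow the snippets quoted in this review. It assumes each interval is half-open, `[time_start, time_end)`, and that an event belongs to an epoch when its start time falls inside that interval for the same pid.

```python
import pandas as pd


def assign_epochs(events: pd.DataFrame, boundaries: pd.DataFrame) -> pd.DataFrame:
    """Label each event with the epoch whose [time_start, time_end) interval
    contains the event's start time for the same pid."""
    out = events.copy()  # leave the caller's frame untouched
    out["epoch"] = pd.NA  # events outside every interval stay unlabeled
    for row in boundaries.itertuples(index=False):
        mask = (
            (out["pid"] == row.pid)
            & (out["time_start"] >= row.time_start)
            & (out["time_start"] < row.time_end)
        )
        out.loc[mask, "epoch"] = row.epoch
    return out


# Hypothetical sample data: two processes, three epoch intervals.
events = pd.DataFrame({"pid": [1, 1, 2, 2], "time_start": [5, 15, 5, 30]})
boundaries = pd.DataFrame(
    {
        "pid": [1, 1, 2],
        "time_start": [0, 10, 0],
        "time_end": [10, 20, 10],
        "epoch": [1, 2, 1],
    }
)
labeled = assign_epochs(events, boundaries)
```

The last event (pid 2, start time 30) falls outside every interval and keeps a missing epoch. The loop does one full-column comparison per boundary row, so its cost grows with the number of boundaries times the number of events.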
Comments suppressed due to low confidence (2)
python/dftracer/analyzer/dftracer.py:706
- The `_fix_file_posix_category` method is defined twice in this file (lines 638-661 and 689-706). This creates a bug where only the last definition will be used at runtime. Please remove the duplicate definition and keep only one implementation.
def _fix_file_posix_category(df: pd.DataFrame):
    base_condition = (df["cat"].str.contains("posix|stdio") & ~df["file_name"].isna())
    # Step 1: Map file purpose suffixes first
    purpose_updates = {
        "/data": "_reader",
        "/checkpoint": "_checkpoint"
    }
    for path, suffix in purpose_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    # Step 2: Map filesystem suffixes
    filesystem_updates = {
        "/lustre": "_lustre",
        "/ssd": "_ssd"
    }
    for path, suffix in filesystem_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    return df

@staticmethod
def _sanitize_size_offset(df: pd.DataFrame):
    df["size"] = df["size"].replace(0, np.nan)
    if "offset" in df.columns:
        df["offset"] = df["offset"].replace(0, np.nan)
    return df

@staticmethod
def _set_epochs(df: pd.DataFrame, epoch_boundaries: pd.DataFrame):
    df["epoch"] = pd.NA
    # Iterate over each epoch boundary to find matching events
    for _, epoch_boundary in epoch_boundaries.iterrows():
        pid = epoch_boundary["pid"]
        start = epoch_boundary["time_start"]
        end = epoch_boundary["time_end"]
        # Find rows in the partition that match the pid and fall within the time interval
        mask = (df["pid"] == pid) & (df["time_start"] >= start) & (df["time_start"] < end)
        # Assign the epoch number to the matching rows
        df.loc[mask, "epoch"] = epoch_boundary["epoch"]
    return df

@staticmethod
def _fix_file_posix_category(df: pd.DataFrame):
    base_condition = df["cat"].str.contains("posix|stdio") & ~df["file_name"].isna()
    # Step 1: Map file purpose suffixes first
    purpose_updates = {"/data": "_reader", "/checkpoint": "_checkpoint"}
    for path, suffix in purpose_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    # Step 2: Map filesystem suffixes
    filesystem_updates = {"/lustre": "_lustre", "/ssd": "_ssd"}
    for path, suffix in filesystem_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    return df
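The "only the last definition will be used" behaviour can be reproduced in isolation. The sketch below uses a hypothetical `Analyzer` class with a hypothetical `sanitize` method (neither is from the PR) to show that a second `def` with the same name silently replaces the first in the class body. The `duplicate_methods` helper is an illustrative way to flag such repeats using the standard `ast` module.

```python
import ast
from collections import Counter

# Hypothetical class with the same static method defined twice.
SOURCE = '''
class Analyzer:
    @staticmethod
    def sanitize(value):
        return "first"

    @staticmethod
    def sanitize(value):
        return "second"
'''


def duplicate_methods(source: str) -> dict:
    """Map each class name to the method names defined more than once in its body."""
    dupes = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ClassDef):
            counts = Counter(
                item.name
                for item in node.body
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
            )
            repeated = sorted(name for name, n in counts.items() if n > 1)
            if repeated:
                dupes[node.name] = repeated
    return dupes


namespace = {}
exec(SOURCE, namespace)  # define the class so the runtime behaviour can be observed
winner = namespace["Analyzer"].sanitize(0)  # the later definition is the one bound
dupes = duplicate_methods(SOURCE)
```

Note that this simple check would also flag legitimate same-name definitions such as a property getter and its setter, so it is a starting point rather than a complete linter rule.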
python/dftracer/analyzer/dftracer.py:713
- The `_sanitize_size_offset` method is defined twice in this file (lines 664-668 and 709-713). Additionally, these two implementations are inconsistent: the first uses `np.nan` (lines 665 and 667) while the second uses `pd.NA` (lines 710 and 712). This creates a bug where only the last definition will be used. Please remove the duplicate definition and decide on a consistent approach (either `np.nan` or `pd.NA`).
def _sanitize_size_offset(df: pd.DataFrame):
    df["size"] = df["size"].replace(0, np.nan)
    if "offset" in df.columns:
        df["offset"] = df["offset"].replace(0, np.nan)
    return df

@staticmethod
def _set_epochs(df: pd.DataFrame, epoch_boundaries: pd.DataFrame):
    df["epoch"] = pd.NA
    # Iterate over each epoch boundary to find matching events
    for _, epoch_boundary in epoch_boundaries.iterrows():
        pid = epoch_boundary["pid"]
        start = epoch_boundary["time_start"]
        end = epoch_boundary["time_end"]
        # Find rows in the partition that match the pid and fall within the time interval
        mask = (df["pid"] == pid) & (df["time_start"] >= start) & (df["time_start"] < end)
        # Assign the epoch number to the matching rows
        df.loc[mask, "epoch"] = epoch_boundary["epoch"]
    return df

@staticmethod
def _fix_file_posix_category(df: pd.DataFrame):
    base_condition = df["cat"].str.contains("posix|stdio") & ~df["file_name"].isna()
    # Step 1: Map file purpose suffixes first
    purpose_updates = {"/data": "_reader", "/checkpoint": "_checkpoint"}
    for path, suffix in purpose_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    # Step 2: Map filesystem suffixes
    filesystem_updates = {"/lustre": "_lustre", "/ssd": "_ssd"}
    for path, suffix in filesystem_updates.items():
        mask = base_condition & df["file_name"].str.contains(path)
        df.loc[mask, "cat"] = df.loc[mask, "cat"] + suffix
    return df

@staticmethod
def _sanitize_size_offset(df: pd.DataFrame):
    df["size"] = df["size"].replace(0, pd.NA)
    if "offset" in df.columns:
        df["offset"] = df["offset"].replace(0, pd.NA)
    return df
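The choice between `np.nan` and `pd.NA` matters mostly for the resulting column dtype. The sketch below contrasts two consistent options on a hypothetical integer `sizes` column; the helper names `zeros_to_nan` and `zeros_to_na` are illustrative and not part of the PR. The second helper assumes the column holds whole numbers, so that converting to pandas' nullable `Int64` dtype is valid.

```python
import numpy as np
import pandas as pd


def zeros_to_nan(series: pd.Series) -> pd.Series:
    """Replace zeros with np.nan; an integer column is upcast to float."""
    return series.replace(0, np.nan)


def zeros_to_na(series: pd.Series) -> pd.Series:
    """Replace zeros with pd.NA while keeping integers, via nullable Int64."""
    nullable = series.astype("Int64")
    # mask() blanks out entries where the condition holds; for Int64 the
    # blanked entries are stored as pd.NA.
    return nullable.mask(nullable == 0)


sizes = pd.Series([0, 4096, 8192])  # hypothetical transfer sizes in bytes
as_float = zeros_to_nan(sizes)
as_nullable = zeros_to_na(sizes)
```

Either option keeps the column numeric. Calling `replace(0, pd.NA)` directly on a plain `int64` column, as in the second quoted definition, may instead produce an object-dtype column depending on the pandas version, which is one more reason to settle on a single approach.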
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This pull request introduces several improvements to the `dftracer.py` analyzer, focusing on more robust epoch assignment, file category labeling, and data sanitization. The changes enhance how epochs are set for events, expand file category handling, and improve the treatment of size and offset values.

Epoch assignment improvements:
- The `_set_epochs` method assigns epochs by matching both `pid` and time intervals, iterating over each epoch boundary for more precise labeling.

File category and data sanitization enhancements:
- The `_fix_file_posix_category` method appends purpose-based (e.g., `_reader`, `_checkpoint`) and filesystem-based (e.g., `_lustre`, `_ssd`) suffixes to the `cat` column for files matching certain patterns.
- The `_sanitize_size_offset` method replaces zero values in the `size` and `offset` columns with `NaN`, improving data quality for downstream analysis.
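As an illustration of how the two suffix passes compose, here is a minimal sketch. The function name `label_categories` and the sample paths are hypothetical; the path fragments and suffixes are the ones shown in the quoted review snippets. Unlike the quoted code, this version passes `regex=False` and `na=False` to `str.contains` so that path fragments are matched literally and missing file names never match. That is an assumption about the intended behaviour, not something the PR states.

```python
import pandas as pd

PURPOSE_SUFFIXES = {"/data": "_reader", "/checkpoint": "_checkpoint"}
FILESYSTEM_SUFFIXES = {"/lustre": "_lustre", "/ssd": "_ssd"}


def label_categories(df: pd.DataFrame) -> pd.DataFrame:
    """Append a purpose suffix, then a filesystem suffix, to POSIX/STDIO categories."""
    out = df.copy()
    # Only POSIX/STDIO events that carry a file name are eligible.
    is_io = out["cat"].str.contains("posix|stdio", na=False) & out["file_name"].notna()
    for mapping in (PURPOSE_SUFFIXES, FILESYSTEM_SUFFIXES):  # purpose pass first
        for path, suffix in mapping.items():
            mask = is_io & out["file_name"].str.contains(path, regex=False, na=False)
            out.loc[mask, "cat"] = out.loc[mask, "cat"] + suffix
    return out


# Hypothetical events: two eligible rows, one without a file name, one non-I/O row.
df = pd.DataFrame(
    {
        "cat": ["posix", "stdio", "posix", "compute"],
        "file_name": [
            "/lustre/run/data/a.npz",
            "/ssd/checkpoint/m.pt",
            None,
            "/lustre/data/x",
        ],
    }
)
labeled = label_categories(df)
```

Because matching is by substring, a path such as `/database/...` would also pick up the `_reader` suffix, and a path containing both `/data` and `/checkpoint` would receive both purpose suffixes. Stricter matching on path components would avoid this if it is not intended.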