Conversation
📝 Walkthrough
The PR adds CSV export functionality to dynamic queries by introducing a new HTTP endpoint that executes queries and streams results as a downloadable CSV file, reusing the RBAC access controls and caching support of the existing dynamic query execution flow.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Client
    participant Controller as Export<br/>Endpoint
    participant Auth as RBAC<br/>Check
    participant QuerySvc as Dynamic Query<br/>Service
    participant Cache as Cache<br/>Manager
    participant CSV as CSV<br/>Converter

    Client->>Controller: GET /export (datasource_id, query_id, filter, cache params)
    Controller->>Auth: Validate user permissions & filters
    Auth-->>Controller: Access granted + RBAC filters
    Controller->>Cache: Check cached results
    alt Cache Hit & !force_fetch
        Cache-->>Controller: Return cached results
    else Cache Miss or force_fetch
        Controller->>QuerySvc: Execute YAML query with filters
        QuerySvc-->>Controller: Return serialized rows
        Controller->>Cache: Store results
    end
    Controller->>CSV: Convert rows to CSV bytes
    CSV-->>Controller: CSV bytes (UTF-8 with BOM)
    Controller-->>Client: StreamingResponse (text/csv, export_<query_id>.csv)
```
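For orientation, here is a minimal sketch of the endpoint shape this flow implies. The route path, parameter names, and inlined rows are illustrative assumptions, not the PR's actual code; the RBAC and cache steps are elided to a comment.

```python
# Hypothetical sketch of the export flow above; route, parameters, and
# data are illustrative assumptions, not the PR's actual implementation.
import csv
import io

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get('/datasources/{datasource_id}/queries/{query_id}/export')
async def export_query_csv(datasource_id: str, query_id: str,
                           force_fetch: bool = False):
    # RBAC validation and the cache check/store steps would sit here.
    rows = [{'id': 1, 'name': 'alpha'}]  # stand-in for executed query rows

    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    csv_bytes = out.getvalue().encode('utf-8-sig')  # UTF-8 with BOM

    return StreamingResponse(
        iter([csv_bytes]),
        media_type='text/csv',
        headers={
            'Content-Disposition': f'attachment; filename="export_{query_id}.csv"',
        },
    )
```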
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🧹 Nitpick comments (1)
wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py (1)
840-845: Consider true streaming to avoid buffering large exports.
`iter([csv_bytes])` still builds the entire CSV in memory. A row-yielding generator keeps memory bounded for large datasets.

♻️ Suggested change

```diff
-    return StreamingResponse(
-        iter([csv_bytes]),
+    return StreamingResponse(
+        _iter_csv_rows(rows),
         media_type='text/csv',
         headers={
             'Content-Disposition': f'attachment; filename="{filename}"',
         },
     )
```

Add a generator near `_serialized_rows_to_csv`:

```python
def _iter_csv_rows(rows: list):
    if not rows:
        yield b''
        return
    out = io.StringIO()
    fieldnames = list(rows[0].keys())
    for row in rows[1:]:
        for k in row:
            if k not in fieldnames:
                fieldnames.append(k)
    writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore')
    writer.writeheader()
    yield out.getvalue().encode('utf-8-sig')
    out.seek(0); out.truncate(0)
    for row in rows:
        writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
        yield out.getvalue().encode('utf-8')
        out.seek(0); out.truncate(0)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py` around lines 840 - 845, The current StreamingResponse uses iter([csv_bytes]) which buffers the whole CSV; change to a row-yielding generator (e.g., implement a helper like _iter_csv_rows or convert _serialized_rows_to_csv into a generator) that writes the header first (with UTF-8 BOM/utf-8-sig) via csv.DictWriter to an io.StringIO, yields its bytes, then iterates rows yielding each row chunk as bytes (clearing the buffer between yields) so StreamingResponse(streaming_generator, media_type='text/csv', headers={...}) truly streams large exports without building csv_bytes in memory.
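A quick runnable check of the proposed pattern, as a sketch: the sample rows are made up, and `_cell_value` here is the pre-fix version shown later in this review.

```python
import csv
import io
import json

def _cell_value(v):
    # Same shape as the helper reviewed below (pre-sanitization version).
    if isinstance(v, (dict, list)):
        return json.dumps(v)
    return v if v is None or isinstance(v, str) else str(v)

def _iter_csv_rows(rows: list):
    # Copy of the generator suggested above, for a self-contained check.
    if not rows:
        yield b''
        return
    out = io.StringIO()
    fieldnames = list(rows[0].keys())
    for row in rows[1:]:
        for k in row:
            if k not in fieldnames:
                fieldnames.append(k)
    writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore')
    writer.writeheader()
    yield out.getvalue().encode('utf-8-sig')
    out.seek(0); out.truncate(0)
    for row in rows:
        writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
        yield out.getvalue().encode('utf-8')
        out.seek(0); out.truncate(0)

rows = [
    {'id': 1, 'name': 'alpha'},
    {'id': 2, 'name': 'beta', 'extra': 'late column'},
]
chunks = list(_iter_csv_rows(rows))
assert chunks[0].startswith(b'\xef\xbb\xbf')  # UTF-8 BOM on the header chunk
assert len(chunks) == 1 + len(rows)           # header, then one chunk per row
print(b''.join(chunks).decode('utf-8-sig'))
```

Output arrives chunk by chunk rather than as one large bytes object, which is what keeps memory bounded under `StreamingResponse`.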
```python
def _cell_value(v):
    if isinstance(v, (dict, list)):
        return json.dumps(v)
    return v if v is None or isinstance(v, str) else str(v)

# ...

    writer.writeheader()
    for row in rows:
        writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
```
Escape formula-like values to prevent CSV injection.
Untrusted values starting with `=`, `+`, `-`, `@`, or a tab can be interpreted as formulas by spreadsheet apps. Sanitize before writing to CSV.
🔒 Proposed fix

```diff
 def _cell_value(v):
-    if isinstance(v, (dict, list)):
-        return json.dumps(v)
-    return v if v is None or isinstance(v, str) else str(v)
+    if isinstance(v, (dict, list)):
+        v = json.dumps(v)
+    elif v is None or isinstance(v, str):
+        v = v
+    else:
+        v = str(v)
+    if isinstance(v, str) and v[:1] in ('=', '+', '-', '@', '\t'):
+        return f"'{v}"
+    return v
```

🤖 Prompt for AI Agents
+ return v🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 65 - 72, The CSV writer currently writes untrusted values directly
(via _cell_value and writer.writerow) which can lead to CSV injection; update
_cell_value to, after converting dict/list to JSON and non-string types to str,
detect string values that start with any of the characters '=', '+', '-', '@' or
a tab and sanitize them by prefixing a single quote (') unless they already
start with a single quote or are None; keep the existing behavior for None and
other values and ensure this sanitized value is what writer.writerow receives
for each fieldname when iterating rows.
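To see the effect, a hypothetical spot-check of the patched helper (condensed from the proposed fix above; the sample values are made up):

```python
import json

def _cell_value(v):
    # Condensed form of the patched helper from the proposed fix above.
    if isinstance(v, (dict, list)):
        v = json.dumps(v)
    elif not (v is None or isinstance(v, str)):
        v = str(v)
    if isinstance(v, str) and v[:1] in ('=', '+', '-', '@', '\t'):
        return f"'{v}"
    return v

assert _cell_value('=SUM(A1:A9)') == "'=SUM(A1:A9)"  # formula neutralized
assert _cell_value(-5) == "'-5"   # side effect: negative numbers escaped too
assert _cell_value('plain text') == 'plain text'
assert _cell_value(None) is None
```

One trade-off worth weighing: because `-` is in the blocklist, negative numbers also pick up the quote prefix, which changes how spreadsheets parse those cells.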
```python
    csv_bytes = _serialized_rows_to_csv(serialized_res)
    filename = f'export_{query_id}.csv'
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Search for execute_dynamic_query definition
rg -nP --type=py 'def execute_dynamic_query\b' -A 10
```

Repository: rootflo/wavefront
Length of output: 10315

🏁 Script executed:

```bash
#!/bin/bash
# Search for QueryResult class definition
rg -nP --type=py 'class QueryResult\b' -A 15
```

Repository: rootflo/wavefront
Length of output: 43

🏁 Script executed:

```bash
#!/bin/bash
# Search for _serialized_rows_to_csv function definition
rg -nP --type=py 'def _serialized_rows_to_csv\b' -A 15
```

Repository: rootflo/wavefront
Length of output: 2182

🏁 Script executed:

```bash
#!/bin/bash
# Read the context around lines 838-839 in datasource_controller.py
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '820,860p'
```

Repository: rootflo/wavefront
Length of output: 1730

🏁 Script executed:

```bash
#!/bin/bash
# Find serialize_values function definition
rg -nP --type=py 'def serialize_values\b' -A 20
```

Repository: rootflo/wavefront
Length of output: 4982

🏁 Script executed:

```bash
#!/bin/bash
# Look at actual implementation examples of execute_dynamic_query to see what they return
cat -n wavefront/server/modules/insights_module/insights_module/repository/pvo_repository.py | sed -n '161,220p'
```

Repository: rootflo/wavefront
Length of output: 2548

🏁 Script executed:

```bash
#!/bin/bash
# Check if there are any implementations in datasource module that show return values
rg -nP --type=py 'return.*\[.*\]|return.*{.*:' wavefront/server/plugins/datasource/ -A 2 | head -100
```

Repository: rootflo/wavefront
Length of output: 2052

🏁 Script executed:

```bash
#!/bin/bash
# Check datasource plugin implementations for execute_dynamic_query actual return values
cat -n wavefront/server/plugins/datasource/datasource/redshift/__init__.py | sed -n '55,150p'
```

Repository: rootflo/wavefront
Length of output: 777

🏁 Script executed:

```bash
#!/bin/bash
# Check bigquery implementation
cat -n wavefront/server/plugins/datasource/datasource/bigquery/__init__.py | sed -n '73,200p'
```

Repository: rootflo/wavefront
Length of output: 3579

🏁 Script executed:

```bash
#!/bin/bash
# Check the generic datasource implementation
cat -n wavefront/server/plugins/datasource/datasource/__init__.py | sed -n '99,180p'
```

Repository: rootflo/wavefront
Length of output: 1031

🏁 Script executed:

```bash
#!/bin/bash
# Check what dynamic_query_service.execute_dynamic_query returns
cat -n wavefront/server/modules/insights_module/insights_module/service/dynamic_query_service.py | sed -n '32,120p'
```

Repository: rootflo/wavefront
Length of output: 3619

🏁 Script executed:

```bash
#!/bin/bash
# Check the actual datasource export endpoint to understand the full flow
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '664,720p'
```

Repository: rootflo/wavefront
Length of output: 2711

🏁 Script executed:

```bash
#!/bin/bash
# Check what datasource_plugin is and how it's used
rg -nP --type=py 'datasource_plugin\s*=' wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py -B 5 -A 5
```

Repository: rootflo/wavefront
Length of output: 3841

🏁 Script executed:

```bash
#!/bin/bash
# Check all datasource plugin types and their return values more carefully
cat -n wavefront/server/plugins/datasource/datasource/__init__.py | sed -n '1,50p'
```

Repository: rootflo/wavefront
Length of output: 2279

🏁 Script executed:

```bash
#!/bin/bash
# Look at what DatasourcePlugin does with different datasource types
fd -t f "DatasourcePlugin" --exec cat {} \; 2>/dev/null | head -100
```

Repository: rootflo/wavefront
Length of output: 43

🏁 Script executed:

```bash
#!/bin/bash
# Check the datasource __init__.py for DatasourcePlugin class
rg -nP --type=py 'class DatasourcePlugin' -A 30
```

Repository: rootflo/wavefront
Length of output: 3336

🏁 Script executed:

```bash
#!/bin/bash
# Check if there are any tests that show what execute_dynamic_query returns
find . -name "*.py" -path "*/test*" -type f | xargs grep -l "execute_dynamic_query" | head -5
```

Repository: rootflo/wavefront
Length of output: 43

🏁 Script executed:

```bash
#!/bin/bash
# Look for actual calls to _serialized_rows_to_csv to see what's passed to it
rg -nP --type=py '_serialized_rows_to_csv\(' -B 10 -A 2
```

Repository: rootflo/wavefront
Length of output: 3345

🏁 Script executed:

```bash
#!/bin/bash
# Check what the other execution path returns - the insights module path
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '810,826p'
```

Repository: rootflo/wavefront
Length of output: 795

🏁 Script executed:

```bash
#!/bin/bash
# Look at where insights dynamic_query_service.execute_dynamic_query is called
rg -nP --type=py 'dynamic_query_service.execute_dynamic_query' -B 5 -A 10
```

Repository: rootflo/wavefront
Length of output: 2165

🏁 Script executed:

```bash
#!/bin/bash
# Let me verify the complete return type from BigQuery by checking the full implementation
cat -n wavefront/server/plugins/datasource/datasource/bigquery/__init__.py | sed -n '127,149p'
```

Repository: rootflo/wavefront
Length of output: 1065

🏁 Script executed:

```bash
#!/bin/bash
# Check if there's a 'records' key pattern anywhere or understand the actual structure better
rg -nP --type=py "results\[.*\] = {" -B 2 -A 2
```

Repository: rootflo/wavefront
Length of output: 1144

🏁 Script executed:

```bash
#!/bin/bash
# Verify what yaml_query structure is - is it a list of queries?
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '690,710p'
```

Repository: rootflo/wavefront
Length of output: 1092

🏁 Script executed:

```bash
#!/bin/bash
# Check if there are actual tests showing the expected behavior
find . -type f -name "*.py" | xargs grep -l "DatasourcePlugin\|datasource_controller" | grep -i test
```

Repository: rootflo/wavefront
Length of output: 43

🏁 Script executed:

```bash
#!/bin/bash
# Find get_dynamic_yaml_query to understand yaml_query structure
rg -nP --type=py 'def get_dynamic_yaml_query' -A 30
```

Repository: rootflo/wavefront
Length of output: 4170

🏁 Script executed:

```bash
#!/bin/bash
# Check DynamicQuery model to see what's in yaml_query
rg -nP --type=py 'class.*Query.*:' wavefront/server/modules/plugins_module/ | head -20
```

Repository: rootflo/wavefront
Length of output: 409

🏁 Script executed:

```bash
#!/bin/bash
# Check if there's validation on the query structure for CSV export
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '750,795p'
```

Repository: rootflo/wavefront
Length of output: 2254

🏁 Script executed:

```bash
#!/bin/bash
# Let's see the complete structure of the export endpoint around line 764
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '750,847p'
```

Repository: rootflo/wavefront
Length of output: 4392

🏁 Script executed:

```bash
#!/bin/bash
# Final verification - trace through what actually gets passed to _serialized_rows_to_csv
# by checking if any code between execute_dynamic_query and _serialized_rows_to_csv unwraps the result
cat -n wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py | sed -n '815,846p'
```

Repository: rootflo/wavefront
Length of output: 1409

🏁 Script executed:

```bash
#!/bin/bash
# Check if there are example YAML files showing the structure
find . -name "*.yaml" -o -name "*.yml" | xargs grep -l "queries:" | head -3
```

Repository: rootflo/wavefront
Length of output: 43
Normalize dynamic query result structure before CSV conversion.
`execute_dynamic_query` returns a dict keyed by query ID with structure `{query_id: {status, error, description, result: [rows]}}`, but `_serialized_rows_to_csv` expects a list of dicts. The current code passes the dict directly, which will fail at `rows[0]` (line 58). Extract the rows from the result structure or return an error for unexpected formats.
🧩 Suggested fix

```diff
-    csv_bytes = _serialized_rows_to_csv(serialized_res)
+    # Extract rows from query result dict
+    rows = []
+    if isinstance(serialized_res, dict):
+        # Handle dict keyed by query_id with result/records subkeys
+        for query_id, result in serialized_res.items():
+            if isinstance(result, dict) and 'result' in result:
+                rows.extend(result['result'] if result['result'] else [])
+            elif isinstance(result, list):
+                rows.extend(result)
+    elif isinstance(serialized_res, list):
+        rows = serialized_res
+
+    if not isinstance(rows, list):
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                'Unexpected dynamic query result format'
+            ),
+        )
+    csv_bytes = _serialized_rows_to_csv(rows)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 838 - 839, The code passes the whole dynamic query dict into
_serialized_rows_to_csv, but execute_dynamic_query returns {query_id: {status,
error, description, result: [...]}}, while _serialized_rows_to_csv expects a
list of row dicts; update the logic around serialized_res and csv_bytes to
extract rows = serialized_res[query_id]['result'] (or
serialized_res.get(query_id, {}).get('result')) and pass that list to
_serialized_rows_to_csv, and add a guard to return a clear error or empty CSV if
the expected keys or list are missing/invalid; reference variables/functions
serialized_res, query_id, execute_dynamic_query and _serialized_rows_to_csv when
making the change.
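For reference, a sketch of the shape mismatch described in this finding; the sample payload is made up, following the structure the review attributes to `execute_dynamic_query`:

```python
# Illustrative sample only; keys follow the structure described above,
# values are invented for demonstration.
serialized_res = {
    'q_sales_by_region': {
        'status': 'success',
        'error': None,
        'description': 'Sales grouped by region',
        'result': [
            {'region': 'EU', 'total': 1200},
            {'region': 'US', 'total': 3400},
        ],
    }
}

# _serialized_rows_to_csv expects the inner list, not the outer dict:
rows = serialized_res.get('q_sales_by_region', {}).get('result') or []
assert rows and isinstance(rows[0], dict)
```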
Summary by CodeRabbit
Release Notes