[WEB-4999] feat: implement flexible data export utility with CSV, JSON, and XLSX support (#7884)

* feat: implement flexible data export utility with CSV, JSON, and XLSX support

- Introduced Exporter class for handling various data formats.
- Added formatters for CSV, JSON, and XLSX exports.
- Created schemas for defining export fields and their transformations.
- Implemented IssueExportSchema for exporting issue data with nested attributes.
- Enhanced issue export task to utilize the new exporter system for better data handling.

* feat: enhance issue export functionality with new relations and context handling

- Updated issue export task to utilize new IssueRelation model for better relationship management.
- Refactored Exporter class to accept QuerySets directly, improving performance and flexibility.
- Enhanced IssueExportSchema to include parent issues and relations in the export.
- Improved documentation for exporting multiple projects and filtering fields during export.

* feat: enhance export functionality with field filtering and context support

- Updated Exporter class to merge fields into options for formatting.
- Modified formatters to filter fields based on specified options.
- Enhanced ExportSchema to support optional field selection during serialization.
- Improved documentation for the serialize method to clarify field filtering capabilities.

* fixed type
This commit is contained in:
Dheeraj Kumar Ketireddy
2025-10-14 15:46:55 +05:30
committed by GitHub
parent 9dc14d8d67
commit 4168127803
9 changed files with 1333 additions and 374 deletions
+43 -374
View File
@@ -1,82 +1,24 @@
# Python imports
import csv
import io
import json
import zipfile
from typing import List
from collections import defaultdict
import boto3
from botocore.client import Config
from uuid import UUID
from datetime import datetime, date
# Third party imports
from celery import shared_task
# Django imports
from django.conf import settings
from django.utils import timezone
from openpyxl import Workbook
from django.db.models import F, Prefetch
from collections import defaultdict
from django.db.models import Prefetch
# Module imports
from plane.db.models import ExporterHistory, Issue, FileAsset, Label, User, IssueComment
from plane.db.models import ExporterHistory, Issue, IssueRelation
from plane.utils.exception_logger import log_exception
def dateTimeConverter(time: datetime) -> str | None:
"""
Convert a datetime object to a formatted string.
"""
if time:
return time.strftime("%a, %d %b %Y %I:%M:%S %Z%z")
def dateConverter(time: date) -> str | None:
"""
Convert a date object to a formatted string.
"""
if time:
return time.strftime("%a, %d %b %Y")
def create_csv_file(data: List[List[str]]) -> str:
"""
Create a CSV file from the provided data.
"""
csv_buffer = io.StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=",", quoting=csv.QUOTE_ALL)
for row in data:
csv_writer.writerow(row)
csv_buffer.seek(0)
return csv_buffer.getvalue()
def create_json_file(data: List[dict]) -> str:
"""
Create a JSON file from the provided data.
"""
return json.dumps(data)
def create_xlsx_file(data: List[List[str]]) -> bytes:
"""
Create an XLSX file from the provided data.
"""
workbook = Workbook()
sheet = workbook.active
for row in data:
sheet.append(row)
xlsx_buffer = io.BytesIO()
workbook.save(xlsx_buffer)
xlsx_buffer.seek(0)
return xlsx_buffer.getvalue()
from plane.utils.exporters import Exporter, IssueExportSchema
def create_zip_file(files: List[tuple[str, str | bytes]]) -> io.BytesIO:
@@ -118,7 +60,9 @@ def upload_to_s3(zip_file: io.BytesIO, workspace_id: UUID, token_id: str, slug:
# Generate presigned url for the uploaded file with different base
presign_s3 = boto3.client(
"s3",
endpoint_url=f"{settings.AWS_S3_URL_PROTOCOL}//{str(settings.AWS_S3_CUSTOM_DOMAIN).replace('/uploads', '')}/", # noqa: E501
endpoint_url=(
f"{settings.AWS_S3_URL_PROTOCOL}//{str(settings.AWS_S3_CUSTOM_DOMAIN).replace('/uploads', '')}/"
),
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
config=Config(signature_version="s3v4"),
@@ -176,187 +120,6 @@ def upload_to_s3(zip_file: io.BytesIO, workspace_id: UUID, token_id: str, slug:
exporter_instance.save(update_fields=["status", "url", "key"])
def generate_table_row(issue: dict) -> List[str]:
"""
Generate a table row from an issue dictionary.
"""
return [
f"""{issue["project_identifier"]}-{issue["sequence_id"]}""",
issue["project_name"],
issue["name"],
issue["description"],
issue["state_name"],
dateConverter(issue["start_date"]),
dateConverter(issue["target_date"]),
issue["priority"],
issue["created_by"],
", ".join(issue["labels"]) if issue["labels"] else "",
issue["cycle_name"],
issue["cycle_start_date"],
issue["cycle_end_date"],
", ".join(issue.get("module_name", "")) if issue.get("module_name") else "",
dateTimeConverter(issue["created_at"]),
dateTimeConverter(issue["updated_at"]),
dateTimeConverter(issue["completed_at"]),
dateTimeConverter(issue["archived_at"]),
(
", ".join(
[
f"{comment['comment']} ({comment['created_at']} by {comment['created_by']})"
for comment in issue["comments"]
]
)
if issue["comments"]
else ""
),
issue["estimate"] if issue["estimate"] else "",
", ".join(issue["link"]) if issue["link"] else "",
", ".join(issue["assignees"]) if issue["assignees"] else "",
issue["subscribers_count"] if issue["subscribers_count"] else "",
issue["attachment_count"] if issue["attachment_count"] else "",
", ".join(issue["attachment_links"]) if issue["attachment_links"] else "",
]
def generate_json_row(issue: dict) -> dict:
"""
Generate a JSON row from an issue dictionary.
"""
return {
"ID": f"""{issue["project_identifier"]}-{issue["sequence_id"]}""",
"Project": issue["project_name"],
"Name": issue["name"],
"Description": issue["description"],
"State": issue["state_name"],
"Start Date": dateConverter(issue["start_date"]),
"Target Date": dateConverter(issue["target_date"]),
"Priority": issue["priority"],
"Created By": (f"{issue['created_by']}" if issue["created_by"] else ""),
"Assignee": issue["assignees"],
"Labels": issue["labels"],
"Cycle Name": issue["cycle_name"],
"Cycle Start Date": issue["cycle_start_date"],
"Cycle End Date": issue["cycle_end_date"],
"Module Name": issue["module_name"],
"Created At": dateTimeConverter(issue["created_at"]),
"Updated At": dateTimeConverter(issue["updated_at"]),
"Completed At": dateTimeConverter(issue["completed_at"]),
"Archived At": dateTimeConverter(issue["archived_at"]),
"Comments": issue["comments"],
"Estimate": issue["estimate"],
"Link": issue["link"],
"Subscribers Count": issue["subscribers_count"],
"Attachment Count": issue["attachment_count"],
"Attachment Links": issue["attachment_links"],
}
def update_json_row(rows: List[dict], row: dict) -> None:
"""
Update the json row with the new assignee and label.
"""
matched_index = next(
(index for index, existing_row in enumerate(rows) if existing_row["ID"] == row["ID"]),
None,
)
if matched_index is not None:
existing_assignees, existing_labels = (
rows[matched_index]["Assignee"],
rows[matched_index]["Labels"],
)
assignee, label = row["Assignee"], row["Labels"]
if assignee is not None and (existing_assignees is None or label not in existing_assignees):
rows[matched_index]["Assignee"] += f", {assignee}"
if label is not None and (existing_labels is None or label not in existing_labels):
rows[matched_index]["Labels"] += f", {label}"
else:
rows.append(row)
def update_table_row(rows: List[List[str]], row: List[str]) -> None:
"""
Update the table row with the new assignee and label.
"""
matched_index = next(
(index for index, existing_row in enumerate(rows) if existing_row[0] == row[0]),
None,
)
if matched_index is not None:
existing_assignees, existing_labels = rows[matched_index][7:9]
assignee, label = row[7:9]
if assignee is not None and (existing_assignees is None or label not in existing_assignees):
rows[matched_index][8] += f", {assignee}"
if label is not None and (existing_labels is None or label not in existing_labels):
rows[matched_index][8] += f", {label}"
else:
rows.append(row)
def generate_csv(
header: List[str],
project_id: str,
issues: List[dict],
files: List[tuple[str, str | bytes]],
) -> None:
"""
Generate CSV export for all the passed issues.
"""
rows = [header]
for issue in issues:
row = generate_table_row(issue)
update_table_row(rows, row)
csv_file = create_csv_file(rows)
files.append((f"{project_id}.csv", csv_file))
def generate_json(
header: List[str],
project_id: str,
issues: List[dict],
files: List[tuple[str, str | bytes]],
) -> None:
"""
Generate JSON export for all the passed issues.
"""
rows = []
for issue in issues:
row = generate_json_row(issue)
update_json_row(rows, row)
json_file = create_json_file(rows)
files.append((f"{project_id}.json", json_file))
def generate_xlsx(
header: List[str],
project_id: str,
issues: List[dict],
files: List[tuple[str, str | bytes]],
) -> None:
"""
Generate XLSX export for all the passed issues.
"""
rows = [header]
for issue in issues:
row = generate_table_row(issue)
update_table_row(rows, row)
xlsx_file = create_xlsx_file(rows)
files.append((f"{project_id}.xlsx", xlsx_file))
def get_created_by(obj: Issue | IssueComment) -> str:
"""
Get the created by user for the given object.
"""
if obj.created_by:
return f"{obj.created_by.first_name} {obj.created_by.last_name}"
return ""
@shared_task
def issue_export_task(
provider: str,
@@ -377,7 +140,7 @@ def issue_export_task(
exporter_instance.status = "processing"
exporter_instance.save(update_fields=["status"])
# Base query to get the issues
# Build base queryset for issues
workspace_issues = (
Issue.objects.filter(
workspace__id=workspace_id,
@@ -390,7 +153,6 @@ def issue_export_task(
"project",
"workspace",
"state",
"parent",
"created_by",
"estimate_point",
)
@@ -400,144 +162,51 @@ def issue_export_task(
"issue_module__module",
"issue_comments",
"assignees",
Prefetch(
"assignees",
queryset=User.objects.only("first_name", "last_name").distinct(),
to_attr="assignee_details",
),
Prefetch(
"labels",
queryset=Label.objects.only("name").distinct(),
to_attr="label_details",
),
"issue_subscribers",
"issue_link",
Prefetch(
"issue_relation",
queryset=IssueRelation.objects.select_related("related_issue", "related_issue__project"),
),
Prefetch(
"issue_related",
queryset=IssueRelation.objects.select_related("issue", "issue__project"),
),
Prefetch(
"parent",
queryset=Issue.objects.select_related("type", "project"),
),
)
)
# Get the attachments for the issues
file_assets = FileAsset.objects.filter(
issue_id__in=workspace_issues.values_list("id", flat=True),
entity_type=FileAsset.EntityTypeContext.ISSUE_ATTACHMENT,
).annotate(work_item_id=F("issue_id"), asset_id=F("id"))
# Create a dictionary to store the attachments for the issues
attachment_dict = defaultdict(list)
for asset in file_assets:
attachment_dict[asset.work_item_id].append(asset.asset_id)
# Create a list to store the issues data
issues_data = []
# Iterate over the issues
for issue in workspace_issues:
attachments = attachment_dict.get(issue.id, [])
issue_data = {
"id": issue.id,
"project_identifier": issue.project.identifier,
"project_name": issue.project.name,
"project_id": issue.project.id,
"sequence_id": issue.sequence_id,
"name": issue.name,
"description": issue.description_stripped,
"priority": issue.priority,
"start_date": issue.start_date,
"target_date": issue.target_date,
"state_name": issue.state.name if issue.state else None,
"created_at": issue.created_at,
"updated_at": issue.updated_at,
"completed_at": issue.completed_at,
"archived_at": issue.archived_at,
"module_name": [module.module.name for module in issue.issue_module.all()],
"created_by": get_created_by(issue),
"labels": [label.name for label in issue.label_details],
"comments": [
{
"comment": comment.comment_stripped,
"created_at": dateConverter(comment.created_at),
"created_by": get_created_by(comment),
}
for comment in issue.issue_comments.all()
],
"estimate": issue.estimate_point.value if issue.estimate_point and issue.estimate_point.value else "",
"link": [link.url for link in issue.issue_link.all()],
"assignees": [f"{assignee.first_name} {assignee.last_name}" for assignee in issue.assignee_details],
"subscribers_count": issue.issue_subscribers.count(),
"attachment_count": len(attachments),
"attachment_links": [
f"/api/assets/v2/workspaces/{issue.workspace.slug}/projects/{issue.project_id}/issues/{issue.id}/attachments/{asset}/"
for asset in attachments
],
}
# Get Cycles data for the issue
cycle = issue.issue_cycle.last()
if cycle:
# Update cycle data
issue_data["cycle_name"] = cycle.cycle.name
issue_data["cycle_start_date"] = dateConverter(cycle.cycle.start_date)
issue_data["cycle_end_date"] = dateConverter(cycle.cycle.end_date)
else:
issue_data["cycle_name"] = ""
issue_data["cycle_start_date"] = ""
issue_data["cycle_end_date"] = ""
issues_data.append(issue_data)
# CSV header
header = [
"ID",
"Project",
"Name",
"Description",
"State",
"Start Date",
"Target Date",
"Priority",
"Created By",
"Labels",
"Cycle Name",
"Cycle Start Date",
"Cycle End Date",
"Module Name",
"Created At",
"Updated At",
"Completed At",
"Archived At",
"Comments",
"Estimate",
"Link",
"Assignees",
"Subscribers Count",
"Attachment Count",
"Attachment Links",
]
# Map the provider to the function
EXPORTER_MAPPER = {
"csv": generate_csv,
"json": generate_json,
"xlsx": generate_xlsx,
}
# Create exporter for the specified format
try:
exporter = Exporter(
format_type=provider,
schema_class=IssueExportSchema,
options={"list_joiner": ", "},
)
except ValueError as e:
# Invalid format type
exporter_instance = ExporterHistory.objects.get(token=token_id)
exporter_instance.status = "failed"
exporter_instance.reason = str(e)
exporter_instance.save(update_fields=["status", "reason"])
return
files = []
if multiple:
project_dict = defaultdict(list)
for issue in issues_data:
project_dict[str(issue["project_id"])].append(issue)
# Export each project separately with its own queryset
for project_id in project_ids:
issues = project_dict.get(str(project_id), [])
exporter = EXPORTER_MAPPER.get(provider)
if exporter is not None:
exporter(header, project_id, issues, files)
project_issues = workspace_issues.filter(project_id=project_id)
export_filename = f"{slug}-{project_id}"
filename, content = exporter.export(export_filename, project_issues)
files.append((filename, content))
else:
exporter = EXPORTER_MAPPER.get(provider)
if exporter is not None:
exporter(header, workspace_id, issues_data, files)
# Export all issues in a single file
export_filename = f"{slug}-{workspace_id}"
filename, content = exporter.export(export_filename, workspace_issues)
files.append((filename, content))
zip_buffer = create_zip_file(files)
upload_to_s3(zip_buffer, workspace_id, token_id, slug)
+15
View File
@@ -273,6 +273,21 @@ class IssueRelationChoices(models.TextChoices):
IMPLEMENTED_BY = "implemented_by", "Implemented By"
# Bidirectional relation pairs: (forward, reverse)
# Defined after class to avoid enum metaclass conflicts
IssueRelationChoices._RELATION_PAIRS = (
("blocked_by", "blocking"),
("relates_to", "relates_to"), # symmetric
("duplicate", "duplicate"), # symmetric
("start_before", "start_after"),
("finish_before", "finish_after"),
("implemented_by", "implements"),
)
# Generate reverse mapping from pairs
IssueRelationChoices._REVERSE_MAPPING = {forward: reverse for forward, reverse in IssueRelationChoices._RELATION_PAIRS}
class IssueRelation(ProjectBaseModel):
issue = models.ForeignKey(Issue, related_name="issue_relation", on_delete=models.CASCADE)
related_issue = models.ForeignKey(Issue, related_name="issue_related", on_delete=models.CASCADE)
+496
View File
@@ -0,0 +1,496 @@
# 📊 Exporters
A flexible and extensible data export utility for exporting Django model data in multiple formats (CSV, JSON, XLSX).
## 🎯 Overview
The exporters module provides a schema-based approach to exporting data with support for:
- **📄 Multiple formats**: CSV, JSON, and XLSX (Excel)
- **🔒 Type-safe field definitions**: StringField, NumberField, DateField, DateTimeField, BooleanField, ListField, JSONField
- **⚡ Custom transformations**: Field-level transformations and custom preparer methods
- **🔗 Dotted path notation**: Easy access to nested attributes and related models
- **🎨 Format-specific handling**: Automatic formatting based on export format (e.g., lists as arrays in JSON, comma-separated in CSV)
## 🚀 Quick Start
### Basic Usage
```python
from plane.utils.exporters import Exporter, ExportSchema, StringField, NumberField
# Define a schema
class UserExportSchema(ExportSchema):
name = StringField(source="username", label="User Name")
email = StringField(source="email", label="Email Address")
posts_count = NumberField(label="Total Posts")
def prepare_posts_count(self, obj):
return obj.posts.count()
# Export data - just pass the queryset!
users = User.objects.all()
exporter = Exporter(format_type="csv", schema_class=UserExportSchema)
filename, content = exporter.export("users_export", users)
```
### Exporting Issues
```python
from plane.utils.exporters import Exporter, IssueExportSchema
# Get issues with prefetched relations
issues = Issue.objects.filter(project_id=project_id).prefetch_related(
'assignee_details',
'label_details',
'issue_module',
# ... other relations
)
# Export as XLSX - pass the queryset directly!
exporter = Exporter(format_type="xlsx", schema_class=IssueExportSchema)
filename, content = exporter.export("issues", issues)
# Export with custom fields only
exporter = Exporter(format_type="json", schema_class=IssueExportSchema)
filename, content = exporter.export("issues_filtered", issues, fields=["id", "name", "state_name", "assignees"])
```
### Exporting Multiple Projects Separately
```python
# Export each project to a separate file
for project_id in project_ids:
project_issues = issues.filter(project_id=project_id)
exporter = Exporter(format_type="csv", schema_class=IssueExportSchema)
filename, content = exporter.export(f"issues-{project_id}", project_issues)
# Save or upload the file
```
## 📝 Schema Definition
### Field Types
#### 📝 StringField
Converts values to strings.
```python
name = StringField(source="name", label="Name", default="N/A")
```
#### 🔢 NumberField
Handles numeric values (int, float).
```python
count = NumberField(source="items_count", label="Count", default=0)
```
#### 📅 DateField
Formats date objects as `%a, %d %b %Y` (e.g., "Mon, 01 Jan 2024").
```python
start_date = DateField(source="start_date", label="Start Date")
```
#### ⏰ DateTimeField
Formats datetime objects as `%a, %d %b %Y %I:%M:%S %Z%z`.
```python
created_at = DateTimeField(source="created_at", label="Created At")
```
#### ✅ BooleanField
Converts values to boolean.
```python
is_active = BooleanField(source="is_active", label="Active", default=False)
```
#### 📋 ListField
Handles list/array values. In CSV/XLSX, lists are joined with a separator (default: `", "`). In JSON, they remain as arrays.
```python
tags = ListField(source="tags", label="Tags")
assignees = ListField(label="Assignees") # Custom preparer can populate this
```
#### 🗂️ JSONField
Handles complex JSON-serializable objects (dicts, lists of dicts). In CSV/XLSX, they're serialized as JSON strings. In JSON, they remain as objects.
```python
metadata = JSONField(source="metadata", label="Metadata")
comments = JSONField(label="Comments")
```
### ⚙️ Field Parameters
All field types support these parameters:
- **`source`**: Dotted path string to the attribute (e.g., `"project.name"`)
- **`default`**: Default value when field is None
- **`label`**: Display name in export headers
### 🔗 Dotted Path Notation
Access nested attributes using dot notation:
```python
project_name = StringField(source="project.name", label="Project")
owner_email = StringField(source="created_by.email", label="Owner Email")
```
### 🎯 Custom Preparers
For complex logic, define `prepare_{field_name}` methods:
```python
class MySchema(ExportSchema):
assignees = ListField(label="Assignees")
def prepare_assignees(self, obj):
return [f"{u.first_name} {u.last_name}" for u in obj.assignee_details]
```
Preparers take precedence over field definitions.
### ⚡ Custom Transformations with Preparer Methods
For any custom logic or transformations, use `prepare_<field_name>` methods:
```python
class MySchema(ExportSchema):
name = StringField(source="name", label="Name (Uppercase)")
status = StringField(label="Status")
def prepare_name(self, obj):
"""Transform the name field to uppercase."""
return obj.name.upper() if obj.name else ""
def prepare_status(self, obj):
"""Compute status based on model state."""
return "Active" if obj.is_active else "Inactive"
```
## 📦 Export Formats
### 📊 CSV Format
- Fields are quoted with `QUOTE_ALL`
- Lists are joined with `", "` (customizable with `list_joiner` option)
- JSON objects are serialized as JSON strings
- File extension: `.csv`
```python
exporter = Exporter(
format_type="csv",
schema_class=MySchema,
options={"list_joiner": "; "} # Custom separator
)
```
### 📋 JSON Format
- Lists remain as arrays
- Objects remain as nested structures
- Preserves data types
- File extension: `.json`
```python
exporter = Exporter(format_type="json", schema_class=MySchema)
filename, content = exporter.export("data", records)
# content is a JSON string: '[{"field": "value"}, ...]'
```
### 📗 XLSX Format
- Creates Excel-compatible files using openpyxl
- Lists are joined with `", "` (customizable with `list_joiner` option)
- JSON objects are serialized as JSON strings
- File extension: `.xlsx`
- Returns binary content (bytes)
```python
exporter = Exporter(format_type="xlsx", schema_class=MySchema)
filename, content = exporter.export("data", records)
# content is bytes
```
## 🔧 Advanced Usage
### 📦 Using Context for Pre-fetched Data
Pass context data to schemas to avoid N+1 queries. Override `get_context_data()` in your schema:
```python
class MySchema(ExportSchema):
attachment_count = NumberField(label="Attachments")
def prepare_attachment_count(self, obj):
attachments_dict = self.context.get("attachments_dict", {})
return len(attachments_dict.get(obj.id, []))
@classmethod
def get_context_data(cls, queryset):
"""Pre-fetch all attachments in one query."""
attachments_dict = get_attachments_dict(queryset)
return {"attachments_dict": attachments_dict}
# The Exporter automatically uses get_context_data() when serializing
queryset = MyModel.objects.all()
exporter = Exporter(format_type="csv", schema_class=MySchema)
filename, content = exporter.export("data", queryset)
```
### 🔌 Registering Custom Formatters
Add support for new export formats:
```python
from plane.utils.exporters import Exporter, BaseFormatter
class XMLFormatter(BaseFormatter):
def format(self, filename, records, schema_class, options=None):
# Implementation
return (f"{filename}.xml", xml_content)
# Register the formatter
Exporter.register_formatter("xml", XMLFormatter)
# Use it
exporter = Exporter(format_type="xml", schema_class=MySchema)
```
### ✅ Checking Available Formats
```python
formats = Exporter.get_available_formats()
# Returns: ['csv', 'json', 'xlsx']
```
### 🔍 Filtering Fields
Pass a `fields` parameter to export only specific fields:
```python
# Export only specific fields
exporter = Exporter(format_type="csv", schema_class=MySchema)
filename, content = exporter.export(
"filtered_data",
queryset,
fields=["id", "name", "email"]
)
```
### 🎯 Extending Schemas
Create extended schemas by inheriting from existing ones and overriding `get_context_data()`:
```python
class ExtendedIssueExportSchema(IssueExportSchema):
custom_field = JSONField(label="Custom Data")
def prepare_custom_field(self, obj):
# Use pre-fetched data from context
return self.context.get("custom_data", {}).get(obj.id, {})
@classmethod
def get_context_data(cls, queryset):
# Get parent context (attachments, etc.)
context = super().get_context_data(queryset)
# Add your custom pre-fetched data
context["custom_data"] = fetch_custom_data(queryset)
return context
```
### 💾 Manual Serialization
If you need to serialize data without exporting, you can use the schema directly:
```python
# Serialize a queryset to a list of dicts
data = MySchema.serialize_queryset(queryset, fields=["id", "name"])
# Or serialize a single object
schema = MySchema()
obj_data = schema.serialize(obj)
```
## 💡 Example: IssueExportSchema
The `IssueExportSchema` demonstrates a complete implementation:
```python
from plane.utils.exporters import Exporter, IssueExportSchema
# Simple export - just pass the queryset!
issues = Issue.objects.filter(project_id=project_id)
exporter = Exporter(format_type="csv", schema_class=IssueExportSchema)
filename, content = exporter.export("issues", issues)
# Export specific fields only
filename, content = exporter.export(
"issues_filtered",
issues,
fields=["id", "name", "state_name", "assignees", "labels"]
)
# Export multiple projects to separate files
for project_id in project_ids:
project_issues = issues.filter(project_id=project_id)
filename, content = exporter.export(f"issues-{project_id}", project_issues)
# Save or upload each file
```
Key features:
- 🔗 Access to related models via dotted paths
- 🎯 Custom preparers for complex fields
- 📎 Context-based attachment handling via `get_context_data()`
- 📋 List and JSON field handling
- 📅 Date/datetime formatting
## ✨ Best Practices
1. **🚄 Avoid N+1 Queries**: Override `get_context_data()` to pre-fetch related data:
```python
@classmethod
def get_context_data(cls, queryset):
return {
"attachments": get_attachments_dict(queryset),
"comments": get_comments_dict(queryset),
}
```
2. **🏷️ Use Labels**: Provide descriptive labels for better export headers:
```python
created_at = DateTimeField(source="created_at", label="Created At")
```
3. **🛡️ Handle None Values**: Set appropriate defaults for fields that might be None:
```python
count = NumberField(source="count", default=0)
```
4. **🎯 Use Preparers for Complex Logic**: Keep field definitions simple and use preparers for complex transformations:
```python
def prepare_assignees(self, obj):
return [f"{u.first_name} {u.last_name}" for u in obj.assignee_details]
```
5. **⚡ Pass QuerySets Directly**: Let the Exporter handle serialization:
```python
# Good - Exporter handles serialization
exporter.export("data", queryset)
# Avoid - Manual serialization unless needed
data = MySchema.serialize_queryset(queryset)
exporter.export("data", data)
```
6. **📦 Filter QuerySets, Not Data**: For multiple exports, filter the queryset instead of the serialized data:
```python
# Good - efficient, only serializes what's needed
for project_id in project_ids:
project_issues = issues.filter(project_id=project_id)
exporter.export(f"project-{project_id}", project_issues)
# Avoid - serializes all data upfront
all_data = MySchema.serialize_queryset(issues)
for project_id in project_ids:
project_data = [d for d in all_data if d['project_id'] == project_id]
exporter.export(f"project-{project_id}", project_data)
```
## 📚 API Reference
### 📊 Exporter
**`__init__(format_type, schema_class, options=None)`**
- `format_type`: Export format ('csv', 'json', 'xlsx')
- `schema_class`: Schema class defining fields
- `options`: Optional dict of format-specific options
**`export(filename, data, fields=None)`**
- `filename`: Filename without extension
- `data`: Django QuerySet or list of dicts
- `fields`: Optional list of field names to include
- Returns: `(filename_with_extension, content)`
- `content` is str for CSV/JSON, bytes for XLSX
**`get_available_formats()`** (class method)
- Returns: List of available format types
**`register_formatter(format_type, formatter_class)`** (class method)
- Register a custom formatter
### 📝 ExportSchema
**`__init__(context=None)`**
- `context`: Optional dict accessible in preparer methods via `self.context` for pre-fetched data
**`serialize(obj, fields=None)`**
- Returns: Dict of serialized field values for a single object
**`serialize_queryset(queryset, fields=None)`** (class method)
- `queryset`: QuerySet of objects to serialize
- `fields`: Optional list of field names to include
- Returns: List of dicts with serialized data
**`get_context_data(queryset)`** (class method)
- Override to pre-fetch related data for the queryset
- Returns: Dict of context data
### 🔧 ExportField
Base class for all field types. Subclass to create custom field types.
**`get_value(obj, context)`**
- Returns: Formatted value for the field
**`_format_value(raw)`**
- Override in subclasses for type-specific formatting
## 🧪 Testing
```python
# Test exporting a queryset
queryset = MyModel.objects.all()
exporter = Exporter(format_type="json", schema_class=MySchema)
filename, content = exporter.export("test", queryset)
assert filename == "test.json"
assert isinstance(content, str)
# Test with field filtering
filename, content = exporter.export("test", queryset, fields=["id", "name"])
data = json.loads(content)
assert all(set(item.keys()) == {"id", "name"} for item in data)
# Test manual serialization
data = MySchema.serialize_queryset(queryset)
assert len(data) == queryset.count()
```
@@ -0,0 +1,38 @@
"""Export utilities for various data formats."""
from .exporter import Exporter
from .formatters import BaseFormatter, CSVFormatter, JSONFormatter, XLSXFormatter
from .schemas import (
BooleanField,
DateField,
DateTimeField,
ExportField,
ExportSchema,
IssueExportSchema,
JSONField,
ListField,
NumberField,
StringField,
)
__all__ = [
# Core Exporter
"Exporter",
# Schemas
"ExportSchema",
"ExportField",
"StringField",
"NumberField",
"DateField",
"DateTimeField",
"BooleanField",
"ListField",
"JSONField",
# Formatters
"BaseFormatter",
"CSVFormatter",
"JSONFormatter",
"XLSXFormatter",
# Issue Schema
"IssueExportSchema",
]
@@ -0,0 +1,72 @@
from typing import Any, Dict, List, Type, Union
from django.db.models import QuerySet
from .formatters import CSVFormatter, JSONFormatter, XLSXFormatter
class Exporter:
"""Generic exporter class that handles data exports using different formatters."""
# Available formatters
FORMATTERS = {
"csv": CSVFormatter,
"json": JSONFormatter,
"xlsx": XLSXFormatter,
}
def __init__(self, format_type: str, schema_class: Type, options: Dict[str, Any] = None):
"""Initialize exporter with specified format type and schema.
Args:
format_type: The export format (csv, json, xlsx)
schema_class: The schema class to use for field definitions
options: Optional formatting options
"""
if format_type not in self.FORMATTERS:
raise ValueError(f"Unsupported format: {format_type}. Available: {list(self.FORMATTERS.keys())}")
self.format_type = format_type
self.schema_class = schema_class
self.formatter = self.FORMATTERS[format_type]()
self.options = options or {}
def export(
self,
filename: str,
data: Union[QuerySet, List[dict]],
fields: List[str] = None,
) -> tuple[str, str | bytes]:
"""Export data using the configured formatter and return (filename, content).
Args:
filename: The filename for the export (without extension)
data: Either a Django QuerySet or a list of already-serialized dicts
fields: Optional list of field names to include in export
Returns:
Tuple of (filename_with_extension, content)
"""
# Serialize the queryset if needed
if isinstance(data, QuerySet):
records = self.schema_class.serialize_queryset(data, fields=fields)
else:
# Already serialized data
records = data
# Merge fields into options for the formatter
format_options = {**self.options}
if fields:
format_options["fields"] = fields
return self.formatter.format(filename, records, self.schema_class, format_options)
@classmethod
def get_available_formats(cls) -> List[str]:
"""Get list of available export formats."""
return list(cls.FORMATTERS.keys())
@classmethod
def register_formatter(cls, format_type: str, formatter_class: type) -> None:
"""Register a new formatter for a format type."""
cls.FORMATTERS[format_type] = formatter_class
@@ -0,0 +1,199 @@
import csv
import io
import json
from typing import Any, Dict, List, Type
from openpyxl import Workbook
class BaseFormatter:
"""Base class for export formatters."""
def format(
self,
filename: str,
records: List[dict],
schema_class: Type,
options: Dict[str, Any] | None = None,
) -> tuple[str, str | bytes]:
"""Format records for export.
Args:
filename: The filename for the export (without extension)
records: List of records to export
schema_class: Schema class to extract field order and labels
options: Optional formatting options
Returns:
Tuple of (filename_with_extension, content)
"""
raise NotImplementedError
@staticmethod
def _get_field_info(schema_class: Type) -> tuple[List[str], Dict[str, str]]:
"""Extract field order and labels from schema.
Args:
schema_class: Schema class with field definitions
Returns:
Tuple of (field_order, field_labels)
"""
if not hasattr(schema_class, "_declared_fields"):
raise ValueError(f"Schema class {schema_class.__name__} must have _declared_fields attribute")
# Get order and labels from schema
field_order = list(schema_class._declared_fields.keys())
field_labels = {
name: field.label if field.label else name.replace("_", " ").title()
for name, field in schema_class._declared_fields.items()
}
return field_order, field_labels
class CSVFormatter(BaseFormatter):
"""Formatter for CSV exports."""
@staticmethod
def _format_field_value(value: Any, list_joiner: str = ", ") -> str:
"""Format a field value for CSV output."""
if value is None:
return ""
if isinstance(value, list):
return list_joiner.join(str(v) for v in value)
if isinstance(value, dict):
# For complex objects, serialize as JSON
return json.dumps(value)
return str(value)
def _generate_table_row(
self, record: dict, field_order: List[str], options: Dict[str, Any] | None = None
) -> List[str]:
"""Generate a CSV row from a record."""
opts = options or {}
list_joiner = opts.get("list_joiner", ", ")
return [self._format_field_value(record.get(field, ""), list_joiner) for field in field_order]
def _create_csv_file(self, data: List[List[str]]) -> str:
"""Create CSV file content from row data."""
buf = io.StringIO()
writer = csv.writer(buf, delimiter=",", quoting=csv.QUOTE_ALL)
for row in data:
writer.writerow(row)
buf.seek(0)
return buf.getvalue()
def format(self, filename, records, schema_class, options: Dict[str, Any] | None = None) -> tuple[str, str]:
if not records:
return (f"{filename}.csv", "")
# Get field order and labels from schema
field_order, field_labels = self._get_field_info(schema_class)
# Filter to requested fields if specified
opts = options or {}
requested_fields = opts.get("fields")
if requested_fields:
field_order = [f for f in field_order if f in requested_fields]
header = [field_labels[field] for field in field_order]
rows = [header]
for record in records:
row = self._generate_table_row(record, field_order, options)
rows.append(row)
content = self._create_csv_file(rows)
return (f"{filename}.csv", content)
class JSONFormatter(BaseFormatter):
"""Formatter for JSON exports."""
def _generate_json_row(
self, record: dict, field_labels: Dict[str, str], field_order: List[str], options: Dict[str, Any] | None = None
) -> dict:
"""Generate a JSON object from a record.
Preserves data types - lists stay as arrays, dicts stay as objects.
"""
return {field_labels[field]: record.get(field) for field in field_order if field in record}
def format(self, filename, records, schema_class, options: Dict[str, Any] | None = None) -> tuple[str, str]:
if not records:
return (f"{filename}.json", "[]")
# Get field order and labels from schema
field_order, field_labels = self._get_field_info(schema_class)
# Filter to requested fields if specified
opts = options or {}
requested_fields = opts.get("fields")
if requested_fields:
field_order = [f for f in field_order if f in requested_fields]
rows: List[dict] = []
for record in records:
row = self._generate_json_row(record, field_labels, field_order, options)
rows.append(row)
content = json.dumps(rows)
return (f"{filename}.json", content)
class XLSXFormatter(BaseFormatter):
"""Formatter for XLSX (Excel) exports."""
@staticmethod
def _format_field_value(value: Any, list_joiner: str = ", ") -> str:
"""Format a field value for XLSX output."""
if value is None:
return ""
if isinstance(value, list):
return list_joiner.join(str(v) for v in value)
if isinstance(value, dict):
# For complex objects, serialize as JSON
return json.dumps(value)
return str(value)
def _generate_table_row(
self, record: dict, field_order: List[str], options: Dict[str, Any] | None = None
) -> List[str]:
"""Generate an XLSX row from a record."""
opts = options or {}
list_joiner = opts.get("list_joiner", ", ")
return [self._format_field_value(record.get(field, ""), list_joiner) for field in field_order]
def _create_xlsx_file(self, data: List[List[str]]) -> bytes:
"""Create XLSX file content from row data."""
wb = Workbook()
sh = wb.active
for row in data:
sh.append(row)
out = io.BytesIO()
wb.save(out)
out.seek(0)
return out.getvalue()
def format(self, filename, records, schema_class, options: Dict[str, Any] | None = None) -> tuple[str, bytes]:
if not records:
# Create empty workbook
content = self._create_xlsx_file([])
return (f"{filename}.xlsx", content)
# Get field order and labels from schema
field_order, field_labels = self._get_field_info(schema_class)
# Filter to requested fields if specified
opts = options or {}
requested_fields = opts.get("fields")
if requested_fields:
field_order = [f for f in field_order if f in requested_fields]
header = [field_labels[field] for field in field_order]
rows = [header]
for record in records:
row = self._generate_table_row(record, field_order, options)
rows.append(row)
content = self._create_xlsx_file(rows)
return (f"{filename}.xlsx", content)
@@ -0,0 +1,30 @@
"""Export schemas for various data types."""
from .base import (
BooleanField,
DateField,
DateTimeField,
ExportField,
ExportSchema,
JSONField,
ListField,
NumberField,
StringField,
)
from .issue import IssueExportSchema
__all__ = [
# Base field types
"ExportField",
"StringField",
"NumberField",
"DateField",
"DateTimeField",
"BooleanField",
"ListField",
"JSONField",
# Base schema
"ExportSchema",
# Issue schema
"IssueExportSchema",
]
@@ -0,0 +1,234 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from django.db.models import QuerySet
@dataclass
class ExportField:
"""Base export field class for generic fields."""
source: Optional[str] = None
default: Any = ""
label: Optional[str] = None # Display name for export headers
def get_value(self, obj: Any, context: Dict[str, Any]) -> Any:
raw: Any
if self.source:
raw = self._resolve_dotted_path(obj, self.source)
else:
raw = obj
return self._format_value(raw)
def _format_value(self, raw: Any) -> Any:
"""Format the raw value. Override in subclasses for type-specific formatting."""
return raw if raw is not None else self.default
def _resolve_dotted_path(self, obj: Any, path: str) -> Any:
current = obj
for part in path.split("."):
if current is None:
return None
if hasattr(current, part):
current = getattr(current, part)
elif isinstance(current, dict):
current = current.get(part)
else:
return None
return current
@dataclass
class StringField(ExportField):
"""Export field for string values."""
default: str = ""
def _format_value(self, raw: Any) -> str:
if raw is None:
return self.default
return str(raw)
@dataclass
class DateField(ExportField):
"""Export field for date values with automatic conversion."""
default: str = ""
def _format_value(self, raw: Any) -> str:
if raw is None:
return self.default
# Convert date to formatted string
if hasattr(raw, "strftime"):
return raw.strftime("%a, %d %b %Y")
return str(raw)
@dataclass
class DateTimeField(ExportField):
"""Export field for datetime values with automatic conversion."""
default: str = ""
def _format_value(self, raw: Any) -> str:
if raw is None:
return self.default
# Convert datetime to formatted string
if hasattr(raw, "strftime"):
return raw.strftime("%a, %d %b %Y %I:%M:%S %Z%z")
return str(raw)
@dataclass
class NumberField(ExportField):
"""Export field for numeric values."""
default: Any = ""
def _format_value(self, raw: Any) -> Any:
if raw is None:
return self.default
return raw
@dataclass
class BooleanField(ExportField):
"""Export field for boolean values."""
default: bool = False
def _format_value(self, raw: Any) -> bool:
if raw is None:
return self.default
return bool(raw)
@dataclass
class ListField(ExportField):
"""Export field for list/array values.
Returns the list as-is by default. The formatter will handle conversion to strings
when needed (e.g., CSV/XLSX will join with separator, JSON will keep as array).
"""
default: Optional[List] = field(default_factory=list)
def _format_value(self, raw: Any) -> List[Any]:
if raw is None:
return self.default if self.default is not None else []
if isinstance(raw, (list, tuple)):
return list(raw)
return [raw] # Wrap single items in a list
@dataclass
class JSONField(ExportField):
"""Export field for complex JSON-serializable values (dicts, lists of dicts, etc).
Preserves the structure as-is for JSON exports. For CSV/XLSX, the formatter
will handle serialization (e.g., JSON stringify).
"""
default: Any = field(default_factory=dict)
def _format_value(self, raw: Any) -> Any:
if raw is None:
return self.default
# Return as-is - should be JSON-serializable
return raw
class ExportSchemaMeta(type):
def __new__(mcls, name, bases, attrs):
declared: Dict[str, ExportField] = {
key: value for key, value in list(attrs.items()) if isinstance(value, ExportField)
}
for key in declared.keys():
attrs.pop(key)
cls = super().__new__(mcls, name, bases, attrs)
base_fields: Dict[str, ExportField] = {}
for base in bases:
if hasattr(base, "_declared_fields"):
base_fields.update(base._declared_fields)
base_fields.update(declared)
cls._declared_fields = base_fields
return cls
class ExportSchema(metaclass=ExportSchemaMeta):
"""Base schema for exporting data in various formats.
Subclasses should define fields as class attributes and can override:
- prepare_<field_name> methods for custom field serialization
- get_context_data() class method to pre-fetch related data for the queryset
"""
def __init__(self, context: Optional[Dict[str, Any]] = None) -> None:
self.context = context or {}
def serialize(self, obj: Any, fields: Optional[List[str]] = None) -> Dict[str, Any]:
"""Serialize a single object.
Args:
obj: The object to serialize
fields: Optional list of field names to include. If None, all fields are serialized.
Returns:
Dictionary of serialized data
"""
output: Dict[str, Any] = {}
# Determine which fields to process
fields_to_process = fields if fields else list(self._declared_fields.keys())
for field_name in fields_to_process:
# Skip if field doesn't exist in schema
if field_name not in self._declared_fields:
continue
export_field = self._declared_fields[field_name]
# Prefer explicit preparer methods if present
preparer = getattr(self, f"prepare_{field_name}", None)
if callable(preparer):
output[field_name] = preparer(obj)
continue
output[field_name] = export_field.get_value(obj, self.context)
return output
@classmethod
def get_context_data(cls, queryset: QuerySet) -> Dict[str, Any]:
"""Get context data for serialization. Override in subclasses to pre-fetch related data.
Args:
queryset: QuerySet of objects to be serialized
Returns:
Dictionary of context data to be passed to the schema instance
"""
return {}
@classmethod
def serialize_queryset(cls, queryset: QuerySet, fields: List[str] = None) -> List[Dict[str, Any]]:
"""Serialize a queryset of objects to export data.
Args:
queryset: QuerySet of objects to serialize
fields: Optional list of field names to include. Defaults to all fields.
Returns:
List of dictionaries containing serialized data
"""
# Get context data (can be extended by subclasses)
context = cls.get_context_data(queryset)
# Serialize each object, passing fields to only process requested fields
schema = cls(context=context)
data = []
for obj in queryset:
obj_data = schema.serialize(obj, fields=fields)
data.append(obj_data)
return data
@@ -0,0 +1,206 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional
from django.db.models import F, QuerySet
from plane.db.models import CycleIssue, FileAsset
from .base import (
DateField,
DateTimeField,
ExportSchema,
JSONField,
ListField,
NumberField,
StringField,
)
def get_issue_attachments_dict(issues_queryset: QuerySet) -> Dict[str, List[str]]:
"""Get attachments dictionary for the given issues queryset.
Args:
issues_queryset: Queryset of Issue objects
Returns:
Dictionary mapping issue IDs to lists of attachment IDs
"""
file_assets = FileAsset.objects.filter(
issue_id__in=issues_queryset.values_list("id", flat=True),
entity_type=FileAsset.EntityTypeContext.ISSUE_ATTACHMENT,
).annotate(work_item_id=F("issue_id"), asset_id=F("id"))
attachment_dict = defaultdict(list)
for asset in file_assets:
attachment_dict[asset.work_item_id].append(asset.asset_id)
return attachment_dict
def get_issue_last_cycles_dict(issues_queryset: QuerySet) -> Dict[str, Optional[CycleIssue]]:
"""Get the last cycle for each issue in the given queryset.
Args:
issues_queryset: Queryset of Issue objects
Returns:
Dictionary mapping issue IDs to their last CycleIssue object
"""
# Fetch all cycle issues for the given issues, ordered by created_at descending
# select_related is used to fetch cycle data in the same query
cycle_issues = (
CycleIssue.objects.filter(issue_id__in=issues_queryset.values_list("id", flat=True))
.select_related("cycle")
.order_by("issue_id", "-created_at")
)
# Keep only the last (most recent) cycle for each issue
last_cycles_dict = {}
for cycle_issue in cycle_issues:
if cycle_issue.issue_id not in last_cycles_dict:
last_cycles_dict[cycle_issue.issue_id] = cycle_issue
return last_cycles_dict
class IssueExportSchema(ExportSchema):
"""Schema for exporting issue data in various formats."""
@staticmethod
def _get_created_by(obj) -> str:
"""Get the created by user for the given object."""
try:
if getattr(obj, "created_by", None):
return f"{obj.created_by.first_name} {obj.created_by.last_name}"
except Exception:
pass
return ""
@staticmethod
def _format_date(date_obj) -> str:
"""Format date object to string."""
if date_obj and hasattr(date_obj, "strftime"):
return date_obj.strftime("%a, %d %b %Y")
return ""
# Field definitions with display labels
id = StringField(label="ID")
project_identifier = StringField(source="project.identifier", label="Project Identifier")
project_name = StringField(source="project.name", label="Project")
project_id = StringField(source="project.id", label="Project ID")
sequence_id = NumberField(source="sequence_id", label="Sequence ID")
name = StringField(source="name", label="Name")
description = StringField(source="description_stripped", label="Description")
priority = StringField(source="priority", label="Priority")
start_date = DateField(source="start_date", label="Start Date")
target_date = DateField(source="target_date", label="Target Date")
state_name = StringField(label="State")
created_at = DateTimeField(source="created_at", label="Created At")
updated_at = DateTimeField(source="updated_at", label="Updated At")
completed_at = DateTimeField(source="completed_at", label="Completed At")
archived_at = DateTimeField(source="archived_at", label="Archived At")
module_name = ListField(label="Module Name")
created_by = StringField(label="Created By")
labels = ListField(label="Labels")
comments = JSONField(label="Comments")
estimate = StringField(label="Estimate")
link = ListField(label="Link")
assignees = ListField(label="Assignees")
subscribers_count = NumberField(label="Subscribers Count")
attachment_count = NumberField(label="Attachment Count")
attachment_links = ListField(label="Attachment Links")
cycle_name = StringField(label="Cycle Name")
cycle_start_date = DateField(label="Cycle Start Date")
cycle_end_date = DateField(label="Cycle End Date")
parent = StringField(label="Parent")
relations = JSONField(label="Relations")
def prepare_id(self, i):
return f"{i.project.identifier}-{i.sequence_id}"
def prepare_state_name(self, i):
return i.state.name if i.state else None
def prepare_module_name(self, i):
return [m.module.name for m in i.issue_module.all()]
def prepare_created_by(self, i):
return self._get_created_by(i)
def prepare_labels(self, i):
return [label.name for label in i.labels.all()]
def prepare_comments(self, i):
return [
{
"comment": comment.comment_stripped,
"created_at": self._format_date(comment.created_at),
"created_by": self._get_created_by(comment),
}
for comment in i.issue_comments.all()
]
def prepare_estimate(self, i):
return i.estimate_point.value if i.estimate_point and i.estimate_point.value else ""
def prepare_link(self, i):
return [link.url for link in i.issue_link.all()]
def prepare_assignees(self, i):
return [f"{u.first_name} {u.last_name}" for u in i.assignees.all()]
def prepare_subscribers_count(self, i):
return i.issue_subscribers.count()
def prepare_attachment_count(self, i):
return len((self.context.get("attachments_dict") or {}).get(i.id, []))
def prepare_attachment_links(self, i):
return [
f"/api/assets/v2/workspaces/{i.workspace.slug}/projects/{i.project_id}/issues/{i.id}/attachments/{asset}/"
for asset in (self.context.get("attachments_dict") or {}).get(i.id, [])
]
def prepare_cycle_name(self, i):
cycles_dict = self.context.get("cycles_dict") or {}
last_cycle = cycles_dict.get(i.id)
return last_cycle.cycle.name if last_cycle else ""
def prepare_cycle_start_date(self, i):
cycles_dict = self.context.get("cycles_dict") or {}
last_cycle = cycles_dict.get(i.id)
return last_cycle.cycle.start_date if last_cycle else None
def prepare_cycle_end_date(self, i):
cycles_dict = self.context.get("cycles_dict") or {}
last_cycle = cycles_dict.get(i.id)
return last_cycle.cycle.end_date if last_cycle else None
def prepare_parent(self, i):
if not i.parent:
return ""
return f"{i.parent.project.identifier}-{i.parent.sequence_id}"
def prepare_relations(self, i):
# Should show reverse relation as well
from plane.db.models.issue import IssueRelationChoices
relations = {
r.relation_type: f"{r.related_issue.project.identifier}-{r.related_issue.sequence_id}"
for r in i.issue_relation.all()
}
reverse_relations = {}
for relation in i.issue_related.all():
reverse_relations[IssueRelationChoices._REVERSE_MAPPING[relation.relation_type]] = (
f"{relation.issue.project.identifier}-{relation.issue.sequence_id}"
)
relations.update(reverse_relations)
return relations
@classmethod
def get_context_data(cls, queryset: QuerySet) -> Dict[str, Any]:
"""Get context data for issue serialization."""
return {
"attachments_dict": get_issue_attachments_dict(queryset),
"cycles_dict": get_issue_last_cycles_dict(queryset),
}