feat(integrations): Linear connector and shared HTTP/sync helpers

- Add Linear import (GraphQL, personal API key, optional team key filter).
- Centralize integration HTTP via integration_session and session_request.
- Add integration_sync_context for project/task refs and custom_fields metadata.
- Refactor Asana, GitHub, GitLab, Jira, Trello, ActivityWatch, and QuickBooks to use helpers.
- Extend integration UI, settings, and scheduled sync behavior as needed.
This commit is contained in:
Dries Peeters
2026-04-05 08:39:18 +02:00
parent 5df748b30e
commit 9449a46a42
17 changed files with 1000 additions and 193 deletions
+5 -3
View File
@@ -21,6 +21,7 @@ from typing import Any, Dict, List, Optional
import requests
from app.integrations.base import BaseConnector
from app.utils.integration_http import integration_session, session_request
from app.utils.timezone import get_timezone_obj, utc_to_local
logger = logging.getLogger(__name__)
@@ -74,17 +75,18 @@ class ActivityWatchConnector(BaseConnector):
base = self._get_server_url()
url = f"{base}/api/0/{path.lstrip('/')}"
try:
resp = requests.get(url, params=params, timeout=15)
session = integration_session()
resp = session_request(session, "GET", url, params=params, timeout=(5, 20))
resp.raise_for_status()
return resp.json()
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON from ActivityWatch: {e}") from e
except requests.exceptions.ConnectionError as e:
raise ValueError(f"Cannot reach ActivityWatch at {base}: {e}") from e
except requests.exceptions.Timeout as e:
raise ValueError(f"ActivityWatch request timed out: {e}") from e
except requests.exceptions.HTTPError as e:
raise ValueError(f"ActivityWatch API error: {e}") from e
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON from ActivityWatch: {e}") from e
def test_connection(self) -> Dict[str, Any]:
"""Test connectivity to aw-server: GET /api/0/buckets/."""
+51 -27
View File
@@ -163,6 +163,18 @@ class AsanaConnector(BaseConnector):
"""Sync tasks and projects with Asana."""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import (
ensure_project_integration_fields,
find_project_by_integration_ref,
find_task_by_integration_ref,
require_sync_context,
set_task_integration_ref,
)
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e)}
try:
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
@@ -187,25 +199,30 @@ class AsanaConnector(BaseConnector):
for asana_project in asana_projects:
try:
# Find or create project
project = Project.query.filter_by(
user_id=self.integration.user_id, name=asana_project.get("name")
).first()
ap_gid = str(asana_project.get("gid") or "")
ap_name = (asana_project.get("name") or "Asana project").strip()[:200]
if not ap_gid:
continue
project = find_project_by_integration_ref(client_id, "asana", ap_gid)
if not project:
project = Project.query.filter_by(client_id=client_id, name=ap_name).first()
if not project:
project = Project(
name=asana_project.get("name"),
description=asana_project.get("notes", ""),
user_id=self.integration.user_id,
name=ap_name,
client_id=client_id,
description=(asana_project.get("notes") or "") or None,
status="active" if not asana_project.get("archived") else "archived",
)
db.session.add(project)
db.session.flush()
# Store Asana project GID in project metadata
if not hasattr(project, "metadata") or not project.metadata:
project.metadata = {}
project.metadata["asana_project_gid"] = asana_project.get("gid")
ensure_project_integration_fields(
project,
source="asana",
ref=ap_gid,
display_name=ap_name,
description=(asana_project.get("notes") or "") or None,
)
# Sync tasks from Asana project
tasks_response = requests.get(
@@ -219,41 +236,48 @@ class AsanaConnector(BaseConnector):
for asana_task in asana_tasks:
try:
# Get task details
at_gid = str(asana_task.get("gid") or "")
if not at_gid:
continue
task_response = requests.get(
f"{self.BASE_URL}/tasks/{asana_task.get('gid')}",
f"{self.BASE_URL}/tasks/{at_gid}",
headers=headers,
params={"opt_fields": "name,notes,completed,due_on,assignee"},
)
if task_response.status_code == 200:
task_data = task_response.json().get("data", {})
tname = (task_data.get("name") or "Task").strip()[:200]
tstatus = "done" if task_data.get("completed") else "todo"
# Find or create task
task = Task.query.filter_by(
project_id=project.id, name=task_data.get("name", "")
).first()
task = find_task_by_integration_ref(project.id, at_gid, source="asana")
if not task:
task = Task(
project_id=project.id,
name=task_data.get("name", ""),
description=task_data.get("notes", ""),
status="completed" if task_data.get("completed") else "todo",
name=tname,
description=(task_data.get("notes") or "") or None,
status=tstatus,
created_by=actor_id,
)
db.session.add(task)
db.session.flush()
else:
task.name = tname
task.description = (task_data.get("notes") or "") or None
task.status = tstatus
# Store Asana task GID in metadata
if not hasattr(task, "metadata") or not task.metadata:
task.metadata = {}
task.metadata["asana_task_gid"] = asana_task.get("gid")
set_task_integration_ref(
task,
source="asana",
ref=at_gid,
extra={"asana_task_gid": at_gid},
)
synced_count += 1
except Exception as e:
errors.append(
f"Error syncing task in project {asana_project.get('name')}: {str(e)}"
)
synced_count += 1
except Exception as e:
errors.append(f"Error syncing project {asana_project.get('name')}: {str(e)}")
+71 -26
View File
@@ -144,6 +144,13 @@ class GitHubConnector(BaseConnector):
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import (
ensure_project_integration_fields,
find_project_by_integration_ref,
find_task_by_integration_ref,
require_sync_context,
set_task_integration_ref,
)
logger = logging.getLogger(__name__)
@@ -151,6 +158,11 @@ class GitHubConnector(BaseConnector):
if not token:
return {"success": False, "message": "No access token available. Please reconnect the integration."}
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e)}
# Get repositories from config
repos_str = self.integration.config.get("repositories", "")
if not repos_str:
@@ -200,15 +212,16 @@ class GitHubConnector(BaseConnector):
owner, repo_name = repo.split("/", 1)
# Find or create project
project = Project.query.filter_by(user_id=self.integration.user_id, name=repo).first()
# Find or create project (client + custom_fields integration marker)
project = find_project_by_integration_ref(client_id, "github", repo)
if not project:
project = Project.query.filter_by(client_id=client_id, name=repo).first()
if not project:
try:
project = Project(
name=repo,
client_id=client_id,
description=f"GitHub repository: {repo}",
user_id=self.integration.user_id,
status="active",
)
db.session.add(project)
@@ -217,6 +230,13 @@ class GitHubConnector(BaseConnector):
errors.append(f"Error creating project for {repo}: {str(e)}")
logger.error(f"Error creating project for {repo}: {e}", exc_info=True)
continue
ensure_project_integration_fields(
project,
source="github",
ref=repo,
display_name=repo,
description=f"GitHub repository: {repo}",
)
# Fetch issues
try:
@@ -254,25 +274,33 @@ class GitHubConnector(BaseConnector):
for issue in issues:
try:
if issue.get("pull_request"):
continue
issue_number = issue.get("number")
issue_title = issue.get("title", "")
issue_title = (issue.get("title") or "").strip() or "Issue"
issue_title = issue_title[:180]
if not issue_number:
continue
# Find or create task
task = Task.query.filter_by(
project_id=project.id, name=f"#{issue_number}: {issue_title}"
).first()
issue_ref = f"{repo}#{issue_number}"
body = (issue.get("body") or "").strip()
url = issue.get("html_url") or ""
if url:
body = f"{body}\n\nGitHub: {url}" if body else f"GitHub: {url}"
gh_state = (issue.get("state") or "").lower()
task_status = "done" if gh_state == "closed" else "todo"
task = find_task_by_integration_ref(project.id, issue_ref, source="github")
if not task:
try:
task_name = f"#{issue_number}: {issue_title}"[:200]
task = Task(
project_id=project.id,
name=f"#{issue_number}: {issue_title}",
description=issue.get("body", ""),
status="todo",
notes=f"GitHub Issue: {issue.get('html_url', '')}",
name=task_name,
description=body or None,
status=task_status,
created_by=actor_id,
)
db.session.add(task)
db.session.flush()
@@ -282,17 +310,22 @@ class GitHubConnector(BaseConnector):
f"Error creating task for issue #{issue_number} in {repo}: {e}", exc_info=True
)
continue
else:
task.name = f"#{issue_number}: {issue_title}"[:200]
task.description = body or None
task.status = task_status
# Store GitHub issue info in task metadata
try:
if not hasattr(task, "metadata") or not task.metadata:
task.metadata = {}
task.metadata["github_repo"] = repo
task.metadata["github_issue_number"] = issue_number
task.metadata["github_issue_id"] = issue.get("id")
task.metadata["github_issue_url"] = issue.get("html_url")
except Exception as e:
logger.warning(f"Error updating task metadata for issue #{issue_number}: {e}")
set_task_integration_ref(
task,
source="github",
ref=issue_ref,
extra={
"issue_number": issue_number,
"issue_id": issue.get("id"),
"url": url,
"repo": repo,
},
)
synced_count += 1
except Exception as e:
@@ -335,7 +368,12 @@ class GitHubConnector(BaseConnector):
db.session.rollback()
except Exception as rollback_err:
logger.debug("Rollback after GitHub sync failure: %s", rollback_err)
return {"success": False, "message": f"Sync failed: {str(e)}", "errors": errors}
return {
"success": False,
"message": f"Sync failed: {str(e)}",
"errors": errors,
"synced_items": synced_count,
}
def handle_webhook(
self, payload: Dict[str, Any], headers: Dict[str, str], raw_body: Optional[bytes] = None
@@ -387,12 +425,19 @@ class GitHubConnector(BaseConnector):
logger.warning("GitHub webhook signature provided but no secret configured - rejecting webhook")
return {"success": False, "message": "Webhook secret not configured"}
else:
# No signature provided - check if secret is configured
# No signature: always reject (configure secret on GitHub + matching webhook_secret here)
webhook_secret = self.integration.config.get("webhook_secret") if self.integration else None
if webhook_secret:
# Secret configured but no signature - reject for security
logger.warning("GitHub webhook secret configured but no signature provided - rejecting webhook")
return {"success": False, "message": "Webhook signature required but not provided"}
logger.warning(
"GitHub webhook rejected: missing X-Hub-Signature-256. "
"Set a secret on the GitHub webhook and store it in integration config as webhook_secret."
)
return {
"success": False,
"message": "Webhook signature required; configure webhook_secret on GitHub and in TimeTracker.",
}
# Process webhook event
action = payload.get("action")
+128 -19
View File
@@ -190,48 +190,157 @@ class GitLabConnector(BaseConnector):
return {"success": False, "message": f"Connection error: {str(e)}"}
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync issues from GitLab repositories."""
"""Sync issues from GitLab repositories into TimeTracker projects and tasks."""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import (
ensure_project_integration_fields,
find_project_by_integration_ref,
find_task_by_integration_ref,
require_sync_context,
set_task_integration_ref,
)
token = self.get_access_token()
if not token:
return {"success": False, "message": "No access token available"}
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e)}
base_url = self._get_base_url()
headers = {"Authorization": f"Bearer {token}"}
synced_count = 0
errors = []
try:
# Get repositories from config or all accessible repos
repo_ids = self.integration.config.get("repository_ids", [])
raw_ids = self.integration.config.get("repository_ids", []) if self.integration else []
repo_ids: List[int] = []
if isinstance(raw_ids, str):
for part in raw_ids.split(","):
part = part.strip()
if part.isdigit():
repo_ids.append(int(part))
elif isinstance(raw_ids, list):
for x in raw_ids:
try:
repo_ids.append(int(x))
except (TypeError, ValueError):
continue
try:
if not repo_ids:
# Get all accessible projects
projects_response = requests.get(
f"{base_url}/api/v4/projects",
headers={"Authorization": f"Bearer {token}"},
headers=headers,
params={"membership": True, "per_page": 100},
timeout=30,
)
if projects_response.status_code == 200:
projects = projects_response.json()
repo_ids = [p["id"] for p in projects]
if projects_response.status_code != 200:
return {
"success": False,
"message": f"Could not list GitLab projects: HTTP {projects_response.status_code}",
}
repo_ids = [p["id"] for p in projects_response.json()[:20]]
# Sync issues from each repository
for repo_id in repo_ids:
try:
issues_response = requests.get(
f"{base_url}/api/v4/projects/{repo_id}/issues",
headers={"Authorization": f"Bearer {token}"},
params={"state": "opened", "per_page": 100},
pr = requests.get(f"{base_url}/api/v4/projects/{repo_id}", headers=headers, timeout=30)
if pr.status_code != 200:
errors.append(f"GitLab project {repo_id}: HTTP {pr.status_code}")
continue
gl_project = pr.json()
path = gl_project.get("path_with_namespace") or gl_project.get("name") or str(repo_id)
path = str(path)[:200]
project_ref = str(repo_id)
project = find_project_by_integration_ref(client_id, "gitlab", project_ref)
if not project:
project = Project.query.filter_by(client_id=client_id, name=path).first()
if not project:
project = Project(
name=path,
client_id=client_id,
description=(gl_project.get("description") or "") or f"GitLab: {path}",
status="active",
)
db.session.add(project)
db.session.flush()
ensure_project_integration_fields(
project,
source="gitlab",
ref=project_ref,
display_name=path,
description=(gl_project.get("description") or "") or f"GitLab: {path}",
)
if issues_response.status_code == 200:
issues = issues_response.json()
synced_count += len(issues)
issues_response = requests.get(
f"{base_url}/api/v4/projects/{repo_id}/issues",
headers=headers,
params={"state": "opened", "per_page": 100},
timeout=30,
)
if issues_response.status_code != 200:
errors.append(f"GitLab issues for project {repo_id}: HTTP {issues_response.status_code}")
continue
for issue in issues_response.json():
iid = issue.get("iid")
if not iid:
continue
title = (issue.get("title") or "Issue").strip()[:180]
issue_ref = f"{repo_id}:{iid}"
desc = (issue.get("description") or "").strip()
web_url = issue.get("web_url") or ""
if web_url:
desc = f"{desc}\n\nGitLab: {web_url}" if desc else f"GitLab: {web_url}"
state = (issue.get("state") or "").lower()
task_status = "done" if state in ("closed", "merged") else "todo"
task_name = f"#{iid}: {title}"[:200]
task = find_task_by_integration_ref(project.id, issue_ref, source="gitlab")
if not task:
task = Task(
project_id=project.id,
name=task_name,
description=desc or None,
status=task_status,
created_by=actor_id,
)
db.session.add(task)
db.session.flush()
else:
task.name = task_name
task.description = desc or None
task.status = task_status
set_task_integration_ref(
task,
source="gitlab",
ref=issue_ref,
extra={
"gitlab_project_id": repo_id,
"iid": iid,
"id": issue.get("id"),
"url": web_url,
},
)
synced_count += 1
except Exception as e:
errors.append(f"Error syncing repository {repo_id}: {str(e)}")
return {"success": True, "message": "Sync completed", "synced_items": synced_count, "errors": errors}
db.session.commit()
msg = f"Sync completed. Upserted {synced_count} issue(s)."
if errors:
msg += f" {len(errors)} error(s)."
return {"success": True, "message": msg, "synced_items": synced_count, "errors": errors}
except Exception as e:
return {"success": False, "message": f"Sync failed: {str(e)}"}
try:
db.session.rollback()
except Exception:
pass
return {"success": False, "message": f"Sync failed: {str(e)}", "errors": errors}
def get_config_schema(self) -> Dict[str, Any]:
"""Get configuration schema."""
+53 -28
View File
@@ -172,13 +172,19 @@ class JiraConnector(BaseConnector):
pass
return None
def _upsert_task_from_issue(self, issue: Dict[str, Any]) -> int:
def _upsert_task_from_issue(self, issue: Dict[str, Any], actor_id: int, client_id: int) -> int:
"""
Find or create Project and Task from a single Jira issue dict.
Reuses same mapping logic as sync_data. Returns 1 if upserted, 0 on skip/error.
"""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import (
ensure_project_integration_fields,
find_project_by_integration_ref,
find_task_by_integration_ref,
set_task_integration_ref,
)
issue_key = issue.get("key")
if not issue_key:
@@ -187,51 +193,56 @@ class JiraConnector(BaseConnector):
project_key = (issue_fields.get("project") or {}).get("key") or ""
project_key = project_key or "Jira"
project = Project.query.filter_by(
user_id=self.integration.user_id, name=project_key
).first()
project = find_project_by_integration_ref(client_id, "jira", project_key)
if not project:
project = Project.query.filter_by(client_id=client_id, name=project_key).first()
if not project:
project = Project(
name=project_key,
client_id=client_id,
description=f"Synced from Jira project {project_key}",
user_id=self.integration.user_id,
status="active",
)
db.session.add(project)
db.session.flush()
ensure_project_integration_fields(
project,
source="jira",
ref=project_key,
display_name=project_key,
description=f"Synced from Jira project {project_key}",
)
task = Task.query.filter_by(project_id=project.id, name=issue_key).first()
summary = issue_fields.get("summary") or ""
status_name = (issue_fields.get("status") or {}).get("name") or "To Do"
mapped_status = self._map_jira_status(status_name)
description_text = self._extract_description_text(issue_fields)
desc = summary
if description_text:
desc = f"{summary}\n\n{description_text}" if summary else description_text
task = find_task_by_integration_ref(project.id, issue_key, source="jira")
if not task:
task_kw = {
"project_id": project.id,
"name": issue_key,
"description": summary,
"status": mapped_status,
}
if getattr(Task, "notes", None) is not None:
task_kw["notes"] = description_text
if self.integration.user_id is not None:
task_kw["created_by"] = self.integration.user_id
task = Task(**task_kw)
task = Task(
project_id=project.id,
name=issue_key[:200],
description=desc or None,
status=mapped_status,
created_by=actor_id,
)
db.session.add(task)
db.session.flush()
else:
task.description = summary
task.description = desc or None
task.status = mapped_status
if hasattr(task, "notes"):
task.notes = description_text
task.name = issue_key[:200]
if hasattr(task, "metadata"):
if not task.metadata:
task.metadata = {}
task.metadata["jira_issue_key"] = issue_key
task.metadata["jira_issue_id"] = issue.get("id")
set_task_integration_ref(
task,
source="jira",
ref=issue_key,
extra={"jira_issue_id": issue.get("id")},
)
return 1
@@ -243,6 +254,13 @@ class JiraConnector(BaseConnector):
if not token:
return {"success": False, "message": "No access token available"}
from app.utils.integration_sync_context import require_sync_context
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e)}
base_url = self.integration.config.get("jira_url", "https://your-domain.atlassian.net")
api_url = f"{base_url}/rest/api/3/search"
@@ -273,7 +291,7 @@ class JiraConnector(BaseConnector):
for issue in issues:
try:
synced_count += self._upsert_task_from_issue(issue)
synced_count += self._upsert_task_from_issue(issue, actor_id, client_id)
except Exception as e:
errors.append(f"Error syncing issue {issue.get('key', 'unknown')}: {str(e)}")
@@ -309,6 +327,13 @@ class JiraConnector(BaseConnector):
if not token:
return {"success": False, "message": "No access token available", "issue_key": issue_key}
from app.utils.integration_sync_context import require_sync_context
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e), "issue_key": issue_key}
base_url = self.integration.config.get("jira_url", "https://your-domain.atlassian.net")
api_url = f"{base_url}/rest/api/3/issue/{issue_key}"
fields = "summary,description,status,assignee,project,created,updated"
@@ -337,7 +362,7 @@ class JiraConnector(BaseConnector):
}
issue = response.json()
self._upsert_task_from_issue(issue)
self._upsert_task_from_issue(issue, actor_id, client_id)
db.session.commit()
return {
"success": True,
+252
View File
@@ -0,0 +1,252 @@
"""
Linear integration: import issues as tasks using a Personal API Key.
https://developers.linear.app/docs/graphql/working-with-the-graphql-api
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from app.integrations.base import BaseConnector
from app.utils.integration_http import integration_session, session_request
logger = logging.getLogger(__name__)
LINEAR_GRAPHQL = "https://api.linear.app/graphql"
class LinearConnector(BaseConnector):
"""Linear connector (API key; issues → tasks)."""
display_name = "Linear"
description = "Import Linear issues as tasks"
icon = "tasks"
    @property
    def provider_name(self) -> str:
        """Provider identifier string ("linear") used when registering the connector."""
        return "linear"
    def get_authorization_url(self, redirect_uri: str, state: Optional[str] = None) -> str:
        """Not supported: this connector authenticates with a Personal API key, not OAuth.

        Raises:
            NotImplementedError: always; store the key via the Integrations UI instead.
        """
        raise NotImplementedError("Linear uses a Personal API key; configure in Integrations.")
    def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Dict[str, Any]:
        """Not supported: there is no OAuth code flow for this connector.

        Raises:
            NotImplementedError: always; a Personal API key is used instead.
        """
        raise NotImplementedError("Linear uses a Personal API key.")
    def refresh_access_token(self) -> Dict[str, Any]:
        """Not supported: there is no refresh-token flow for API-key auth.

        Raises:
            NotImplementedError: always; Linear API keys do not expire.
        """
        raise NotImplementedError("Linear API keys do not expire.")
def _api_key(self) -> Optional[str]:
if self.credentials and self.credentials.access_token:
return self.credentials.access_token.strip()
return None
def _graphql(self, query: str, variables: Optional[Dict] = None) -> Dict[str, Any]:
key = self._api_key()
if not key:
raise ValueError("No Linear API key configured.")
session = integration_session()
resp = session_request(
session,
"POST",
LINEAR_GRAPHQL,
headers={"Authorization": key, "Content-Type": "application/json"},
json={"query": query, "variables": variables or {}},
)
if resp.status_code >= 400:
raise ValueError(f"Linear API HTTP {resp.status_code}: {resp.text[:300]}")
data = resp.json()
if data.get("errors"):
raise ValueError(f"Linear GraphQL error: {data['errors'][:1]}")
return data.get("data") or {}
def test_connection(self) -> Dict[str, Any]:
try:
data = self._graphql("query { viewer { id name } }")
viewer = data.get("viewer") or {}
name = viewer.get("name") or viewer.get("id") or "OK"
return {"success": True, "message": f"Connected to Linear as {name}."}
except Exception as e:
return {"success": False, "message": str(e)}
    def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
        """Import Linear issues as tasks, one project per Linear team.

        Fetches up to 20 pages (100 issues each) from the GraphQL ``issues``
        connection, optionally filtered by the configured team keys, then
        upserts a Project per team and a Task per issue under the sync
        context's client.

        Args:
            sync_type: Accepted for connector-interface parity; the value is
                not inspected — every sync is a full pull.

        Returns:
            Dict with ``success``, ``message``, ``synced_items`` /
            ``synced_count`` (number of upserted tasks) and up to 20
            ``errors`` strings.
        """
        from app import db
        from app.models import Project, Task
        from app.utils.integration_sync_context import (
            ensure_project_integration_fields,
            find_project_by_integration_ref,
            find_task_by_integration_ref,
            require_sync_context,
            set_task_integration_ref,
        )

        key = self._api_key()
        if not key:
            return {"success": False, "message": "No Linear API key. Save your key under Integrations → Linear."}

        # Optional comma-separated team-key allowlist from the integration config.
        team_filter = (self.integration.config or {}).get("linear_team_keys", "")
        team_keys: Optional[List[str]] = None
        if team_filter and isinstance(team_filter, str):
            team_keys = [t.strip() for t in team_filter.split(",") if t.strip()]

        # Resolve the acting user and target client for all writes below.
        try:
            actor_id, client_id = require_sync_context(self.integration)
        except ValueError as e:
            return {"success": False, "message": str(e)}

        q = """
        query SyncIssues($after: String) {
          issues(first: 100, after: $after) {
            pageInfo { hasNextPage endCursor }
            nodes {
              id
              identifier
              title
              url
              team { key name }
              state { name }
            }
          }
        }
        """

        all_nodes: List[Dict] = []
        after = None
        try:
            # Hard cap of 20 pages (≈2000 issues) bounds sync duration.
            for _ in range(20):
                data = self._graphql(q, {"after": after})
                conn = (data.get("issues") or {})
                nodes = conn.get("nodes") or []
                for n in nodes:
                    tk = (n.get("team") or {}).get("key") or ""
                    # Drop issues outside the configured team filter.
                    if team_keys and tk not in team_keys:
                        continue
                    all_nodes.append(n)
                page = conn.get("pageInfo") or {}
                if not page.get("hasNextPage"):
                    break
                after = page.get("endCursor")
        except Exception as e:
            logger.error("Linear sync fetch failed: %s", e, exc_info=True)
            return {"success": False, "message": str(e)}

        synced = 0
        errors: List[str] = []
        # Cache ref -> Project so each team is resolved/created only once per run.
        projects_cache: Dict[str, Project] = {}

        def project_for_team(team_key: str, team_name: str) -> Optional[Project]:
            # Find-or-create the project backing a Linear team. The integration
            # ref is "<key>:<name>"; NOTE(review): renaming a team in Linear
            # changes this ref and would spawn a new project — confirm intended.
            ref = f"{team_key}:{team_name}" if team_key else team_name or "default"
            if ref in projects_cache:
                return projects_cache[ref]
            p = find_project_by_integration_ref(client_id, "linear", ref)
            if not p:
                # Fall back to a name match before creating a fresh project.
                display = f"Linear / {team_name or team_key or 'Issues'}"
                p = Project.query.filter_by(client_id=client_id, name=display).first()
                if not p:
                    try:
                        p = Project(
                            name=f"Linear / {team_name or team_key or 'Issues'}",
                            client_id=client_id,
                            description=f"Linear workspace team {team_key or ''}",
                            status="active",
                        )
                        db.session.add(p)
                        # Flush so p.id is available for task rows below.
                        db.session.flush()
                    except Exception as ex:
                        errors.append(f"Project create: {ex}")
                        return None
            ensure_project_integration_fields(
                project=p,
                source="linear",
                ref=ref,
                display_name=p.name,
                description=p.description or "",
            )
            projects_cache[ref] = p
            return p

        for n in all_nodes:
            issue_id = n.get("id")
            if not issue_id:
                continue
            team = n.get("team") or {}
            tk = team.get("key") or "unknown"
            tn = team.get("name") or tk
            project = project_for_team(tk, tn)
            if not project:
                # Project creation failed; error already recorded.
                continue
            title = (n.get("title") or "Untitled").strip()[:500]
            ident = n.get("identifier") or issue_id
            try:
                # Issues are keyed by Linear's stable node id, not the identifier.
                task = find_task_by_integration_ref(project.id, issue_id, source="linear")
                state_name = (n.get("state") or {}).get("name") or ""
                # Terminal workflow states map to "done"; everything else "todo".
                status = "done" if state_name.lower() in ("done", "completed", "canceled", "cancelled") else "todo"
                if not task:
                    task = Task(
                        name=f"{ident}: {title}"[:500],
                        description=(n.get("url") or "")[:2000],
                        project_id=project.id,
                        status=status,
                        created_by=actor_id,
                    )
                    db.session.add(task)
                    db.session.flush()
                    set_task_integration_ref(
                        task,
                        source="linear",
                        ref=issue_id,
                        extra={"identifier": ident, "url": n.get("url")},
                    )
                    synced += 1
                else:
                    # Refresh mutable fields on re-sync.
                    task.name = f"{ident}: {title}"[:500]
                    task.status = status
                    if n.get("url"):
                        task.description = (n.get("url") or "")[:2000]
                    set_task_integration_ref(
                        task,
                        source="linear",
                        ref=issue_id,
                        extra={"identifier": ident, "url": n.get("url")},
                    )
                    synced += 1
            except Exception as ex:
                errors.append(f"{ident}: {ex}")
                logger.warning("Linear issue upsert failed: %s", ex, exc_info=True)

        try:
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            return {"success": False, "message": f"Database error: {e}"}

        msg = f"Processed {len(all_nodes)} Linear issues."
        if errors:
            msg += f" ({len(errors)} errors)"
        return {
            "success": True,
            "message": msg,
            "synced_items": synced,
            "synced_count": synced,
            "errors": errors[:20],
        }
@classmethod
def get_config_schema(cls) -> Dict[str, Any]:
return {
"fields": [
{
"name": "linear_team_keys",
"label": "Team keys (optional)",
"type": "text",
"description": "Comma-separated Linear team keys to import (empty = all teams)",
"required": False,
},
{
"name": "auto_sync",
"label": "Automatic sync",
"type": "boolean",
"default": True,
},
]
}
+2 -2
View File
@@ -109,8 +109,8 @@ class QuickBooksConnector(BaseConnector):
)
if company_response:
company_info = company_response.get("CompanyInfo", {})
except Exception:
pass
except Exception as e:
logger.debug("QuickBooks company info fetch after OAuth failed (optional): %s", e)
return {
"access_token": data.get("access_token"),
+2
View File
@@ -10,6 +10,7 @@ from app.integrations.github import GitHubConnector
from app.integrations.gitlab import GitLabConnector
from app.integrations.google_calendar import GoogleCalendarConnector
from app.integrations.jira import JiraConnector
from app.integrations.linear import LinearConnector
from app.integrations.microsoft_teams import MicrosoftTeamsConnector
from app.integrations.outlook_calendar import OutlookCalendarConnector
from app.integrations.quickbooks import QuickBooksConnector
@@ -22,6 +23,7 @@ from app.services.integration_service import IntegrationService
def register_connectors():
"""Register all available connectors."""
IntegrationService.register_connector("jira", JiraConnector)
IntegrationService.register_connector("linear", LinearConnector)
IntegrationService.register_connector("slack", SlackConnector)
IntegrationService.register_connector("github", GitHubConnector)
IntegrationService.register_connector("google_calendar", GoogleCalendarConnector)
+82 -53
View File
@@ -121,8 +121,7 @@ class TrelloConnector(BaseConnector):
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync boards and cards with Trello."""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import require_sync_context
try:
from app.models import Settings
@@ -135,6 +134,11 @@ class TrelloConnector(BaseConnector):
if not token or not api_key:
return {"success": False, "message": "Trello credentials not configured"}
try:
actor_id, client_id = require_sync_context(self.integration)
except ValueError as e:
return {"success": False, "message": str(e)}
# Get sync direction from config
sync_direction = (
self.integration.config.get("sync_direction", "trello_to_timetracker")
@@ -143,10 +147,10 @@ class TrelloConnector(BaseConnector):
)
if sync_direction in ("trello_to_timetracker", "bidirectional"):
trello_result = self._sync_trello_to_timetracker(api_key, token)
trello_result = self._sync_trello_to_timetracker(api_key, token, actor_id, client_id)
# If bidirectional, also sync TimeTracker to Trello
if sync_direction == "bidirectional":
tracker_result = self._sync_timetracker_to_trello(api_key, token)
tracker_result = self._sync_timetracker_to_trello(api_key, token, actor_id, client_id)
# Merge results
if trello_result.get("success") and tracker_result.get("success"):
return {
@@ -169,17 +173,25 @@ class TrelloConnector(BaseConnector):
# Handle TimeTracker to Trello sync
if sync_direction == "timetracker_to_trello":
return self._sync_timetracker_to_trello(api_key, token)
return self._sync_timetracker_to_trello(api_key, token, actor_id, client_id)
return {"success": False, "message": f"Unknown sync direction: {sync_direction}"}
except Exception as e:
return {"success": False, "message": f"Sync failed: {str(e)}"}
def _sync_trello_to_timetracker(self, api_key: str, token: str) -> Dict[str, Any]:
def _sync_trello_to_timetracker(
self, api_key: str, token: str, actor_id: int, client_id: int
) -> Dict[str, Any]:
"""Sync Trello boards and cards to TimeTracker projects and tasks."""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import (
ensure_project_integration_fields,
find_project_by_integration_ref,
find_task_by_integration_ref,
set_task_integration_ref,
)
synced_count = 0
errors = []
@@ -199,25 +211,31 @@ class TrelloConnector(BaseConnector):
for board in boards:
try:
# Create or update project from board
project = Project.query.filter_by(user_id=self.integration.user_id, name=board.get("name")).first()
board_id = str(board.get("id") or "")
board_name = (board.get("name") or "Trello board").strip()[:200]
if not board_id:
continue
project = find_project_by_integration_ref(client_id, "trello", board_id)
if not project:
project = Project.query.filter_by(client_id=client_id, name=board_name).first()
if not project:
project = Project(
name=board.get("name"),
description=board.get("desc", ""),
user_id=self.integration.user_id,
name=board_name,
client_id=client_id,
description=(board.get("desc") or "") or None,
status="active",
)
db.session.add(project)
db.session.flush()
ensure_project_integration_fields(
project,
source="trello",
ref=board_id,
display_name=board_name,
description=(board.get("desc") or "") or None,
)
# Store Trello board ID in metadata
if not hasattr(project, "metadata") or not project.metadata:
project.metadata = {}
project.metadata["trello_board_id"] = board.get("id")
# Sync cards as tasks
cards_response = requests.get(
f"{self.BASE_URL}/boards/{board.get('id')}/cards",
params={"key": api_key, "token": token, "filter": "open"},
@@ -227,32 +245,34 @@ class TrelloConnector(BaseConnector):
cards = cards_response.json()
for card in cards:
# Find or create task
task = Task.query.filter_by(project_id=project.id, name=card.get("name")).first()
card_id = str(card.get("id") or "")
if not card_id:
continue
cname = (card.get("name") or "Card").strip()[:200]
new_status = self._map_trello_list_to_status(card.get("idList"))
task = find_task_by_integration_ref(project.id, card_id, source="trello")
if not task:
task = Task(
project_id=project.id,
name=card.get("name"),
description=card.get("desc", ""),
status=self._map_trello_list_to_status(card.get("idList")),
name=cname,
description=(card.get("desc") or "") or None,
status=new_status,
created_by=actor_id,
)
db.session.add(task)
db.session.flush()
else:
# Update existing task if needed
if card.get("desc") and task.description != card.get("desc"):
task.description = card.get("desc")
# Update status based on list
new_status = self._map_trello_list_to_status(card.get("idList"))
if task.status != new_status:
task.status = new_status
if card.get("desc") is not None:
task.description = (card.get("desc") or "") or None
task.name = cname
task.status = new_status
# Store Trello card ID in metadata
if not hasattr(task, "metadata") or not task.metadata:
task.metadata = {}
task.metadata["trello_card_id"] = card.get("id")
task.metadata["trello_list_id"] = card.get("idList")
set_task_integration_ref(
task,
source="trello",
ref=card_id,
extra={"trello_list_id": card.get("idList")},
)
synced_count += 1
except Exception as e:
@@ -262,22 +282,25 @@ class TrelloConnector(BaseConnector):
return {"success": True, "synced_count": synced_count, "errors": errors}
def _sync_timetracker_to_trello(self, api_key: str, token: str) -> Dict[str, Any]:
def _sync_timetracker_to_trello(
self, api_key: str, token: str, actor_id: int, client_id: int
) -> Dict[str, Any]:
"""Sync TimeTracker tasks to Trello cards."""
from app import db
from app.models import Project, Task
from app.utils.integration_sync_context import ensure_project_integration_fields, set_task_integration_ref
synced_count = 0
errors = []
# Get all projects that have Trello board IDs
projects = Project.query.filter_by(user_id=self.integration.user_id, status="active").all()
projects = Project.query.filter_by(client_id=client_id, status="active").all()
for project in projects:
# Check if project has Trello board ID
cf = project.custom_fields if isinstance(project.custom_fields, dict) else {}
block = cf.get("integration") if isinstance(cf, dict) else {}
trello_board_id = None
if hasattr(project, "metadata") and project.metadata:
trello_board_id = project.metadata.get("trello_board_id")
if isinstance(block, dict) and block.get("source") == "trello":
trello_board_id = block.get("ref")
if not trello_board_id:
# Try to find or create board
@@ -305,9 +328,13 @@ class TrelloConnector(BaseConnector):
continue
if trello_board_id:
if not hasattr(project, "metadata") or not project.metadata:
project.metadata = {}
project.metadata["trello_board_id"] = trello_board_id
ensure_project_integration_fields(
project,
source="trello",
ref=str(trello_board_id),
display_name=project.name,
description=project.description,
)
if not trello_board_id:
continue
@@ -344,10 +371,11 @@ class TrelloConnector(BaseConnector):
for task in tasks:
try:
# Check if task already has Trello card ID
tcf = task.custom_fields if isinstance(task.custom_fields, dict) else {}
tblock = tcf.get("integration") if isinstance(tcf, dict) else {}
trello_card_id = None
if hasattr(task, "metadata") and task.metadata:
trello_card_id = task.metadata.get("trello_card_id")
if isinstance(tblock, dict) and tblock.get("source") == "trello":
trello_card_id = tblock.get("ref")
# Determine target list
target_list_id = status_to_list.get(task.status, default_list_id)
@@ -386,11 +414,12 @@ class TrelloConnector(BaseConnector):
card_data = create_response.json()
trello_card_id = card_data.get("id")
# Store Trello card ID in task metadata
if not hasattr(task, "metadata") or not task.metadata:
task.metadata = {}
task.metadata["trello_card_id"] = trello_card_id
task.metadata["trello_list_id"] = target_list_id
set_task_integration_ref(
task,
source="trello",
ref=str(trello_card_id),
extra={"trello_list_id": target_list_id},
)
synced_count += 1
else:
@@ -427,7 +456,7 @@ class TrelloConnector(BaseConnector):
# Map common list names to statuses
if "done" in list_name or "completed" in list_name or "closed" in list_name:
return "completed"
return "done"
elif "in progress" in list_name or "doing" in list_name or "active" in list_name:
return "in_progress"
elif "todo" in list_name or "to do" in list_name or "backlog" in list_name:
+5 -18
View File
@@ -630,25 +630,12 @@ class Settings(db.Model):
pass
# Fallback: return a non-persisted Settings instance
# #region agent log
try:
import json
import logging
log_data = {
"location": "settings.py:493",
"message": "Returning fallback Settings instance",
"data": {"fallback": True},
"timestamp": int(datetime.utcnow().timestamp() * 1000),
"sessionId": "debug-session",
"runId": "run1",
"hypothesisId": "E",
}
log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), ".cursor", "debug.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(log_data) + "\n")
except (OSError, IOError, TypeError, ValueError):
pass
# #endregion
logging.getLogger(__name__).warning(
"Returning transient in-memory Settings instance (database row missing or creation failed). "
"Check database connectivity and migrations."
)
return cls()
@classmethod
+37 -10
View File
@@ -466,6 +466,34 @@ def manage_integration(provider):
else:
flash(_("Failed to update credentials."), "error")
elif request.form.get("action") == "update_linear_api_key":
if provider != "linear":
flash(_("Invalid action for this integration."), "error")
return redirect(url_for("integrations.manage_integration", provider=provider))
if not integration:
flash(_("Integration not found."), "error")
return redirect(url_for("integrations.manage_integration", provider=provider))
api_key = request.form.get("linear_api_key", "").strip()
if not api_key:
flash(_("Linear API key is required."), "error")
return redirect(url_for("integrations.manage_integration", provider=provider))
result = service.save_credentials(
integration_id=integration.id,
access_token=api_key,
refresh_token=None,
expires_at=None,
token_type="Bearer",
scope="read",
extra_data={"auth_type": "api_key"},
)
if result.get("success"):
integration.is_active = True
safe_commit("linear_api_key_saved", {"integration_id": integration.id})
flash(_("Linear API key saved. Use Sync to import issues as tasks."), "success")
else:
flash(result.get("message", _("Could not save API key.")), "error")
return redirect(url_for("integrations.manage_integration", provider=provider))
# Check if this is a CalDAV credential update (non-OAuth)
elif request.form.get("action") == "update_caldav_credentials":
# CalDAV uses username/password, not OAuth
@@ -747,7 +775,7 @@ def view_integration(integration_id):
recent_events = (
IntegrationEvent.query.filter_by(integration_id=integration_id)
.order_by(IntegrationEvent.created_at.desc())
.limit(20)
.limit(50)
.all()
)
@@ -879,18 +907,20 @@ def sync_integration(integration_id):
return redirect(url_for("integrations.view_integration", integration_id=integration_id))
try:
from app.utils.integration_sync_context import sync_result_item_count
from datetime import datetime
sync_result = connector.sync_data()
# Update integration status
from datetime import datetime
integration.last_sync_at = datetime.utcnow()
if sync_result.get("success"):
integration.last_sync_status = "success"
integration.last_error = None
message = sync_result.get("message", "Sync completed successfully.")
if sync_result.get("synced_count"):
message += f" Synced {sync_result['synced_count']} items."
n = sync_result_item_count(sync_result)
if n:
message += f" Synced {n} items."
flash(_("Sync completed successfully. %(details)s", details=message), "success")
else:
integration.last_sync_status = "error"
@@ -898,16 +928,13 @@ def sync_integration(integration_id):
flash(_("Sync failed: %(message)s", message=sync_result.get("message", "Unknown error")), "error")
# Log sync event
_n = sync_result_item_count(sync_result)
service._log_event(
integration_id,
"sync",
sync_result.get("success", False),
sync_result.get("message"),
(
{"synced_count": sync_result.get("synced_count")}
if sync_result.get("success") and sync_result.get("synced_count")
else None
),
({"synced_count": _n, "synced_items": _n} if sync_result.get("success") and _n else None),
)
if not safe_commit("update_integration_sync_status", {"integration_id": integration_id}):
+26 -1
View File
@@ -37,7 +37,32 @@
</div>
{% endif %}
<!-- OAuth Credentials Setup Section (Admin only, not for CalDAV or ActivityWatch) -->
{% if current_user.is_admin and provider not in ('caldav_calendar', 'activitywatch') %}
{% if current_user.is_admin and provider == 'linear' and integration %}
<div class="bg-card-light dark:bg-card-dark p-6 rounded-xl border border-border-light dark:border-border-dark shadow-sm mb-6">
<h2 class="text-lg font-semibold mb-4">
<i class="fas fa-key mr-2"></i>{{ _('Linear API Key') }}
</h2>
<form method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="action" value="update_linear_api_key">
<div>
<label for="linear_api_key" class="block text-sm font-medium mb-2">
{{ _('Personal API Key') }} <span class="text-red-500">*</span>
</label>
<input type="password" name="linear_api_key" id="linear_api_key"
class="w-full px-3 py-2 border border-border-light dark:border-border-dark rounded-lg bg-card-light dark:bg-card-dark"
placeholder="{{ _('Create at linear.app/settings/api') }}"
autocomplete="off">
<p class="mt-1 text-xs text-text-muted-light dark:text-text-muted-dark">
{{ _('From Linear: Settings → API → Personal API keys') }}
</p>
</div>
<button type="submit" class="mt-4 bg-primary text-white px-4 py-2 rounded-lg">{{ _('Save API Key') }}</button>
</form>
</div>
{% endif %}
{% if current_user.is_admin and provider not in ('caldav_calendar', 'activitywatch', 'linear') %}
<div class="bg-card-light dark:bg-card-dark p-6 rounded-xl border border-border-light dark:border-border-dark shadow-sm">
<h2 class="text-lg font-semibold mb-4">
<i class="fas fa-key mr-2"></i>{{ _('OAuth Credentials Setup') }}
+10 -4
View File
@@ -73,12 +73,12 @@
<div class="bg-card-light dark:bg-card-dark p-6 rounded-xl border border-border-light dark:border-border-dark shadow-sm mt-6">
<h2 class="text-lg font-semibold mb-4">{{ _('Sync History') }}</h2>
{% if recent_events %}
<div class="space-y-2 max-h-96 overflow-y-auto">
<div class="space-y-2 max-h-[32rem] overflow-y-auto">
{% for event in recent_events %}
<div class="border-b border-border-light dark:border-border-dark pb-2 last:border-0">
<div class="flex items-start justify-between">
<div class="flex-1">
<div class="flex items-center gap-2">
<div class="flex-1 min-w-0">
<div class="flex items-center gap-2 flex-wrap">
<span class="text-sm font-medium text-text-light dark:text-text-dark">{{ event.event_type|replace('_', ' ')|title }}</span>
{% if event.status == 'success' %}
<span class="px-2 py-0.5 text-xs rounded bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200">{{ _('Success') }}</span>
@@ -89,7 +89,13 @@
{% endif %}
</div>
{% if event.message %}
<p class="text-sm text-text-muted-light dark:text-text-muted-dark mt-1">{{ event.message }}</p>
<p class="text-sm text-text-muted-light dark:text-text-muted-dark mt-1 break-words">{{ event.message }}</p>
{% endif %}
{% if event.event_metadata %}
<details class="mt-2">
<summary class="text-xs text-primary cursor-pointer hover:underline">{{ _('Details') }}</summary>
<pre class="mt-2 text-xs bg-gray-100 dark:bg-gray-900 p-2 rounded overflow-x-auto max-h-48 overflow-y-auto">{{ event.event_metadata | tojson }}</pre>
</details>
{% endif %}
<p class="text-xs text-text-muted-light dark:text-text-muted-dark mt-1">{{ event.created_at|user_datetime }}</p>
</div>
+57
View File
@@ -0,0 +1,57 @@
"""
Shared HTTP session for outbound integration calls (retries, timeouts).
"""
from __future__ import annotations
import logging
from typing import Any, Dict, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
_DEFAULT_TIMEOUT = (5, 30)


def integration_session(
    total_retries: int = 3,
    backoff_factor: float = 0.5,
    timeout: tuple = _DEFAULT_TIMEOUT,
) -> requests.Session:
    """Build a ``requests.Session`` preconfigured for outbound integration calls.

    Transient failures (HTTP 429/500/502/503/504 plus connect/read errors) are
    retried with exponential backoff for GET/POST/PUT/PATCH/DELETE/HEAD/OPTIONS.
    The default (connect, read) timeout is stashed on the session so that
    ``session_request`` can pick it up.
    """
    retry_policy = Retry(
        total=total_retries,
        connect=total_retries,
        read=total_retries,
        backoff_factor=backoff_factor,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_policy, pool_connections=10, pool_maxsize=20)
    sess = requests.Session()
    for scheme in ("http://", "https://"):
        sess.mount(scheme, adapter)
    # Non-standard attribute read back by session_request() as the default timeout.
    sess.request_timeout = timeout  # type: ignore[attr-defined]
    return sess
def session_request(
session: requests.Session,
method: str,
url: str,
*,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
json: Any = None,
data: Any = None,
timeout: Optional[tuple] = None,
) -> requests.Response:
"""Perform request using session's default timeout."""
to = timeout or getattr(session, "request_timeout", _DEFAULT_TIMEOUT)
return session.request(method.upper(), url, headers=headers, params=params, json=json, data=data, timeout=to)
+153
View File
@@ -0,0 +1,153 @@
"""
Resolve client and actor user for integration sync (especially global integrations).
Per-user integrations use integration.user_id. Global integrations use, in order:
1. INTEGRATION_SYNC_USER_ID (numeric user id)
2. First active user with role admin
3. First active user
Projects are created under a dedicated client (default name "Integration imports"),
overridable via INTEGRATION_IMPORT_CLIENT_NAME.
External system linkage is stored in Project.custom_fields / Task.custom_fields under
the key "integration": {"source": "<provider>", "ref": "<stable id>"}.
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional, Tuple
logger = logging.getLogger(__name__)
DEFAULT_IMPORT_CLIENT_NAME = "Integration imports"
def _import_client_name() -> str:
raw = (os.getenv("INTEGRATION_IMPORT_CLIENT_NAME") or "").strip()
return raw or DEFAULT_IMPORT_CLIENT_NAME
def get_or_create_integration_import_client():
    """Return (creating if necessary) the shared Client for imported projects.

    Only flushes the session so a newly created row gets an id; committing
    remains the caller's responsibility.
    """
    from app import db
    from app.models import Client

    name = _import_client_name()
    client = Client.query.filter_by(name=name).first()
    if client is None:
        client = Client(name=name)
        db.session.add(client)
        db.session.flush()
    return client
def resolve_integration_actor_user_id(integration) -> Optional[int]:
    """Pick the user id to attribute synced records to (Task.created_by etc.).

    Resolution order: the integration's own user, then
    ``INTEGRATION_SYNC_USER_ID`` (if it names an existing, active user), then
    the first active admin, then any active user. Returns None if no
    candidate exists.
    """
    from app.models import User

    owner_id = getattr(integration, "user_id", None) if integration is not None else None
    if owner_id is not None:
        return owner_id

    configured = (os.getenv("INTEGRATION_SYNC_USER_ID") or "").strip()
    if configured.isdigit():
        from app import db

        candidate = db.session.get(User, int(configured))
        if candidate and getattr(candidate, "is_active", True):
            return candidate.id
        logger.warning("INTEGRATION_SYNC_USER_ID=%s is missing or inactive", configured)

    # Fall back to the first active admin, then any active user.
    for candidates in (
        User.query.filter_by(role="admin", is_active=True),
        User.query.filter_by(is_active=True),
    ):
        user = candidates.order_by(User.id).first()
        if user:
            return user.id
    return None
def require_sync_context(integration) -> Tuple[int, int]:
    """Resolve the (actor_user_id, import_client_id) pair for a sync run.

    Raises:
        ValueError: when no user can be found to own imported records.
    """
    actor_id = resolve_integration_actor_user_id(integration)
    if actor_id is None:
        raise ValueError(
            "No active user found to attribute imported tasks to. "
            "Create a user or set INTEGRATION_SYNC_USER_ID to a valid user id."
        )
    import_client = get_or_create_integration_import_client()
    return actor_id, import_client.id
def find_project_by_integration_ref(client_id: int, source: str, ref: str):
    """Return the client's project whose integration marker matches, else None.

    The marker lives at custom_fields["integration"] = {"source": ..., "ref": ...}.
    """
    from app.models import Project

    # Linear scan: the marker sits inside a JSON column, so it cannot be
    # filtered in SQL here without DB-specific JSON operators.
    for project in Project.query.filter_by(client_id=client_id).all():
        fields = project.custom_fields if project.custom_fields is not None else {}
        marker = fields.get("integration") if isinstance(fields, dict) else {}
        if not isinstance(marker, dict):
            continue
        if marker.get("source") == source and marker.get("ref") == ref:
            return project
    return None
def ensure_project_integration_fields(
    project,
    *,
    source: str,
    ref: str,
    display_name: str,
    description: Optional[str] = None,
) -> None:
    """Stamp *project* with its external-system linkage (mutates, no commit).

    Rewrites ``custom_fields["integration"]`` to ``{"source": ..., "ref": ...}``,
    renames the project to *display_name* when it differs, and overwrites the
    description only when one is supplied.
    """
    existing = project.custom_fields if isinstance(project.custom_fields, dict) else {}
    updated: Dict[str, Any] = {**existing, "integration": {"source": source, "ref": ref}}
    project.custom_fields = updated
    if display_name and display_name != project.name:
        project.name = display_name
    if description is not None:
        project.description = description
def find_task_by_integration_ref(project_id: int, ref: str, source: Optional[str] = None):
    """Return the project's task whose integration marker carries this ``ref``.

    When *source* is given, the marker's "source" must also match (prevents
    cross-provider ref collisions). Returns None when nothing matches.
    """
    from app.models import Task

    for task in Task.query.filter_by(project_id=project_id).all():
        fields = task.custom_fields if task.custom_fields is not None else {}
        marker = fields.get("integration") if isinstance(fields, dict) else {}
        if not isinstance(marker, dict):
            continue
        if marker.get("ref") != ref:
            continue
        if source is None or marker.get("source") == source:
            return task
    return None
def set_task_integration_ref(task, *, source: str, ref: str, extra: Optional[Dict[str, Any]] = None) -> None:
    """Stamp *task* with its external-system linkage (mutates custom_fields only).

    Any *extra* keys (provider-specific ids such as a Trello list id) are
    merged into the marker; colliding keys in *extra* win.
    """
    marker: Dict[str, Any] = {"source": source, "ref": ref, **(extra or {})}
    fields: Dict[str, Any] = dict(task.custom_fields) if isinstance(task.custom_fields, dict) else {}
    fields["integration"] = marker
    task.custom_fields = fields
def sync_result_item_count(sync_result: Optional[Dict[str, Any]]) -> int:
    """Read the item count out of a connector ``sync_data()`` result.

    Connectors disagree on the key name: "synced_count" takes precedence over
    "synced_items". Absent, None, or non-numeric values yield 0.
    """
    if not isinstance(sync_result, dict) or not sync_result:
        return 0
    for key in ("synced_count", "synced_items"):
        value = sync_result.get(key)
        if value is None:
            continue
        try:
            return int(value)
        except (TypeError, ValueError):
            pass
    return 0
+15 -2
View File
@@ -899,7 +899,16 @@ def sync_integrations():
f"Failed to sync integration {integration.id} ({integration.provider}): {result.get('message')}"
)
db.session.commit()
from app.utils.integration_sync_context import sync_result_item_count
_n = sync_result_item_count(result)
service._log_event(
integration.id,
"sync",
bool(result.get("success")),
result.get("message"),
({"synced_count": _n, "synced_items": _n, "trigger": "scheduler"} if _n or result.get("success") else {"trigger": "scheduler"}),
)
except Exception as e:
error_msg = f"Error syncing integration {integration.id} ({integration.provider}): {str(e)}"
@@ -907,7 +916,11 @@ def sync_integrations():
logger.error(error_msg, exc_info=True)
integration.last_sync_status = "error"
integration.last_error = str(e)
db.session.commit()
try:
service._log_event(integration.id, "sync", False, str(e), {"trigger": "scheduler"})
except Exception as log_err:
logger.warning("Could not log integration sync failure: %s", log_err)
db.session.commit()
logger.info(f"Integration sync completed. Synced {synced_count}/{len(active_integrations)} integrations")
if errors:
@@ -0,0 +1,51 @@
"""Tests for integration sync helpers."""
from unittest.mock import MagicMock, patch
import pytest
pytestmark = [pytest.mark.unit]
def test_sync_result_item_count_prefers_synced_count():
    """synced_count wins when both count keys are present."""
    from app.utils.integration_sync_context import sync_result_item_count

    result = {"synced_count": 5, "synced_items": 9}
    assert sync_result_item_count(result) == 5
def test_sync_result_item_count_falls_back_to_synced_items():
    """synced_items is used when synced_count is absent."""
    from app.utils.integration_sync_context import sync_result_item_count

    result = {"synced_items": 7}
    assert sync_result_item_count(result) == 7
def test_sync_result_item_count_empty():
    """Empty or missing results count as zero items."""
    from app.utils.integration_sync_context import sync_result_item_count

    for empty_result in ({}, None):
        assert sync_result_item_count(empty_result) == 0
@patch("app.models.Task")
def test_find_task_by_integration_ref_filters_by_source(MockTask):
    """When a source is given, only that provider's marker may match the ref."""
    from app.utils.integration_sync_context import find_task_by_integration_ref

    github_task = MagicMock()
    github_task.custom_fields = {"integration": {"source": "github", "ref": "same-ref"}}
    jira_task = MagicMock()
    jira_task.custom_fields = {"integration": {"source": "jira", "ref": "same-ref"}}
    MockTask.query.filter_by.return_value.all.return_value = [github_task, jira_task]

    assert find_task_by_integration_ref(42, "same-ref", source="jira") is jira_task
    assert find_task_by_integration_ref(42, "same-ref", source="github") is github_task
@patch("app.models.Task")
def test_find_task_by_integration_ref_without_source_matches_any(MockTask):
    """Without a source filter, any provider's marker with the ref matches."""
    from app.utils.integration_sync_context import find_task_by_integration_ref

    only_task = MagicMock()
    only_task.custom_fields = {"integration": {"source": "github", "ref": "r1"}}
    MockTask.query.filter_by.return_value.all.return_value = [only_task]

    assert find_task_by_integration_ref(1, "r1") is only_task