feat: Add comprehensive feature implementation including integrations, workflows, approvals, and AI features

Major Features:
- Integration framework with implementations for Asana, Google Calendar, QuickBooks, and Trello
- Workflow automation system with workflow engine service
- Time entry approval system with client approval capabilities
- Recurring tasks functionality
- Client portal customization and team chat features
- AI-powered categorization and suggestion services
- GPS tracking for expenses
- Gamification system with service layer
- Custom reporting with service and model support
- Enhanced OCR service for expense processing
- Pomodoro timer service
- Currency service for multi-currency support
- PowerPoint export utility

Frontend Enhancements:
- Activity feed JavaScript module
- Mentions system for team chat
- Offline sync capabilities
- New templates for approvals, chat, and recurring tasks

Database Migrations:
- Updated integration framework migrations (066-068)
- Added workflow automation migration (069)
- Added time entry approvals migration (070)
- Added recurring tasks migration (071)
- Added client portal and team chat migration (072)
- Added AI features and GPS tracking migration (073)

Documentation:
- Updated implementation documentation
- Removed obsolete feature gap analysis docs
- Added comprehensive implementation status reports
This commit is contained in:
Dries Peeters
2025-11-28 22:39:04 +01:00
parent 653800d22b
commit 8585b097e0
58 changed files with 9439 additions and 1351 deletions
+264
View File
@@ -0,0 +1,264 @@
"""
Asana integration connector.
Sync tasks and projects with Asana.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from app.integrations.base import BaseConnector
import requests
import os
class AsanaConnector(BaseConnector):
"""Asana integration connector."""
display_name = "Asana"
description = "Sync tasks and projects with Asana"
icon = "asana"
BASE_URL = "https://app.asana.com/api/1.0"
@property
def provider_name(self) -> str:
return "asana"
def get_authorization_url(self, redirect_uri: str, state: str = None) -> str:
"""Get Asana OAuth authorization URL."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("asana")
client_id = creds.get("client_id") or os.getenv("ASANA_CLIENT_ID")
if not client_id:
raise ValueError("ASANA_CLIENT_ID not configured")
auth_url = "https://app.asana.com/-/oauth_authorize"
params = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"state": state or ""
}
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
return f"{auth_url}?{query_string}"
def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Dict[str, Any]:
"""Exchange authorization code for tokens."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("asana")
client_id = creds.get("client_id") or os.getenv("ASANA_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("ASANA_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Asana OAuth credentials not configured")
token_url = f"{self.BASE_URL}/oauth_token"
response = requests.post(
token_url,
data={
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"redirect_uri": redirect_uri,
},
)
response.raise_for_status()
data = response.json()
expires_at = None
if "expires_in" in data:
expires_at = datetime.utcnow() + timedelta(seconds=data["expires_in"])
# Get user info
user_info = {}
if "access_token" in data:
try:
user_response = requests.get(
f"{self.BASE_URL}/users/me",
headers={"Authorization": f"Bearer {data['access_token']}"}
)
if user_response.status_code == 200:
user_data = user_response.json().get("data", {})
user_info = {
"gid": user_data.get("gid"),
"name": user_data.get("name"),
"email": user_data.get("email")
}
except Exception:
pass
return {
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_at": expires_at.isoformat() if expires_at else None,
"token_type": "Bearer",
"extra_data": user_info
}
def refresh_access_token(self) -> Dict[str, Any]:
"""Refresh access token."""
if not self.credentials or not self.credentials.refresh_token:
raise ValueError("No refresh token available")
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("asana")
client_id = creds.get("client_id") or os.getenv("ASANA_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("ASANA_CLIENT_SECRET")
token_url = f"{self.BASE_URL}/oauth_token"
response = requests.post(
token_url,
data={
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": self.credentials.refresh_token,
},
)
response.raise_for_status()
data = response.json()
expires_at = None
if "expires_in" in data:
expires_at = datetime.utcnow() + timedelta(seconds=data["expires_in"])
# Update credentials
self.credentials.access_token = data.get("access_token")
if "refresh_token" in data:
self.credentials.refresh_token = data.get("refresh_token")
if expires_at:
self.credentials.expires_at = expires_at
self.credentials.save()
return {
"access_token": data.get("access_token"),
"expires_at": expires_at.isoformat() if expires_at else None
}
def test_connection(self) -> Dict[str, Any]:
"""Test connection to Asana."""
try:
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
response = requests.get(f"{self.BASE_URL}/users/me", headers=headers)
if response.status_code == 200:
user_data = response.json().get("data", {})
return {
"success": True,
"message": f"Connected to Asana as {user_data.get('name', 'Unknown')}"
}
else:
return {
"success": False,
"message": f"Connection test failed: {response.status_code}"
}
except Exception as e:
return {
"success": False,
"message": f"Connection test failed: {str(e)}"
}
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync tasks and projects with Asana."""
from app.models import Task, Project
from app import db
try:
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
# Get workspace from config
workspace_gid = self.integration.config.get("workspace_gid")
if not workspace_gid:
return {"success": False, "message": "Workspace GID not configured"}
synced_count = 0
errors = []
# Sync projects from Asana
projects_response = requests.get(
f"{self.BASE_URL}/projects",
headers=headers,
params={"workspace": workspace_gid, "opt_fields": "name,notes,archived"}
)
if projects_response.status_code == 200:
asana_projects = projects_response.json().get("data", [])
for asana_project in asana_projects:
try:
# Find or create project
project = Project.query.filter_by(
user_id=self.integration.user_id,
name=asana_project.get("name")
).first()
if not project:
project = Project(
name=asana_project.get("name"),
description=asana_project.get("notes", ""),
user_id=self.integration.user_id,
status="active" if not asana_project.get("archived") else "archived"
)
db.session.add(project)
db.session.flush()
# Store Asana project GID in project metadata
if not hasattr(project, 'metadata') or not project.metadata:
project.metadata = {}
project.metadata['asana_project_gid'] = asana_project.get("gid")
synced_count += 1
except Exception as e:
errors.append(f"Error syncing project {asana_project.get('name')}: {str(e)}")
db.session.commit()
return {
"success": True,
"synced_count": synced_count,
"errors": errors
}
except Exception as e:
return {
"success": False,
"message": f"Sync failed: {str(e)}"
}
def get_config_schema(self) -> Dict[str, Any]:
"""Get configuration schema."""
return {
"fields": [
{
"name": "workspace_gid",
"type": "string",
"label": "Workspace GID",
"description": "Asana workspace GID to sync with"
},
{
"name": "sync_direction",
"type": "select",
"label": "Sync Direction",
"options": [
{"value": "asana_to_timetracker", "label": "Asana → TimeTracker"},
{"value": "timetracker_to_asana", "label": "TimeTracker → Asana"},
{"value": "bidirectional", "label": "Bidirectional"}
],
"default": "asana_to_timetracker"
}
],
"required": ["workspace_gid"]
}
+397
View File
@@ -0,0 +1,397 @@
"""
Google Calendar integration connector.
Provides two-way sync between TimeTracker and Google Calendar.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from app.integrations.base import BaseConnector
import requests
import os
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
class GoogleCalendarConnector(BaseConnector):
"""Google Calendar integration connector."""
display_name = "Google Calendar"
description = "Two-way sync with Google Calendar"
icon = "google"
# OAuth 2.0 scopes required
SCOPES = [
'https://www.googleapis.com/auth/calendar',
'https://www.googleapis.com/auth/calendar.events'
]
@property
def provider_name(self) -> str:
return "google_calendar"
def get_authorization_url(self, redirect_uri: str, state: str = None) -> str:
"""Get Google OAuth authorization URL."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("google_calendar")
client_id = creds.get("client_id") or os.getenv("GOOGLE_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("GOOGLE_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Google Calendar OAuth credentials not configured")
flow = Flow.from_client_config(
{
"web": {
"client_id": client_id,
"client_secret": client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [redirect_uri]
}
},
scopes=self.SCOPES,
redirect_uri=redirect_uri
)
if state:
flow.state = state
authorization_url, _ = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
prompt='consent' # Force consent to get refresh token
)
return authorization_url
def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Dict[str, Any]:
"""Exchange authorization code for tokens."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("google_calendar")
client_id = creds.get("client_id") or os.getenv("GOOGLE_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("GOOGLE_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Google Calendar OAuth credentials not configured")
flow = Flow.from_client_config(
{
"web": {
"client_id": client_id,
"client_secret": client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [redirect_uri]
}
},
scopes=self.SCOPES,
redirect_uri=redirect_uri
)
flow.fetch_token(code=code)
credentials = flow.credentials
# Get user info
user_info = {}
try:
service = build('oauth2', 'v2', credentials=credentials)
user_info_response = service.userinfo().get().execute()
user_info = {
"email": user_info_response.get("email"),
"name": user_info_response.get("name"),
"picture": user_info_response.get("picture")
}
except Exception:
pass
return {
"access_token": credentials.token,
"refresh_token": credentials.refresh_token,
"expires_at": credentials.expiry.isoformat() if credentials.expiry else None,
"token_type": "Bearer",
"scope": " ".join(credentials.scopes) if credentials.scopes else None,
"extra_data": user_info
}
def refresh_access_token(self) -> Dict[str, Any]:
"""Refresh access token using refresh token."""
if not self.credentials or not self.credentials.refresh_token:
raise ValueError("No refresh token available")
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("google_calendar")
client_id = creds.get("client_id") or os.getenv("GOOGLE_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("GOOGLE_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Google Calendar OAuth credentials not configured")
credentials = Credentials(
token=self.credentials.access_token,
refresh_token=self.credentials.refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=client_id,
client_secret=client_secret
)
credentials.refresh(Request())
# Update credentials
self.credentials.access_token = credentials.token
if credentials.expiry:
self.credentials.expires_at = credentials.expiry
self.credentials.save()
return {
"access_token": credentials.token,
"expires_at": credentials.expiry.isoformat() if credentials.expiry else None
}
def test_connection(self) -> Dict[str, Any]:
"""Test connection to Google Calendar."""
try:
service = self._get_calendar_service()
calendar_list = service.calendarList().list().execute()
return {
"success": True,
"message": f"Connected to Google Calendar. Found {len(calendar_list.get('items', []))} calendars."
}
except Exception as e:
return {
"success": False,
"message": f"Connection test failed: {str(e)}"
}
def _get_calendar_service(self):
"""Get Google Calendar API service."""
credentials = Credentials(
token=self.credentials.access_token,
refresh_token=self.credentials.refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=os.getenv("GOOGLE_CLIENT_ID"),
client_secret=os.getenv("GOOGLE_CLIENT_SECRET")
)
# Refresh if needed
if credentials.expired:
credentials.refresh(Request())
self.credentials.access_token = credentials.token
if credentials.expiry:
self.credentials.expires_at = credentials.expiry
self.credentials.save()
return build('calendar', 'v3', credentials=credentials)
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync time entries with Google Calendar."""
from app.models import TimeEntry, CalendarSyncEvent
from app import db
from datetime import datetime, timedelta
try:
service = self._get_calendar_service()
# Get calendar ID from integration config
calendar_id = self.integration.config.get("calendar_id", "primary")
# Get time entries to sync
if sync_type == "incremental":
# Get last sync time
last_sync = CalendarSyncEvent.query.filter_by(
integration_id=self.integration.id
).order_by(CalendarSyncEvent.synced_at.desc()).first()
start_date = last_sync.synced_at if last_sync else datetime.utcnow() - timedelta(days=30)
else:
start_date = datetime.utcnow() - timedelta(days=90)
# Get time entries
time_entries = TimeEntry.query.filter(
TimeEntry.user_id == self.integration.user_id,
TimeEntry.start_time >= start_date,
TimeEntry.end_time.isnot(None)
).all()
synced_count = 0
errors = []
for entry in time_entries:
try:
# Check if already synced
existing_sync = CalendarSyncEvent.query.filter_by(
integration_id=self.integration.id,
time_entry_id=entry.id
).first()
if existing_sync:
# Update existing event
event_id = existing_sync.external_event_id
self._update_calendar_event(service, calendar_id, event_id, entry)
else:
# Create new event
event_id = self._create_calendar_event(service, calendar_id, entry)
# Create sync record
sync_event = CalendarSyncEvent(
integration_id=self.integration.id,
time_entry_id=entry.id,
external_event_id=event_id,
synced_at=datetime.utcnow()
)
db.session.add(sync_event)
synced_count += 1
except Exception as e:
errors.append(f"Error syncing entry {entry.id}: {str(e)}")
db.session.commit()
return {
"success": True,
"synced_count": synced_count,
"errors": errors
}
except Exception as e:
return {
"success": False,
"message": f"Sync failed: {str(e)}"
}
def _create_calendar_event(self, service, calendar_id: str, time_entry) -> str:
"""Create a calendar event from a time entry."""
from app.models import Project, Task
project = Project.query.get(time_entry.project_id)
task = Task.query.get(time_entry.task_id) if time_entry.task_id else None
# Build event title
title_parts = []
if project:
title_parts.append(project.name)
if task:
title_parts.append(task.name)
if not title_parts:
title_parts.append("Time Entry")
title = " - ".join(title_parts)
# Build description
description_parts = []
if time_entry.notes:
description_parts.append(time_entry.notes)
if time_entry.tags:
description_parts.append(f"Tags: {time_entry.tags}")
description = "\n\n".join(description_parts) if description_parts else None
event = {
'summary': title,
'description': description,
'start': {
'dateTime': time_entry.start_time.isoformat(),
'timeZone': 'UTC',
},
'end': {
'dateTime': time_entry.end_time.isoformat(),
'timeZone': 'UTC',
},
'colorId': '9' if time_entry.billable else '11', # Blue for billable, red for non-billable
}
created_event = service.events().insert(
calendarId=calendar_id,
body=event
).execute()
return created_event['id']
def _update_calendar_event(self, service, calendar_id: str, event_id: str, time_entry):
"""Update an existing calendar event."""
from app.models import Project, Task
project = Project.query.get(time_entry.project_id)
task = Task.query.get(time_entry.task_id) if time_entry.task_id else None
# Build event title
title_parts = []
if project:
title_parts.append(project.name)
if task:
title_parts.append(task.name)
if not title_parts:
title_parts.append("Time Entry")
title = " - ".join(title_parts)
# Build description
description_parts = []
if time_entry.notes:
description_parts.append(time_entry.notes)
if time_entry.tags:
description_parts.append(f"Tags: {time_entry.tags}")
description = "\n\n".join(description_parts) if description_parts else None
# Get existing event
event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
# Update event
event['summary'] = title
event['description'] = description
event['start'] = {
'dateTime': time_entry.start_time.isoformat(),
'timeZone': 'UTC',
}
event['end'] = {
'dateTime': time_entry.end_time.isoformat(),
'timeZone': 'UTC',
}
event['colorId'] = '9' if time_entry.billable else '11'
service.events().update(
calendarId=calendar_id,
eventId=event_id,
body=event
).execute()
def get_config_schema(self) -> Dict[str, Any]:
"""Get configuration schema."""
return {
"fields": [
{
"name": "calendar_id",
"type": "string",
"label": "Calendar ID",
"default": "primary",
"description": "Google Calendar ID to sync with (default: primary)"
},
{
"name": "sync_direction",
"type": "select",
"label": "Sync Direction",
"options": [
{"value": "time_tracker_to_calendar", "label": "TimeTracker → Calendar"},
{"value": "calendar_to_time_tracker", "label": "Calendar → TimeTracker"},
{"value": "bidirectional", "label": "Bidirectional"}
],
"default": "time_tracker_to_calendar"
},
{
"name": "auto_sync",
"type": "boolean",
"label": "Auto Sync",
"default": True,
"description": "Automatically sync when time entries are created/updated"
}
],
"required": []
}
+390
View File
@@ -0,0 +1,390 @@
"""
QuickBooks integration connector.
Sync invoices, expenses, and payments with QuickBooks Online.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from app.integrations.base import BaseConnector
import requests
import os
import base64
import logging
logger = logging.getLogger(__name__)
class QuickBooksConnector(BaseConnector):
"""QuickBooks Online integration connector."""
display_name = "QuickBooks Online"
description = "Sync invoices, expenses, and payments with QuickBooks"
icon = "quickbooks"
BASE_URL = "https://sandbox-quickbooks.api.intuit.com" # Sandbox
PRODUCTION_URL = "https://quickbooks.api.intuit.com" # Production
@property
def provider_name(self) -> str:
return "quickbooks"
def get_base_url(self):
"""Get base URL based on environment"""
use_sandbox = self.integration.config.get("use_sandbox", True) if self.integration else True
return self.BASE_URL if use_sandbox else self.PRODUCTION_URL
def get_authorization_url(self, redirect_uri: str, state: str = None) -> str:
"""Get QuickBooks OAuth authorization URL."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("quickbooks")
client_id = creds.get("client_id") or os.getenv("QUICKBOOKS_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("QUICKBOOKS_CLIENT_SECRET")
if not client_id:
raise ValueError("QUICKBOOKS_CLIENT_ID not configured")
auth_url = "https://appcenter.intuit.com/connect/oauth2"
scopes = [
"com.intuit.quickbooks.accounting",
"com.intuit.quickbooks.payment"
]
params = {
"client_id": client_id,
"scope": " ".join(scopes),
"redirect_uri": redirect_uri,
"response_type": "code",
"access_type": "offline",
"state": state or ""
}
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
return f"{auth_url}?{query_string}"
def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Dict[str, Any]:
"""Exchange authorization code for tokens."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("quickbooks")
client_id = creds.get("client_id") or os.getenv("QUICKBOOKS_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("QUICKBOOKS_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("QuickBooks OAuth credentials not configured")
token_url = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer"
# QuickBooks requires Basic Auth for token exchange
auth_string = f"{client_id}:{client_secret}"
auth_bytes = auth_string.encode('ascii')
auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
response = requests.post(
token_url,
headers={
"Authorization": f"Basic {auth_b64}",
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded"
},
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri
}
)
response.raise_for_status()
data = response.json()
expires_at = None
if "expires_in" in data:
expires_at = datetime.utcnow() + timedelta(seconds=data["expires_in"])
# Get company info
company_info = {}
if "access_token" in data and "realmId" in data:
try:
realm_id = data["realmId"]
company_response = self._api_request(
"GET",
f"/v3/company/{realm_id}/companyinfo/{realm_id}",
data.get("access_token"),
realm_id
)
if company_response:
company_info = company_response.get("CompanyInfo", {})
except Exception:
pass
return {
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_at": expires_at.isoformat() if expires_at else None,
"token_type": "Bearer",
"realm_id": data.get("realmId"), # QuickBooks company ID
"extra_data": {
"company_name": company_info.get("CompanyName", ""),
"company_id": data.get("realmId")
}
}
def refresh_access_token(self) -> Dict[str, Any]:
"""Refresh access token using refresh token."""
if not self.credentials or not self.credentials.refresh_token:
raise ValueError("No refresh token available")
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("quickbooks")
client_id = creds.get("client_id") or os.getenv("QUICKBOOKS_CLIENT_ID")
client_secret = creds.get("client_secret") or os.getenv("QUICKBOOKS_CLIENT_SECRET")
token_url = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer"
auth_string = f"{client_id}:{client_secret}"
auth_bytes = auth_string.encode('ascii')
auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
response = requests.post(
token_url,
headers={
"Authorization": f"Basic {auth_b64}",
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded"
},
data={
"grant_type": "refresh_token",
"refresh_token": self.credentials.refresh_token
}
)
response.raise_for_status()
data = response.json()
expires_at = None
if "expires_in" in data:
expires_at = datetime.utcnow() + timedelta(seconds=data["expires_in"])
# Update credentials
self.credentials.access_token = data.get("access_token")
if "refresh_token" in data:
self.credentials.refresh_token = data.get("refresh_token")
if expires_at:
self.credentials.expires_at = expires_at
self.credentials.save()
return {
"access_token": data.get("access_token"),
"expires_at": expires_at.isoformat() if expires_at else None
}
def test_connection(self) -> Dict[str, Any]:
"""Test connection to QuickBooks."""
try:
realm_id = self.integration.config.get("realm_id") if self.integration else None
if not realm_id:
return {"success": False, "message": "QuickBooks company not configured"}
company_info = self._api_request(
"GET",
f"/v3/company/{realm_id}/companyinfo/{realm_id}",
self.get_access_token(),
realm_id
)
if company_info:
company_name = company_info.get("CompanyInfo", {}).get("CompanyName", "Unknown")
return {
"success": True,
"message": f"Connected to QuickBooks company: {company_name}"
}
else:
return {
"success": False,
"message": "Failed to retrieve company information"
}
except Exception as e:
return {
"success": False,
"message": f"Connection test failed: {str(e)}"
}
def _api_request(self, method: str, endpoint: str, access_token: str, realm_id: str) -> Optional[Dict]:
"""Make API request to QuickBooks"""
base_url = self.get_base_url()
url = f"{base_url}{endpoint}"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
if realm_id:
headers["realmId"] = realm_id
try:
if method.upper() == "GET":
response = requests.get(url, headers=headers, timeout=10)
elif method.upper() == "POST":
response = requests.post(url, headers=headers, timeout=10, json={})
else:
response = requests.request(method, url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"QuickBooks API request failed: {e}")
return None
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync invoices and expenses with QuickBooks"""
from app.models import Invoice, Expense
from app import db
try:
realm_id = self.integration.config.get("realm_id")
if not realm_id:
return {"success": False, "message": "QuickBooks company not configured"}
access_token = self.get_access_token()
synced_count = 0
errors = []
# Sync invoices (create as invoices in QuickBooks)
if sync_type == "full" or sync_type == "invoices":
invoices = Invoice.query.filter(
Invoice.status.in_(["sent", "paid"]),
Invoice.created_at >= datetime.utcnow() - timedelta(days=90)
).all()
for invoice in invoices:
try:
qb_invoice = self._create_quickbooks_invoice(invoice, access_token, realm_id)
if qb_invoice:
# Store QuickBooks ID in invoice metadata
if not hasattr(invoice, 'metadata') or not invoice.metadata:
invoice.metadata = {}
invoice.metadata['quickbooks_id'] = qb_invoice.get("Id")
synced_count += 1
except Exception as e:
errors.append(f"Error syncing invoice {invoice.id}: {str(e)}")
# Sync expenses (create as expenses in QuickBooks)
if sync_type == "full" or sync_type == "expenses":
expenses = Expense.query.filter(
Expense.date >= datetime.utcnow().date() - timedelta(days=90)
).all()
for expense in expenses:
try:
qb_expense = self._create_quickbooks_expense(expense, access_token, realm_id)
if qb_expense:
if not hasattr(expense, 'metadata') or not expense.metadata:
expense.metadata = {}
expense.metadata['quickbooks_id'] = qb_expense.get("Id")
synced_count += 1
except Exception as e:
errors.append(f"Error syncing expense {expense.id}: {str(e)}")
db.session.commit()
return {
"success": True,
"synced_count": synced_count,
"errors": errors
}
except Exception as e:
return {
"success": False,
"message": f"Sync failed: {str(e)}"
}
def _create_quickbooks_invoice(self, invoice, access_token: str, realm_id: str) -> Optional[Dict]:
"""Create invoice in QuickBooks"""
# Build QuickBooks invoice structure
qb_invoice = {
"Line": []
}
# Add invoice items
for item in invoice.items:
qb_invoice["Line"].append({
"Amount": float(item.quantity * item.unit_price),
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {
"value": "1", # Would need to map to actual QuickBooks item
"name": item.description
},
"Qty": float(item.quantity),
"UnitPrice": float(item.unit_price)
}
})
# Add customer reference (would need customer mapping)
# qb_invoice["CustomerRef"] = {"value": customer_qb_id}
endpoint = f"/v3/company/{realm_id}/invoice"
return self._api_request("POST", endpoint, access_token, realm_id)
def _create_quickbooks_expense(self, expense, access_token: str, realm_id: str) -> Optional[Dict]:
"""Create expense in QuickBooks"""
# Build QuickBooks expense structure
qb_expense = {
"PaymentType": "Cash",
"AccountRef": {
"value": "1" # Would need account mapping
},
"Line": [{
"Amount": float(expense.amount),
"DetailType": "AccountBasedExpenseLineDetail",
"AccountBasedExpenseLineDetail": {
"AccountRef": {
"value": "1" # Expense account
}
}
}]
}
endpoint = f"/v3/company/{realm_id}/purchase"
return self._api_request("POST", endpoint, access_token, realm_id)
def get_config_schema(self) -> Dict[str, Any]:
"""Get configuration schema."""
return {
"fields": [
{
"name": "realm_id",
"type": "string",
"label": "Company ID (Realm ID)",
"description": "QuickBooks company ID (realm ID)"
},
{
"name": "use_sandbox",
"type": "boolean",
"label": "Use Sandbox",
"default": True,
"description": "Use QuickBooks sandbox environment for testing"
},
{
"name": "sync_invoices",
"type": "boolean",
"label": "Sync Invoices",
"default": True
},
{
"name": "sync_expenses",
"type": "boolean",
"label": "Sync Expenses",
"default": True
}
],
"required": ["realm_id"]
}
+8
View File
@@ -7,6 +7,10 @@ from app.services.integration_service import IntegrationService
from app.integrations.jira import JiraConnector
from app.integrations.slack import SlackConnector
from app.integrations.github import GitHubConnector
from app.integrations.google_calendar import GoogleCalendarConnector
from app.integrations.asana import AsanaConnector
from app.integrations.trello import TrelloConnector
from app.integrations.quickbooks import QuickBooksConnector
def register_connectors():
@@ -14,6 +18,10 @@ def register_connectors():
IntegrationService.register_connector("jira", JiraConnector)
IntegrationService.register_connector("slack", SlackConnector)
IntegrationService.register_connector("github", GitHubConnector)
IntegrationService.register_connector("google_calendar", GoogleCalendarConnector)
IntegrationService.register_connector("asana", AsanaConnector)
IntegrationService.register_connector("trello", TrelloConnector)
IntegrationService.register_connector("quickbooks", QuickBooksConnector)
# Auto-register on import
+264
View File
@@ -0,0 +1,264 @@
"""
Trello integration connector.
Sync boards, lists, and cards with Trello.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from app.integrations.base import BaseConnector
import requests
import os
import hmac
import hashlib
import base64
class TrelloConnector(BaseConnector):
"""Trello integration connector."""
display_name = "Trello"
description = "Sync boards and cards with Trello"
icon = "trello"
BASE_URL = "https://api.trello.com/1"
@property
def provider_name(self) -> str:
return "trello"
def get_authorization_url(self, redirect_uri: str, state: str = None) -> str:
"""Get Trello OAuth authorization URL."""
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("trello")
api_key = creds.get("api_key") or os.getenv("TRELLO_API_KEY")
if not api_key:
raise ValueError("TRELLO_API_KEY not configured")
auth_url = "https://trello.com/1/OAuthAuthorizeToken"
params = {
"key": api_key,
"name": "TimeTracker Integration",
"response_type": "token",
"scope": "read,write",
"expiration": "never",
"redirect_uri": redirect_uri
}
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
return f"{auth_url}?{query_string}"
def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Dict[str, Any]:
"""Exchange authorization code for tokens (Trello uses token directly)."""
# Trello uses token-based auth, not OAuth flow
# The token is returned directly from the authorization URL
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("trello")
api_key = creds.get("api_key") or os.getenv("TRELLO_API_KEY")
if not api_key:
raise ValueError("Trello API key not configured")
# For Trello, the 'code' parameter is actually the token
token = code
# Verify token by getting user info
user_info = {}
try:
response = requests.get(
f"{self.BASE_URL}/members/me",
params={"key": api_key, "token": token}
)
if response.status_code == 200:
user_data = response.json()
user_info = {
"id": user_data.get("id"),
"username": user_data.get("username"),
"fullName": user_data.get("fullName"),
"email": user_data.get("email")
}
except Exception:
pass
return {
"access_token": token,
"refresh_token": None, # Trello tokens don't expire
"expires_at": None,
"token_type": "Bearer",
"extra_data": user_info
}
def refresh_access_token(self) -> Dict[str, Any]:
"""Refresh access token (Trello tokens don't expire)."""
# Trello tokens don't expire, so just return current token
return {
"access_token": self.credentials.access_token if self.credentials else None,
"expires_at": None
}
def test_connection(self) -> Dict[str, Any]:
"""Test connection to Trello."""
try:
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("trello")
api_key = creds.get("api_key") or os.getenv("TRELLO_API_KEY")
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
response = requests.get(
f"{self.BASE_URL}/members/me",
params={"key": api_key, "token": self.get_access_token()}
)
if response.status_code == 200:
user_data = response.json()
return {
"success": True,
"message": f"Connected to Trello as {user_data.get('fullName', 'Unknown')}"
}
else:
return {
"success": False,
"message": f"Connection test failed: {response.status_code}"
}
except Exception as e:
return {
"success": False,
"message": f"Connection test failed: {str(e)}"
}
def sync_data(self, sync_type: str = "full") -> Dict[str, Any]:
"""Sync boards and cards with Trello."""
from app.models import Project, Task
from app import db
try:
from app.models import Settings
settings = Settings.get_settings()
creds = settings.get_integration_credentials("trello")
api_key = creds.get("api_key") or os.getenv("TRELLO_API_KEY")
token = self.get_access_token()
if not token or not api_key:
return {"success": False, "message": "Trello credentials not configured"}
synced_count = 0
errors = []
# Get boards
boards_response = requests.get(
f"{self.BASE_URL}/members/me/boards",
params={"key": api_key, "token": token, "filter": "open"}
)
if boards_response.status_code == 200:
boards = boards_response.json()
for board in boards:
try:
# Create or update project from board
project = Project.query.filter_by(
user_id=self.integration.user_id,
name=board.get("name")
).first()
if not project:
project = Project(
name=board.get("name"),
description=board.get("desc", ""),
user_id=self.integration.user_id,
status="active"
)
db.session.add(project)
db.session.flush()
# Store Trello board ID in metadata
if not hasattr(project, 'metadata') or not project.metadata:
project.metadata = {}
project.metadata['trello_board_id'] = board.get("id")
# Sync cards as tasks
cards_response = requests.get(
f"{self.BASE_URL}/boards/{board.get('id')}/cards",
params={"key": api_key, "token": token, "filter": "open"}
)
if cards_response.status_code == 200:
cards = cards_response.json()
for card in cards:
# Find or create task
task = Task.query.filter_by(
project_id=project.id,
name=card.get("name")
).first()
if not task:
task = Task(
project_id=project.id,
name=card.get("name"),
description=card.get("desc", ""),
status=self._map_trello_list_to_status(card.get("idList"))
)
db.session.add(task)
db.session.flush()
# Store Trello card ID in metadata
if not hasattr(task, 'metadata') or not task.metadata:
task.metadata = {}
task.metadata['trello_card_id'] = card.get("id")
synced_count += 1
except Exception as e:
errors.append(f"Error syncing board {board.get('name')}: {str(e)}")
db.session.commit()
return {
"success": True,
"synced_count": synced_count,
"errors": errors
}
except Exception as e:
return {
"success": False,
"message": f"Sync failed: {str(e)}"
}
def _map_trello_list_to_status(self, list_id: str) -> str:
"""Map Trello list to task status."""
# This would need to fetch list name, but for now use default mapping
# In production, you'd fetch the list and map by name
return "todo"
def get_config_schema(self) -> Dict[str, Any]:
"""Get configuration schema."""
return {
"fields": [
{
"name": "board_ids",
"type": "array",
"label": "Board IDs",
"description": "Trello board IDs to sync (leave empty to sync all)"
},
{
"name": "sync_direction",
"type": "select",
"label": "Sync Direction",
"options": [
{"value": "trello_to_timetracker", "label": "Trello → TimeTracker"},
{"value": "timetracker_to_trello", "label": "TimeTracker → Trello"},
{"value": "bidirectional", "label": "Bidirectional"}
],
"default": "trello_to_timetracker"
}
],
"required": []
}