Add stateful management to track processed media across all apps

This commit is contained in:
Admin9705
2025-04-30 18:59:18 -04:00
parent 3a659b3066
commit b77342f7cc
17 changed files with 1459 additions and 278 deletions
+34
View File
@@ -700,6 +700,40 @@ input:checked + .toggle-slider:before {
transform: translateX(20px); /* Changed to match login page toggle (20px) */
}
/* Settings Stateful Management */
.setting-info-block {
background-color: var(--bg-tertiary);
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 15px;
margin: 10px 0;
}
.setting-info-block .info-row {
display: flex;
justify-content: space-between;
padding: 5px 0;
border-bottom: 1px solid var(--border-color);
}
.setting-info-block .info-row:last-child {
border-bottom: none;
}
.danger-button {
background-color: var(--button-danger-bg);
color: #fff;
border: none;
border-radius: 4px;
padding: 8px 15px;
cursor: pointer;
transition: background-color 0.3s;
}
.danger-button:hover {
background-color: var(--button-danger-hover);
}
/* Responsive Adjustments */
@media (max-width: 992px) {
.sidebar {
+256 -50
View File
@@ -112,57 +112,96 @@ let huntarrUI = {
setupEventListeners: function() {
// Navigation
this.elements.navItems.forEach(item => {
item.addEventListener('click', this.handleNavigation.bind(this));
item.addEventListener('click', (e) => this.handleNavigation(e));
});
// App tabs
this.elements.appTabs.forEach(tab => {
tab.addEventListener('click', this.handleAppTabChange.bind(this));
});
// Settings tabs
this.elements.settingsTabs.forEach(tab => {
tab.addEventListener('click', this.handleSettingsTabChange.bind(this));
});
// Log tabs (New)
this.elements.logTabs.forEach(tab => {
tab.addEventListener('click', this.handleLogTabChange.bind(this));
});
// Logs
// Log auto-scroll setting
if (this.elements.autoScrollCheckbox) {
this.elements.autoScrollCheckbox.addEventListener('change', (e) => {
this.autoScroll = e.target.checked;
});
}
// Clear logs button
if (this.elements.clearLogsButton) {
this.elements.clearLogsButton.addEventListener('click', this.clearLogs.bind(this));
this.elements.clearLogsButton.addEventListener('click', () => this.clearLogs());
}
// Settings
// App tabs in logs section
this.elements.appTabs.forEach(tab => {
tab.addEventListener('click', (e) => this.handleAppTabChange(e));
});
// Log tabs in logs section
this.elements.logTabs.forEach(tab => {
tab.addEventListener('click', (e) => this.handleLogTabChange(e));
});
// Settings tabs
this.elements.settingsTabs.forEach(tab => {
tab.addEventListener('click', (e) => this.handleSettingsTabChange(e));
});
// Save settings button
if (this.elements.saveSettingsButton) {
this.elements.saveSettingsButton.addEventListener('click', this.saveSettings.bind(this));
this.elements.saveSettingsButton.addEventListener('click', () => this.saveSettings());
}
// Actions
// Start hunt button
if (this.elements.startHuntButton) {
this.elements.startHuntButton.addEventListener('click', this.startHunt.bind(this));
this.elements.startHuntButton.addEventListener('click', () => this.startHunt());
}
// Stop hunt button
if (this.elements.stopHuntButton) {
this.elements.stopHuntButton.addEventListener('click', this.stopHunt.bind(this));
this.elements.stopHuntButton.addEventListener('click', () => this.stopHunt());
}
// Theme
// if (this.elements.themeToggle) { // Removed theme toggle
// this.elements.themeToggle.addEventListener('change', this.handleThemeToggle.bind(this));
// }
// Logout button
if (this.elements.logoutLink) {
this.elements.logoutLink.addEventListener('click', (e) => this.logout(e));
}
// Logout
if (this.elements.logoutLink) { // Added listener for logout
this.elements.logoutLink.addEventListener('click', this.logout.bind(this));
// Dark mode toggle
const darkModeToggle = document.getElementById('darkModeToggle');
if (darkModeToggle) {
const prefersDarkMode = localStorage.getItem('huntarr-dark-mode') === 'true';
darkModeToggle.checked = prefersDarkMode;
darkModeToggle.addEventListener('change', function() {
const isDarkMode = this.checked;
document.body.classList.toggle('dark-theme', isDarkMode);
localStorage.setItem('huntarr-dark-mode', isDarkMode);
});
}
// Settings inputs change tracking
document.querySelectorAll('#settingsSection input, #settingsSection select').forEach(element => {
element.addEventListener('change', () => this.markSettingsAsChanged());
});
// Monitor for window beforeunload to warn about unsaved settings
window.addEventListener('beforeunload', (e) => {
if (this.settingsChanged) {
// Standard way to show a confirmation dialog when navigating away
e.preventDefault();
e.returnValue = ''; // Chrome requires returnValue to be set
return ''; // Legacy browsers
}
});
// Stateful management reset button
const resetStatefulBtn = document.getElementById('reset_stateful_btn');
if (resetStatefulBtn) {
resetStatefulBtn.addEventListener('click', () => this.resetStatefulManagement());
}
// Stateful management hours input
const statefulHoursInput = document.getElementById('stateful_management_hours');
if (statefulHoursInput) {
statefulHoursInput.addEventListener('change', () => {
this.updateStatefulExpirationOnUI();
});
}
// Handle window hash change
@@ -649,30 +688,40 @@ let huntarrUI = {
// Settings handling
loadAllSettings: function() {
// Ensure buttons are disabled and flag is reset when loading settings section
this.settingsChanged = false;
// Disable save button until changes are made
this.updateSaveResetButtonState(false);
this.settingsChanged = false;
console.log("[huntarrUI] Loading all settings...");
fetch(`/api/settings`) // Fetch the entire settings object
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json();
})
// Get all settings to populate forms
fetch('/api/settings')
.then(response => response.json())
.then(data => {
console.log("[huntarrUI] All settings loaded:", data);
this.originalSettings = JSON.parse(JSON.stringify(data)); // Store deep copy
// Populate the currently active settings form
this.populateSettingsForm(this.currentSettingsTab, this.originalSettings[this.currentSettingsTab] || {});
// Optionally pre-populate others if needed, but might be redundant if done on tab switch
console.log('Loaded settings:', data);
// Store original settings for comparison
this.originalSettings = data;
// Populate each app's settings form
if (data.sonarr) this.populateSettingsForm('sonarr', data.sonarr);
if (data.radarr) this.populateSettingsForm('radarr', data.radarr);
if (data.lidarr) this.populateSettingsForm('lidarr', data.lidarr);
if (data.readarr) this.populateSettingsForm('readarr', data.readarr);
if (data.whisparr) this.populateSettingsForm('whisparr', data.whisparr);
if (data.swaparr) this.populateSettingsForm('swaparr', data.swaparr);
if (data.general) this.populateSettingsForm('general', data.general);
// Update duration displays (like sleep durations)
if (typeof SettingsForms !== 'undefined' &&
typeof SettingsForms.updateDurationDisplay === 'function') {
SettingsForms.updateDurationDisplay();
}
// Load stateful management info
this.loadStatefulInfo();
})
.catch(error => {
console.error(`Error loading all settings:`, error);
this.showNotification(`Error loading settings: ${error.message}`, 'error');
this.originalSettings = {}; // Reset on error
console.error('Error loading settings:', error);
this.showNotification('Error loading settings. Please try again.', 'error');
});
},
@@ -1098,7 +1147,7 @@ let huntarrUI = {
// Check for trailing slashes in URL
if (url.endsWith('/') || url.endsWith('\\')) {
statusSpan.textContent = 'Remove trailing slash from URL (/ or \\)';
statusSpan.className = 'connection-status error';
statusSpan.className = 'status-error';
return;
}
@@ -1651,6 +1700,163 @@ let huntarrUI = {
// or use the stored configuredApps status if checkAppConnection updates it.
this.checkAppConnections(); // Re-check all connections after a save might be simplest
},
// Load stateful management info
loadStatefulInfo: function() {
const statefulSection = document.getElementById('stateful-section');
if (!statefulSection) return;
// Show loading state
const initialStateEl = document.getElementById('stateful_initial_state');
const expiresDateEl = document.getElementById('stateful_expires_date');
if (initialStateEl) initialStateEl.textContent = 'Loading...';
if (expiresDateEl) expiresDateEl.textContent = 'Loading...';
console.log('Fetching stateful management info...');
fetch('/api/stateful/info', {
method: 'GET',
headers: {
'Accept': 'application/json',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'
},
cache: 'no-cache'
})
.then(response => {
console.log('Response status:', response.status);
if (!response.ok) {
throw new Error(`Network response was not ok: ${response.status} ${response.statusText}`);
}
return response.json();
})
.then(data => {
console.log('Stateful info data:', data);
if (initialStateEl) {
initialStateEl.textContent = data.created_date || 'Not available';
}
if (expiresDateEl) {
expiresDateEl.textContent = data.expires_date || 'Not available';
}
// Update the notification area
document.getElementById('stateful-notification').style.display = 'none';
})
.catch(error => {
console.error('Error loading stateful info:', error);
if (initialStateEl) initialStateEl.textContent = 'Error loading data';
if (expiresDateEl) expiresDateEl.textContent = 'Error loading data';
// Show error notification
const notification = document.getElementById('stateful-notification');
if (notification) {
notification.textContent = 'Failed to load stateful management info. Check logs for details.';
notification.style.display = 'block';
notification.className = 'notification error';
}
});
},
// Reset stateful management - clear all processed IDs
resetStatefulManagement: function() {
// Show a loading indicator or disable the button
const resetBtn = document.getElementById('reset_stateful_btn');
if (resetBtn) {
resetBtn.disabled = true;
resetBtn.textContent = 'Resetting...';
}
fetch('/api/stateful/reset', {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
cache: 'no-cache' // Add cache control to prevent caching
})
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status} ${response.statusText}`);
}
return response.json();
})
.then(data => {
if (data.success) {
this.showNotification('Stateful management reset successfully', 'success');
// Wait a moment before reloading the info to ensure it's refreshed
setTimeout(() => {
this.loadStatefulInfo(); // Reload stateful info to update the UI
}, 500);
} else {
throw new Error(data.message || 'Unknown error resetting stateful management');
}
})
.catch(error => {
console.error('Error resetting stateful management:', error);
this.showNotification(`Failed to reset stateful management: ${error.message}`, 'error');
})
.finally(() => {
// Restore the button state
if (resetBtn) {
resetBtn.disabled = false;
resetBtn.textContent = 'Reset Stateful Management';
}
});
},
// Update stateful management expiration based on hours input
updateStatefulExpirationOnUI: function() {
const hoursInput = document.getElementById('stateful_management_hours');
if (!hoursInput) return;
const hours = parseInt(hoursInput.value) || 168;
// Show updating indicator
const expiresDateEl = document.getElementById('stateful_expires_date');
if (expiresDateEl) {
expiresDateEl.textContent = 'Updating...';
}
fetch('/api/stateful/update-expiration', {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify({ hours: hours }),
cache: 'no-cache'
})
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status} ${response.statusText}`);
}
return response.json();
})
.then(data => {
if (data.success) {
this.showNotification(`Updated expiration to ${hours} hours`, 'success');
// Reload the stateful info after a short delay
setTimeout(() => {
this.loadStatefulInfo();
}, 500);
} else {
throw new Error(data.message || 'Unknown error updating expiration');
}
})
.catch(error => {
console.error('Error updating stateful expiration:', error);
this.showNotification(`Failed to update expiration: ${error.message}`, 'error');
// Reset the UI
if (expiresDateEl) {
expiresDateEl.textContent = 'Error updating';
}
});
},
};
// Initialize when document is ready
+73
View File
@@ -1082,6 +1082,32 @@ const SettingsForms = {
</div>
</div>
<div class="settings-group">
<h3>Stateful Management</h3>
<div id="stateful-section" class="setting-info-block">
<div id="stateful-notification" class="notification error" style="display: none;">
Failed to load stateful management info. Check logs for details.
</div>
<div class="info-row">
<span>Initial State Created:</span>
<span id="stateful_initial_state">Loading...</span>
</div>
<div class="info-row">
<span>State Reset Date:</span>
<span id="stateful_expires_date">Loading...</span>
</div>
</div>
<div class="setting-item">
<label for="stateful_management_hours">State Reset Interval (Hours):</label>
<input type="number" id="stateful_management_hours" min="1" value="${settings.stateful_management_hours || 168}">
<p class="setting-help">Hours before resetting processed media state (<span id="stateful_management_days">${((settings.stateful_management_hours || 168) / 24).toFixed(1)} days</span>)</p>
</div>
<div class="setting-item">
<button id="reset_stateful_btn" class="danger-button">Reset Stateful Management</button>
<p class="setting-help">Reset stateful management and clear all processed media IDs</p>
</div>
</div>
<div class="settings-group">
<h3>Security</h3>
<div class="setting-item">
@@ -1095,6 +1121,53 @@ const SettingsForms = {
</div>
`;
// Add listener for stateful management hours input
const statefulHoursInput = container.querySelector('#stateful_management_hours');
const statefulDaysSpan = container.querySelector('#stateful_management_days');
if (statefulHoursInput && statefulDaysSpan) {
statefulHoursInput.addEventListener('input', function() {
const hours = parseInt(this.value) || 168;
const days = (hours / 24).toFixed(1);
statefulDaysSpan.textContent = `${days} days`;
});
}
// Add listener for reset stateful button
const resetStatefulBtn = container.querySelector('#reset_stateful_btn');
if (resetStatefulBtn) {
resetStatefulBtn.addEventListener('click', function() {
if (confirm('Are you sure you want to reset stateful management? This will clear all processed media IDs.')) {
// Dispatch event for main.js to handle the reset
const event = new CustomEvent('resetStateful', {});
container.dispatchEvent(event);
}
});
}
// Load stateful management info
fetch('/api/stateful/info')
.then(response => response.json())
.then(data => {
const createdDateEl = document.getElementById('stateful_initial_state');
const expiresDateEl = document.getElementById('stateful_expires_date');
if (createdDateEl && data.created_date) {
createdDateEl.textContent = data.created_date;
}
if (expiresDateEl && data.expires_date) {
expiresDateEl.textContent = data.expires_date;
}
})
.catch(error => {
console.error('Error loading stateful management info:', error);
const notificationEl = document.getElementById('stateful-notification');
if (notificationEl) {
notificationEl.style.display = 'block';
}
});
// Add confirmation dialog for local access bypass toggle
const localAccessBypassCheckbox = container.querySelector('#local_access_bypass');
if (localAccessBypassCheckbox) {
+29 -7
View File
@@ -11,6 +11,7 @@ from typing import Dict, Any, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.lidarr import api as lidarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get the logger for the Lidarr module
lidarr_logger = get_logger(__name__) # Use __name__ for correct logger hierarchy
@@ -138,24 +139,34 @@ def process_missing_albums(
else: # Album mode
target_entities = [item['id'] for item in missing_items] # Use the potentially filtered missing_items list
# Filter out already processed entities using stateful management
unprocessed_entities = []
for entity_id in target_entities:
if not is_processed("lidarr", instance_name, str(entity_id)):
unprocessed_entities.append(entity_id)
else:
lidarr_logger.debug(f"Skipping already processed {search_entity_type} ID: {entity_id}")
lidarr_logger.info(f"Found {len(unprocessed_entities)} unprocessed {search_entity_type}s out of {len(target_entities)} total.")
if not unprocessed_entities:
lidarr_logger.info(f"No unprocessed {search_entity_type}s found for {instance_name}. All available {search_entity_type}s have been processed.")
return False
# Select entities to search
if not target_entities:
if not unprocessed_entities:
lidarr_logger.info(f"No {search_entity_type}s found to process after grouping/filtering.")
return False
if random_missing:
entities_to_search_ids = random.sample(target_entities, min(len(target_entities), total_items_to_process))
entities_to_search_ids = random.sample(unprocessed_entities, min(len(unprocessed_entities), total_items_to_process))
lidarr_logger.debug(f"Randomly selected {len(entities_to_search_ids)} {search_entity_type}s to search.")
else:
# Sort by ID for consistent selection if not random (optional, API order might suffice)
# target_entities.sort() # Example: sort IDs numerically
entities_to_search_ids = target_entities[:total_items_to_process]
entities_to_search_ids = unprocessed_entities[:total_items_to_process]
lidarr_logger.debug(f"Selected first {len(entities_to_search_ids)} {search_entity_type}s to search.")
if not entities_to_search_ids:
lidarr_logger.info(f"No {search_entity_type}s selected for search after applying limit/randomization.")
return False
# --- Trigger Search (Artist or Album) ---
if hunt_missing_mode == "artist":
lidarr_logger.info(f"Triggering Artist Search for {len(entities_to_search_ids)} artists on {instance_name}...")
@@ -183,6 +194,11 @@ def process_missing_albums(
increment_stat("lidarr", "hunted") # Changed from "missing" to "hunted"
processed_count += 1 # Count artists searched
processed_artists_or_albums.add(artist_id)
# Add to processed list
add_processed_id("lidarr", instance_name, str(artist_id))
lidarr_logger.debug(f"Added artist ID {artist_id} to processed list for {instance_name}")
time.sleep(0.1) # Small delay between triggers
except Exception as e:
lidarr_logger.warning(f"Failed to trigger artist search for ID {artist_id} on {instance_name}: {e}")
@@ -225,6 +241,12 @@ def process_missing_albums(
increment_stat("lidarr", "hunted") # Changed from "missing" to "hunted"
processed_count += len(album_ids_to_search) # Count albums searched
processed_artists_or_albums.update(album_ids_to_search)
# Add album IDs to processed list
for album_id in album_ids_to_search:
add_processed_id("lidarr", instance_name, str(album_id))
lidarr_logger.debug(f"Added album ID {album_id} to processed list for {instance_name}")
time.sleep(command_wait_delay) # Basic delay after the single command
else:
lidarr_logger.warning(f"Failed to trigger album search for IDs {album_ids_to_search} on {instance_name}.")
+26 -4
View File
@@ -10,6 +10,7 @@ from typing import Dict, Any, Optional, Callable, List # Added List
from src.primary.utils.logger import get_logger
from src.primary.apps.lidarr import api as lidarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
lidarr_logger = get_logger(__name__) # Use __name__ for correct logger hierarchy
@@ -79,14 +80,29 @@ def process_cutoff_upgrades(
lidarr_logger.info(f"Found {len(cutoff_unmet_albums)} cutoff unmet albums for {instance_name}.")
# Filter out already processed albums using stateful management
unprocessed_albums = []
for album in cutoff_unmet_albums:
album_id = str(album.get("id"))
if not is_processed("lidarr", instance_name, album_id):
unprocessed_albums.append(album)
else:
lidarr_logger.debug(f"Skipping already processed album ID: {album_id}")
lidarr_logger.info(f"Found {len(unprocessed_albums)} unprocessed cutoff unmet albums out of {len(cutoff_unmet_albums)} total.")
if not unprocessed_albums:
lidarr_logger.info(f"No unprocessed cutoff unmet albums found for {instance_name}. All available albums have been processed.")
return False
# Select albums to search
albums_to_search: List[Dict[str, Any]] = [] # Ensure type hint
if random_upgrades:
albums_to_search = random.sample(cutoff_unmet_albums, min(len(cutoff_unmet_albums), total_items_to_process))
lidarr_logger.debug(f"Randomly selected {len(albums_to_search)} albums out of {len(cutoff_unmet_albums)} to search for upgrades.")
albums_to_search = random.sample(unprocessed_albums, min(len(unprocessed_albums), total_items_to_process))
lidarr_logger.debug(f"Randomly selected {len(albums_to_search)} albums out of {len(unprocessed_albums)} to search for upgrades.")
else:
albums_to_search = cutoff_unmet_albums[:total_items_to_process]
lidarr_logger.debug(f"Selected the first {len(albums_to_search)} albums out of {len(cutoff_unmet_albums)} to search for upgrades.")
albums_to_search = unprocessed_albums[:total_items_to_process]
lidarr_logger.debug(f"Selected the first {len(albums_to_search)} albums out of {len(unprocessed_albums)} to search for upgrades.")
album_ids_to_search = [album['id'] for album in albums_to_search]
@@ -110,6 +126,12 @@ def process_cutoff_upgrades(
if command_id:
lidarr_logger.debug(f"Upgrade album search command triggered with ID: {command_id} for albums: {album_ids_to_search}")
increment_stat("lidarr", "upgraded") # Use appropriate stat key
# Add album IDs to processed list
for album_id in album_ids_to_search:
add_processed_id("lidarr", instance_name, str(album_id))
lidarr_logger.debug(f"Added album ID {album_id} to processed list for {instance_name}")
time.sleep(command_wait_delay) # Basic delay
processed_count += len(album_ids_to_search)
processed_any = True # Mark that we processed something
+25 -3
View File
@@ -11,6 +11,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.radarr import api as radarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
radarr_logger = get_logger("radarr")
@@ -43,6 +44,7 @@ def process_missing_movies(
hunt_missing_movies = app_settings.get("hunt_missing_movies", 0)
command_wait_delay = app_settings.get("command_wait_delay", 5)
command_wait_attempts = app_settings.get("command_wait_attempts", 12)
instance_name = app_settings.get("name", "Default")
if not api_url or not api_key:
radarr_logger.error("API URL or Key not configured in settings. Cannot process missing movies.")
@@ -96,14 +98,29 @@ def process_missing_movies(
movies_processed = 0
processing_done = False
# Filter out already processed movies using stateful management
unprocessed_movies = []
for movie in missing_movies:
movie_id = str(movie.get("id"))
if not is_processed("radarr", instance_name, movie_id):
unprocessed_movies.append(movie)
else:
radarr_logger.debug(f"Skipping already processed movie ID: {movie_id}")
radarr_logger.info(f"Found {len(unprocessed_movies)} unprocessed missing movies out of {len(missing_movies)} total.")
if not unprocessed_movies:
radarr_logger.info("No unprocessed missing movies found. All available movies have been processed.")
return False
# Select movies to search based on configuration
if random_missing:
radarr_logger.info(f"Randomly selecting up to {hunt_missing_movies} missing movies.")
movies_to_search = random.sample(missing_movies, min(len(missing_movies), hunt_missing_movies))
movies_to_search = random.sample(unprocessed_movies, min(len(unprocessed_movies), hunt_missing_movies))
else:
radarr_logger.info(f"Selecting the first {hunt_missing_movies} missing movies (sorted by title).")
missing_movies.sort(key=lambda x: x.get("title", ""))
movies_to_search = missing_movies[:hunt_missing_movies]
unprocessed_movies.sort(key=lambda x: x.get("title", ""))
movies_to_search = unprocessed_movies[:hunt_missing_movies]
radarr_logger.info(f"Selected {len(movies_to_search)} missing movies to search.")
@@ -151,6 +168,11 @@ def process_missing_movies(
radarr_logger.info(f"Triggered search command {search_command_id}. Assuming success for now.")
increment_stat("radarr", "hunted", 1)
radarr_logger.debug(f"Incremented radarr hunted statistics by 1")
# Add movie ID to processed list
add_processed_id("radarr", instance_name, str(movie_id))
radarr_logger.debug(f"Added movie ID {movie_id} to processed list for {instance_name}")
movies_processed += 1
processing_done = True
+18 -3
View File
@@ -10,6 +10,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.radarr import api as radarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
radarr_logger = get_logger("radarr")
@@ -41,6 +42,7 @@ def process_cutoff_upgrades(
hunt_upgrade_movies = app_settings.get("hunt_upgrade_movies", 0)
command_wait_delay = app_settings.get("command_wait_delay", 5)
command_wait_attempts = app_settings.get("command_wait_attempts", 12)
instance_name = app_settings.get("name", "Default")
# Get movies eligible for upgrade
radarr_logger.info("Retrieving movies eligible for cutoff upgrade...")
@@ -52,11 +54,19 @@ def process_cutoff_upgrades(
radarr_logger.info(f"Found {len(upgrade_eligible_data)} movies eligible for upgrade.")
# Select movies to process
unprocessed_movies = upgrade_eligible_data
# Filter out already processed movies using stateful management
unprocessed_movies = []
for movie in upgrade_eligible_data:
movie_id = str(movie.get("id"))
if not is_processed("radarr", instance_name, movie_id):
unprocessed_movies.append(movie)
else:
radarr_logger.debug(f"Skipping already processed movie ID: {movie_id}")
radarr_logger.info(f"Found {len(unprocessed_movies)} unprocessed movies for upgrade out of {len(upgrade_eligible_data)} total.")
if not unprocessed_movies:
radarr_logger.info("No upgradeable movies found to process (after potential filtering). Skipping.")
radarr_logger.info("No upgradeable movies found to process (after filtering already processed). Skipping.")
return False
if random_upgrades:
@@ -99,6 +109,11 @@ def process_cutoff_upgrades(
if search_command_id:
radarr_logger.info(f" - Search command triggered (ID: {search_command_id}). Waiting for completion...")
increment_stat("radarr", "upgraded") # Assuming 'upgraded' stat exists
# Add movie ID to processed list
add_processed_id("radarr", instance_name, str(movie_id))
radarr_logger.debug(f"Added movie ID {movie_id} to processed list for {instance_name}")
processed_count += 1
processed_something = True
radarr_logger.info(f"Processed {processed_count}/{len(movies_to_process)} movie upgrades this cycle.")
+23 -2
View File
@@ -10,6 +10,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.readarr import api as readarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
readarr_logger = get_logger("readarr")
@@ -34,6 +35,7 @@ def process_missing_books(
# Extract necessary settings
api_url = app_settings.get("api_url")
api_key = app_settings.get("api_key")
instance_name = app_settings.get("instance_name", "Readarr Default")
api_timeout = app_settings.get("api_timeout", 90) # Default timeout
monitored_only = app_settings.get("monitored_only", True)
skip_future_releases = app_settings.get("skip_future_releases", True)
@@ -67,13 +69,27 @@ def process_missing_books(
author_ids = list(books_by_author.keys())
# Filter out already processed authors using stateful management
unprocessed_authors = []
for author_id in author_ids:
if not is_processed("readarr", instance_name, str(author_id)):
unprocessed_authors.append(author_id)
else:
readarr_logger.debug(f"Skipping already processed author ID: {author_id}")
readarr_logger.info(f"Found {len(unprocessed_authors)} unprocessed authors out of {len(author_ids)} total authors with missing books.")
if not unprocessed_authors:
readarr_logger.info(f"No unprocessed authors found for {instance_name}. All available authors have been processed.")
return False
# Select authors/books to process
if random_missing:
readarr_logger.info(f"Randomly selecting up to {hunt_missing_books} authors with missing books.")
authors_to_process = random.sample(author_ids, min(hunt_missing_books, len(author_ids)))
authors_to_process = random.sample(unprocessed_authors, min(hunt_missing_books, len(unprocessed_authors)))
else:
readarr_logger.info(f"Selecting the first {hunt_missing_books} authors with missing books (order based on API return).")
authors_to_process = author_ids[:hunt_missing_books]
authors_to_process = unprocessed_authors[:hunt_missing_books]
readarr_logger.info(f"Selected {len(authors_to_process)} authors to search for missing books.")
processed_count = 0
@@ -122,6 +138,11 @@ def process_missing_books(
command_id = search_command_result.get('id') if isinstance(search_command_result, dict) else search_command_result
readarr_logger.info(f"Triggered book search command {command_id} for author {author_name}. Assuming success for now.") # Log only command ID
increment_stat("readarr", "hunted")
# Add author ID to processed list
add_processed_id("readarr", instance_name, str(author_id))
readarr_logger.debug(f"Added author ID {author_id} to processed list for {instance_name}")
processed_count += 1 # Count processed authors/groups
processed_authors.append(author_name) # Add to list of processed authors
processed_something = True
+24 -2
View File
@@ -11,6 +11,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.readarr import api as readarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
readarr_logger = get_logger("readarr")
@@ -35,6 +36,7 @@ def process_cutoff_upgrades(
# Extract necessary settings
api_url = app_settings.get("api_url")
api_key = app_settings.get("api_key")
instance_name = app_settings.get("instance_name", "Readarr Default")
api_timeout = app_settings.get("api_timeout", 90) # Default timeout
monitored_only = app_settings.get("monitored_only", True)
skip_author_refresh = app_settings.get("skip_author_refresh", False)
@@ -85,16 +87,31 @@ def process_cutoff_upgrades(
if not upgrade_eligible_data:
readarr_logger.info("No upgradeable books found to process (after potential filtering). Skipping.")
return False
# Filter out already processed books using stateful management
unprocessed_books = []
for book in upgrade_eligible_data:
book_id = str(book.get("id"))
if not is_processed("readarr", instance_name, book_id):
unprocessed_books.append(book)
else:
readarr_logger.debug(f"Skipping already processed book ID: {book_id}")
readarr_logger.info(f"Found {len(unprocessed_books)} unprocessed books out of {len(upgrade_eligible_data)} total books eligible for upgrade.")
if not unprocessed_books:
readarr_logger.info(f"No unprocessed books found for {instance_name}. All available books have been processed.")
return False
# Select books to process
if random_upgrades:
readarr_logger.info(f"Randomly selecting up to {hunt_upgrade_books} books for upgrade search.")
books_to_process = random.sample(upgrade_eligible_data, min(hunt_upgrade_books, len(upgrade_eligible_data)))
books_to_process = random.sample(unprocessed_books, min(hunt_upgrade_books, len(unprocessed_books)))
else:
readarr_logger.info(f"Selecting the first {hunt_upgrade_books} books for upgrade search (order based on API return).")
# Add sorting if needed, e.g., by title or author
# upgrade_eligible_data.sort(key=lambda x: x.get('title', ''))
books_to_process = upgrade_eligible_data[:hunt_upgrade_books]
books_to_process = unprocessed_books[:hunt_upgrade_books]
readarr_logger.info(f"Selected {len(books_to_process)} books to search for upgrades.")
processed_count = 0
@@ -133,6 +150,11 @@ def process_cutoff_upgrades(
command_id = search_command_data.get('id') # Extract command ID
readarr_logger.info(f"Triggered book search command {command_id} for upgrade.")
increment_stat("readarr", "upgraded") # Assuming 'upgraded' stat exists
# Add book ID to processed list
add_processed_id("readarr", instance_name, str(book_id))
readarr_logger.debug(f"Added book ID {book_id} to processed list for {instance_name}")
processed_count += 1
processed_something = True
readarr_logger.info(f"Processed {processed_count}/{len(books_to_process)} book upgrades this cycle.")
+307 -198
View File
@@ -9,6 +9,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.sonarr import api as sonarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the Sonarr app
sonarr_logger = get_logger("sonarr")
@@ -16,6 +17,7 @@ sonarr_logger = get_logger("sonarr")
def process_missing_episodes(
api_url: str,
api_key: str,
instance_name: str = "Default",
api_timeout: int = 60,
monitored_only: bool = True,
skip_future_episodes: bool = True,
@@ -42,7 +44,7 @@ def process_missing_episodes(
# Handle episode-based missing items
sonarr_logger.info("Episode-based missing mode selected")
return process_missing_episodes_mode(
api_url, api_key, api_timeout, monitored_only,
api_url, api_key, instance_name, api_timeout, monitored_only,
skip_future_episodes, skip_series_refresh, random_missing,
hunt_missing_items, command_wait_delay, command_wait_attempts,
stop_check
@@ -51,7 +53,7 @@ def process_missing_episodes(
# Handle season-based missing items (individual episodes grouped by season)
sonarr_logger.info("Season [Solo] mode selected - grouping episodes by season")
return process_missing_seasons_mode(
api_url, api_key, api_timeout, monitored_only,
api_url, api_key, instance_name, api_timeout, monitored_only,
skip_series_refresh, random_missing, hunt_missing_items,
command_wait_delay, command_wait_attempts, stop_check
)
@@ -59,7 +61,7 @@ def process_missing_episodes(
# Handle season pack searches (using SeasonSearch command)
sonarr_logger.info("Season [Packs] mode selected - searching for complete season packs")
return process_missing_seasons_packs_mode(
api_url, api_key, api_timeout, monitored_only,
api_url, api_key, instance_name, api_timeout, monitored_only,
skip_series_refresh, random_missing, hunt_missing_items,
command_wait_delay, command_wait_attempts, stop_check
)
@@ -67,7 +69,7 @@ def process_missing_episodes(
# Handle show-based missing items
sonarr_logger.info("Show-based missing mode selected")
return process_missing_shows_mode(
api_url, api_key, api_timeout, monitored_only,
api_url, api_key, instance_name, api_timeout, monitored_only,
skip_future_episodes, skip_series_refresh, random_missing,
hunt_missing_items, command_wait_delay, command_wait_attempts,
stop_check
@@ -79,6 +81,7 @@ def process_missing_episodes(
def process_missing_episodes_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_future_episodes: bool,
@@ -122,9 +125,24 @@ def process_missing_episodes_mode(
skipped_count = original_count - len(missing_episodes)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes based on air date.")
# Filter out already processed episodes using stateful management
unprocessed_episodes = []
for episode in missing_episodes:
episode_id = str(episode.get("id"))
if not is_processed("sonarr", instance_name, episode_id):
unprocessed_episodes.append(episode)
else:
sonarr_logger.debug(f"Skipping already processed episode ID: {episode_id}")
sonarr_logger.info(f"Found {len(unprocessed_episodes)} unprocessed missing episodes out of {len(missing_episodes)} total.")
if not unprocessed_episodes:
sonarr_logger.info("No unprocessed missing episodes found. All available episodes have been processed.")
return False
# Select the first N episodes
episodes_to_search = missing_episodes[:hunt_missing_items]
episodes_to_search = unprocessed_episodes[:hunt_missing_items]
if stop_check():
sonarr_logger.info("Stop requested during missing episode processing.")
@@ -141,6 +159,19 @@ def process_missing_episodes_mode(
skipped_count = original_count - len(episodes_to_search)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes based on air date.")
# Filter out already processed episodes for random selection approach
if random_missing:
unprocessed_episodes = []
for episode in episodes_to_search:
episode_id = str(episode.get("id"))
if not is_processed("sonarr", instance_name, episode_id):
unprocessed_episodes.append(episode)
else:
sonarr_logger.debug(f"Skipping already processed episode ID: {episode_id}")
sonarr_logger.info(f"Found {len(unprocessed_episodes)} unprocessed missing episodes out of {len(episodes_to_search)} total.")
episodes_to_search = unprocessed_episodes
if not episodes_to_search:
sonarr_logger.info("No missing episodes left to process after filtering.")
@@ -216,6 +247,10 @@ def process_missing_episodes_mode(
processed_any = True # Mark that we did something
sonarr_logger.info(f"Successfully processed and searched for {len(episode_ids)} episodes in series {series_id}.")
# Add episode IDs to stateful manager
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, episode_id)
# Increment the hunted statistics
increment_stat("sonarr", "hunted", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr hunted statistics by {len(episode_ids)}")
@@ -230,6 +265,7 @@ def process_missing_episodes_mode(
def process_missing_seasons_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
@@ -240,8 +276,149 @@ def process_missing_seasons_mode(
stop_check: Callable[[], bool]
) -> bool:
"""
Process missing seasons using season pack search
This mode uses the SeasonSearch command to search for entire season packs instead of individual episodes
Process missing episodes by grouping them into seasons and searching for those seasons' episodes.
This approach differs from season packs - it searches for individual episodes, but groups them by season.
"""
processed_any = False
# Get series with missing episodes
sonarr_logger.info("Retrieving series with missing episodes...")
series_with_missing = sonarr_api.get_series_with_missing_episodes(
api_url, api_key, api_timeout, monitored_only)
if not series_with_missing:
sonarr_logger.info("No series with missing episodes found.")
return False
sonarr_logger.info(f"Found {len(series_with_missing)} series with missing episodes.")
# Create a map of seasons with missing episodes
seasons_with_missing = []
for series in series_with_missing:
series_id = series.get('id')
series_title = series.get('title', 'Unknown')
# Group episodes by season
seasons = {}
for episode in series.get('missingEpisodes', []):
season_number = episode.get('seasonNumber')
if season_number not in seasons:
seasons[season_number] = {
'seriesId': series_id,
'seriesTitle': series_title,
'seasonNumber': season_number,
'episodes': []
}
seasons[season_number]['episodes'].append(episode)
# Add each season to the list
for season in seasons.values():
# Filter already processed seasons
season_id = f"{series_id}_{season['seasonNumber']}"
if not is_processed("sonarr", instance_name, season_id):
seasons_with_missing.append(season)
else:
sonarr_logger.debug(f"Skipping already processed season ID: {season_id}")
sonarr_logger.info(f"Found {len(seasons_with_missing)} unprocessed seasons with missing episodes.")
if not seasons_with_missing:
sonarr_logger.info("All seasons with missing episodes have been processed.")
return False
# Select seasons to process (random or sequential)
seasons_to_process = []
if random_missing:
sonarr_logger.info(f"Randomly selecting up to {hunt_missing_items} seasons with missing episodes.")
seasons_to_process = random.sample(
seasons_with_missing,
min(len(seasons_with_missing), hunt_missing_items)
)
else:
# Sort by series title and season number
sonarr_logger.info(f"Selecting the first {hunt_missing_items} seasons with missing episodes (sorted by series/season).")
seasons_with_missing.sort(key=lambda x: (x['seriesTitle'], x['seasonNumber']))
seasons_to_process = seasons_with_missing[:hunt_missing_items]
sonarr_logger.info(f"Selected {len(seasons_to_process)} seasons to process.")
# Process each selected season
for season in seasons_to_process:
if stop_check():
sonarr_logger.info("Stop requested. Aborting season processing.")
break
series_id = season['seriesId']
series_title = season['seriesTitle']
season_number = season['seasonNumber']
missing_episodes = season['episodes']
sonarr_logger.info(f"Processing series: {series_title}, Season: {season_number} with {len(missing_episodes)} missing episodes")
# Refresh series metadata if not skipped
if not skip_series_refresh:
sonarr_logger.info(f" - Refreshing series info...")
refresh_command_id = sonarr_api.refresh_series(api_url, api_key, api_timeout, series_id)
if refresh_command_id:
wait_for_command(
api_url, api_key, api_timeout, refresh_command_id,
command_wait_delay, command_wait_attempts, "Series Refresh", stop_check
)
# Continue regardless of refresh result
else:
sonarr_logger.warning(f" - Failed to trigger series refresh. Continuing with search.")
else:
sonarr_logger.debug(f" - Skipping series refresh (skip_series_refresh=true)")
# Extract episode IDs
episode_ids = [episode.get('id') for episode in missing_episodes if episode.get('id')]
if not episode_ids:
sonarr_logger.warning(f" - No valid episode IDs found for this season.")
continue
# Search for the episodes in this season
sonarr_logger.info(f" - Searching for {len(episode_ids)} missing episodes in this season...")
search_successful = sonarr_api.search_episodes(api_url, api_key, api_timeout, episode_ids)
if search_successful:
sonarr_logger.info(f" - Successfully triggered search for season {season_number} of {series_title}")
# Add season to processed list
season_id = f"{series_id}_{season_number}"
add_processed_id("sonarr", instance_name, season_id)
sonarr_logger.debug(f"Added season ID {season_id} to processed list for {instance_name}")
# Add individual episodes to stateful management for completeness
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, str(episode_id))
# Increment stats
increment_stat("sonarr", "hunted", len(episode_ids))
processed_any = True
else:
sonarr_logger.error(f" - Failed to trigger search for season {season_number} of {series_title}")
sonarr_logger.info(f"Season-based missing episode processing complete.")
return processed_any
def process_missing_seasons_packs_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
random_missing: bool,
hunt_missing_items: int,
command_wait_delay: int,
command_wait_attempts: int,
stop_check: Callable[[], bool]
) -> bool:
"""
Process missing seasons using the SeasonSearch command
This mode is optimized for torrent users who rely on season packs
Uses a direct episode lookup approach which is much more efficient
"""
processed_any = False
@@ -278,106 +455,28 @@ def process_missing_seasons_mode(
seasons_list = list(missing_seasons.values())
seasons_list.sort(key=lambda x: x['episode_count'], reverse=True)
# Apply randomization if requested
if random_missing:
random.shuffle(seasons_list)
sonarr_logger.info(f"Found {len(seasons_list)} seasons with missing episodes")
# Process up to hunt_missing_items seasons
processed_count = 0
# Filter out already processed seasons
unprocessed_seasons = []
for season in seasons_list:
if processed_count >= hunt_missing_items:
break
if stop_check():
sonarr_logger.info("Stop signal received, halting processing.")
break
series_id = season['series_id']
season_number = season['season_number']
series_title = season['series_title']
episode_count = season['episode_count']
sonarr_logger.info(f"Searching for season pack: {series_title} - Season {season_number} (contains {episode_count} missing episodes)")
# Trigger an API call to search for the entire season
command_id = sonarr_api.search_season(api_url, api_key, api_timeout, series_id, season_number)
if command_id:
processed_any = True
processed_count += 1
# Wait for command to complete if configured
if command_wait_delay > 0 and command_wait_attempts > 0:
if wait_for_command(
api_url, api_key, api_timeout, command_id,
command_wait_delay, command_wait_attempts, "Season Search", stop_check
):
# Increment stats by the number of episodes in the season
increment_stat("sonarr", "hunted", episode_count)
sonarr_logger.debug(f"Incremented sonarr hunted statistics by {episode_count} (full season)")
season_id = f"{season['series_id']}_{season['season_number']}"
if not is_processed("sonarr", instance_name, season_id):
unprocessed_seasons.append(season)
else:
sonarr_logger.debug(f"Skipping already processed season ID: {season_id}")
sonarr_logger.info(f"Processed {processed_count} missing season packs for Sonarr.")
return processed_any
def process_missing_seasons_packs_mode(
api_url: str,
api_key: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
random_missing: bool,
hunt_missing_items: int,
command_wait_delay: int,
command_wait_attempts: int,
stop_check: Callable[[], bool]
) -> bool:
"""
Process missing seasons using the SeasonSearch command
This mode is optimized for torrent users who rely on season packs
Uses a direct episode lookup approach which is much more efficient
"""
processed_any = False
sonarr_logger.info(f"Found {len(unprocessed_seasons)} unprocessed seasons with missing episodes out of {len(seasons_list)} total.")
sonarr_logger.info("Running Season [Packs] mode with optimized performance for large libraries")
# Use our new optimized function to get series with missing episodes
series_with_missing = sonarr_api.get_series_with_missing_episodes(
api_url, api_key, api_timeout,
monitored_only=monitored_only,
limit=50, # Examine at most 50 series for performance
random_mode=random_missing
)
if not series_with_missing:
sonarr_logger.info("No series with missing episodes found")
if not unprocessed_seasons:
sonarr_logger.info("All seasons with missing episodes have been processed.")
return False
# Convert to a flat list of seasons with missing episodes
missing_seasons = []
for series in series_with_missing:
series_id = series['series_id']
series_title = series['series_title']
for season in series['seasons']:
missing_seasons.append({
'series_id': series_id,
'season_number': season['season_number'],
'series_title': series_title,
'episode_count': season['episode_count']
})
# Sort by episode count (most missing episodes first)
missing_seasons.sort(key=lambda x: x['episode_count'], reverse=True)
# No need to shuffle again, as we already shuffled at the series selection level if random mode is on
sonarr_logger.info(f"Found {len(missing_seasons)} seasons with missing episodes across {len(series_with_missing)} series")
# Apply randomization if requested
if random_missing:
random.shuffle(unprocessed_seasons)
# Process up to hunt_missing_items seasons
processed_count = 0
for season in missing_seasons[:hunt_missing_items]:
for season in unprocessed_seasons:
if processed_count >= hunt_missing_items:
break
@@ -409,11 +508,16 @@ def process_missing_seasons_packs_mode(
processed_any = True
processed_count += 1
# Add season to processed list
season_id = f"{series_id}_{season_number}"
add_processed_id("sonarr", instance_name, season_id)
sonarr_logger.debug(f"Added season ID {season_id} to processed list for {instance_name}")
# Wait for command to complete if configured
if command_wait_delay > 0 and command_wait_attempts > 0:
if wait_for_command(
api_url, api_key, api_timeout, command_id,
command_wait_delay, command_wait_attempts, "Season Pack Search", stop_check
command_wait_delay, command_wait_attempts, "Season Search", stop_check
):
# Increment stats by the number of episodes in the season
increment_stat("sonarr", "hunted", episode_count)
@@ -425,6 +529,7 @@ def process_missing_seasons_packs_mode(
def process_missing_shows_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_future_episodes: bool,
@@ -438,122 +543,126 @@ def process_missing_shows_mode(
"""Process missing episodes in show mode - gets all missing episodes for entire shows."""
processed_any = False
# Get all missing episodes
missing_episodes = sonarr_api.get_missing_episodes(api_url, api_key, api_timeout, monitored_only)
sonarr_logger.info(f"Received {len(missing_episodes)} missing episodes from Sonarr API (before filtering).")
# Get series with missing episodes
series_with_missing = sonarr_api.get_series_with_missing_episodes(
api_url, api_key, api_timeout, monitored_only)
if not missing_episodes:
sonarr_logger.info("No missing episodes found in Sonarr.")
return False
# Filter out future episodes if configured
if skip_future_episodes:
now_unix = time.time()
original_count = len(missing_episodes)
# Ensure airDateUtc exists and is not None before parsing
missing_episodes = [
ep for ep in missing_episodes
if ep.get('airDateUtc') and time.mktime(time.strptime(ep['airDateUtc'], '%Y-%m-%dT%H:%M:%SZ')) < now_unix
]
skipped_count = original_count - len(missing_episodes)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes based on air date.")
if stop_check():
sonarr_logger.info("Stop requested during missing episode processing.")
return processed_any
# Group episodes by series
series_episodes: Dict[int, List[Dict]] = {}
series_titles: Dict[int, str] = {} # Keep track of series titles
for episode in missing_episodes:
series_id = episode.get('seriesId')
if series_id is not None:
if series_id not in series_episodes:
series_episodes[series_id] = []
# Store series title when first encountering the series ID
series_titles[series_id] = episode.get('series', {}).get('title', f"Series ID {series_id}")
series_episodes[series_id].append(episode)
# Create a list of (series_id, episode_count, series_title) tuples for selection
available_series = [(series_id, len(episodes), series_titles[series_id])
for series_id, episodes in series_episodes.items()]
if not available_series:
if not series_with_missing:
sonarr_logger.info("No series with missing episodes found.")
return False
# Select series to process - either randomly or sequentially
series_to_process = []
# Filter out shows that have been processed
unprocessed_series = []
for series in series_with_missing:
series_id = str(series.get("id"))
if not is_processed("sonarr", instance_name, series_id):
unprocessed_series.append(series)
else:
sonarr_logger.debug(f"Skipping already processed series ID: {series_id}")
sonarr_logger.info(f"Found {len(unprocessed_series)} unprocessed series with missing episodes out of {len(series_with_missing)} total.")
if not unprocessed_series:
sonarr_logger.info("All series with missing episodes have been processed.")
return False
# Select the shows to process (random or sequential)
shows_to_process = []
if random_missing:
# Randomly shuffle the available series
random.shuffle(available_series)
series_to_process = available_series[:hunt_missing_items]
sonarr_logger.info(f"Randomly selecting up to {hunt_missing_items} shows with missing episodes.")
shows_to_process = random.sample(
unprocessed_series,
min(len(unprocessed_series), hunt_missing_items)
)
else:
# Sort by missing episode count (descending) for most impactful processing
available_series.sort(key=lambda x: x[1], reverse=True)
series_to_process = available_series[:hunt_missing_items]
sonarr_logger.info(f"Selecting the first {hunt_missing_items} shows with missing episodes.")
# Default sort by title
unprocessed_series.sort(key=lambda x: x.get('title', 'Unknown'))
shows_to_process = unprocessed_series[:hunt_missing_items]
sonarr_logger.info(f"Selected {len(series_to_process)} series with missing episodes to process")
# Log selected series
for idx, (series_id, episode_count, series_title) in enumerate(series_to_process):
sonarr_logger.info(f" {idx+1}. {series_title} - {episode_count} missing episodes")
# Process each selected series
for series_id, _, series_title in series_to_process:
if stop_check():
sonarr_logger.info("Stop requested before processing next series.")
# Process each show
for show in shows_to_process:
if stop_check():
sonarr_logger.info("Stop requested. Aborting show processing.")
break
episodes = series_episodes[series_id]
episode_ids = [episode["id"] for episode in episodes]
sonarr_logger.info(f"Processing {series_title} with {len(episode_ids)} missing episodes")
show_id = show.get('id')
show_title = show.get('title', 'Unknown Show')
# Refresh series metadata if not skipped
# Get missing episodes for this show
missing_episodes = show.get('missingEpisodes', [])
# Filter out future episodes if needed
if skip_future_episodes:
now_unix = time.time()
original_count = len(missing_episodes)
missing_episodes = [
ep for ep in missing_episodes
if ep.get('airDateUtc') and time.mktime(time.strptime(ep['airDateUtc'], '%Y-%m-%dT%H:%M:%SZ')) < now_unix
]
skipped_count = original_count - len(missing_episodes)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes for {show_title} based on air date.")
if not missing_episodes:
sonarr_logger.info(f"No eligible missing episodes found for {show_title} after filtering.")
continue
# Log episodes to be processed
sonarr_logger.info(f"Processing {len(missing_episodes)} missing episodes for show: {show_title}")
for idx, episode in enumerate(missing_episodes[:5]): # Only log first 5 for brevity
season = episode.get('seasonNumber', 'Unknown')
ep_num = episode.get('episodeNumber', 'Unknown')
title = episode.get('title', 'Unknown Title')
sonarr_logger.debug(f" {idx+1}. S{season:02d}E{ep_num:02d} - {title}")
if len(missing_episodes) > 5:
sonarr_logger.debug(f" ... and {len(missing_episodes)-5} more episodes.")
# Refresh series if not skipped
if not skip_series_refresh:
sonarr_logger.debug(f"Attempting to refresh series ID: {series_id}")
refresh_command_id = sonarr_api.refresh_series(api_url, api_key, api_timeout, series_id)
sonarr_logger.info(f"Refreshing series info for {show_title}...")
refresh_command_id = sonarr_api.refresh_series(api_url, api_key, api_timeout, show_id)
if refresh_command_id:
# Wait for refresh command to complete
if not wait_for_command(
wait_success = wait_for_command(
api_url, api_key, api_timeout, refresh_command_id,
command_wait_delay, command_wait_attempts, "Series Refresh", stop_check
):
sonarr_logger.warning(f"Series refresh command for {series_title} did not complete successfully or timed out.")
)
if not wait_success:
sonarr_logger.warning(f"Series refresh command timed out or failed for {show_title}. Proceeding with search anyway.")
else:
sonarr_logger.warning(f"Failed to trigger refresh command for series {series_title}")
if stop_check():
sonarr_logger.info("Stop requested after series refresh attempt.")
break
# Trigger search for all missing episodes in this series
sonarr_logger.debug(f"Attempting to search for {len(episode_ids)} episodes in {series_title}")
search_command_id = sonarr_api.search_episode(api_url, api_key, api_timeout, episode_ids)
sonarr_logger.warning(f"Failed to trigger refresh command for {show_title}. Proceeding with search anyway.")
if search_command_id:
# Wait for search command to complete
if wait_for_command(
api_url, api_key, api_timeout, search_command_id,
command_wait_delay, command_wait_attempts, "Episode Search", stop_check
):
# Mark as processed if search command completed successfully
processed_any = True
sonarr_logger.info(f"Successfully processed {len(episode_ids)} missing episodes in {series_title}")
# Increment the hunted statistics
increment_stat("sonarr", "hunted", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr hunted statistics by {len(episode_ids)}")
else:
sonarr_logger.warning(f"Episode search command for {series_title} did not complete successfully")
# Extract episode IDs to search
episode_ids = [episode.get('id') for episode in missing_episodes if episode.get('id')]
if not episode_ids:
sonarr_logger.warning(f"No valid episode IDs found for {show_title}.")
continue
# Search for all episodes in the show
sonarr_logger.info(f"Searching for {len(episode_ids)} missing episodes for {show_title}...")
search_successful = sonarr_api.search_episodes(api_url, api_key, api_timeout, episode_ids)
if search_successful:
processed_any = True
sonarr_logger.info(f"Successfully processed {len(episode_ids)} missing episodes in {show_title}")
# Add series ID to processed list
add_processed_id("sonarr", instance_name, str(show_id))
sonarr_logger.debug(f"Added series ID {show_id} to processed list for {instance_name}")
# Add episode IDs to stateful manager
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, str(episode_id))
# Increment the hunted statistics
increment_stat("sonarr", "hunted", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr hunted statistics by {len(episode_ids)}")
else:
sonarr_logger.error(f"Failed to trigger search command for {series_title}")
sonarr_logger.error(f"Failed to trigger search for {show_title}.")
sonarr_logger.info("Finished missing episodes processing cycle (show mode) for Sonarr.")
sonarr_logger.info("Show-based missing episode processing complete.")
return processed_any
def wait_for_command(
+50 -2
View File
@@ -9,6 +9,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.sonarr import api as sonarr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
sonarr_logger = get_logger("sonarr")
@@ -16,6 +17,7 @@ sonarr_logger = get_logger("sonarr")
def process_cutoff_upgrades(
api_url: str,
api_key: str,
instance_name: str = "Default",
api_timeout: int = 60,
monitored_only: bool = True,
skip_series_refresh: bool = False,
@@ -37,7 +39,7 @@ def process_cutoff_upgrades(
# Always use episode mode for upgrades, regardless of the hunt_missing_mode setting
return process_upgrade_episodes_mode(
api_url, api_key, api_timeout, monitored_only,
api_url, api_key, instance_name, api_timeout, monitored_only,
skip_series_refresh, random_upgrades, hunt_upgrade_items,
command_wait_delay, command_wait_attempts, stop_check
)
@@ -45,6 +47,7 @@ def process_cutoff_upgrades(
def process_upgrade_episodes_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
@@ -91,9 +94,24 @@ def process_upgrade_episodes_mode(
skipped_count = original_count - len(cutoff_unmet_episodes)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes based on air date for upgrades.")
# Filter out already processed episodes using stateful management
unprocessed_episodes = []
for episode in cutoff_unmet_episodes:
episode_id = str(episode.get("id"))
if not is_processed("sonarr", instance_name, episode_id):
unprocessed_episodes.append(episode)
else:
sonarr_logger.debug(f"Skipping already processed episode ID for upgrade: {episode_id}")
sonarr_logger.info(f"Found {len(unprocessed_episodes)} unprocessed cutoff unmet episodes out of {len(cutoff_unmet_episodes)} total.")
if not unprocessed_episodes:
sonarr_logger.info("No unprocessed cutoff unmet episodes found. All available episodes have been processed.")
return False
# Select the first N episodes
episodes_to_search = cutoff_unmet_episodes[:hunt_upgrade_items]
episodes_to_search = unprocessed_episodes[:hunt_upgrade_items]
if stop_check():
sonarr_logger.info("Stop requested during upgrade processing.")
@@ -110,6 +128,19 @@ def process_upgrade_episodes_mode(
skipped_count = original_count - len(episodes_to_search)
if skipped_count > 0:
sonarr_logger.info(f"Skipped {skipped_count} future episodes based on air date for upgrades.")
# Filter out already processed episodes for random selection approach
if random_upgrades:
unprocessed_episodes = []
for episode in episodes_to_search:
episode_id = str(episode.get("id"))
if not is_processed("sonarr", instance_name, episode_id):
unprocessed_episodes.append(episode)
else:
sonarr_logger.debug(f"Skipping already processed episode ID for upgrade: {episode_id}")
sonarr_logger.info(f"Found {len(unprocessed_episodes)} unprocessed cutoff unmet episodes out of {len(episodes_to_search)} total.")
episodes_to_search = unprocessed_episodes
if not episodes_to_search:
sonarr_logger.info("No cutoff unmet episodes left to process for upgrades after filtering.")
@@ -198,6 +229,11 @@ def process_upgrade_episodes_mode(
# Increment the upgraded statistics
increment_stat("sonarr", "upgraded", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr upgraded statistics by {len(episode_ids)}")
# Mark episodes as processed using stateful management
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, str(episode_id))
sonarr_logger.debug(f"Marked episode ID {episode_id} as processed for upgrades")
else:
sonarr_logger.warning(f"Episode upgrade search command (ID: {search_command_id}) for series {series_id} did not complete successfully or timed out. Episodes will not be marked as processed yet.")
else:
@@ -209,6 +245,7 @@ def process_upgrade_episodes_mode(
def process_upgrade_seasons_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
@@ -336,6 +373,11 @@ def process_upgrade_seasons_mode(
# Increment the upgraded statistics
increment_stat("sonarr", "upgraded", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr upgraded statistics by {len(episode_ids)}")
# Mark episodes as processed using stateful management
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, str(episode_id))
sonarr_logger.debug(f"Marked episode ID {episode_id} as processed for upgrades")
else:
sonarr_logger.warning(f"Episode upgrade search command for {series_title} Season {season_number} did not complete successfully")
else:
@@ -347,6 +389,7 @@ def process_upgrade_seasons_mode(
def process_upgrade_shows_mode(
api_url: str,
api_key: str,
instance_name: str,
api_timeout: int,
monitored_only: bool,
skip_series_refresh: bool,
@@ -469,6 +512,11 @@ def process_upgrade_shows_mode(
# Increment the upgraded statistics
increment_stat("sonarr", "upgraded", len(episode_ids))
sonarr_logger.debug(f"Incremented sonarr upgraded statistics by {len(episode_ids)}")
# Mark episodes as processed using stateful management
for episode_id in episode_ids:
add_processed_id("sonarr", instance_name, str(episode_id))
sonarr_logger.debug(f"Marked episode ID {episode_id} as processed for upgrades")
else:
sonarr_logger.warning(f"Episode upgrade search command for {series_title} did not complete successfully")
else:
+25 -3
View File
@@ -13,6 +13,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.whisparr import api as whisparr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
whisparr_logger = get_logger("whisparr")
@@ -37,6 +38,7 @@ def process_missing_items(
# Extract necessary settings
api_url = app_settings.get("api_url")
api_key = app_settings.get("api_key")
instance_name = app_settings.get("instance_name", "Whisparr Default")
api_timeout = app_settings.get("api_timeout", 90) # Default timeout
monitored_only = app_settings.get("monitored_only", True)
skip_future_releases = app_settings.get("skip_future_releases", True)
@@ -103,18 +105,33 @@ def process_missing_items(
whisparr_logger.info("No missing items left to process after filtering future releases.")
return False
# Filter out already processed items using stateful management
unprocessed_items = []
for item in missing_items:
item_id = str(item.get("id"))
if not is_processed("whisparr", instance_name, item_id):
unprocessed_items.append(item)
else:
whisparr_logger.debug(f"Skipping already processed item ID: {item_id}")
whisparr_logger.info(f"Found {len(unprocessed_items)} unprocessed items out of {len(missing_items)} total items with missing files.")
if not unprocessed_items:
whisparr_logger.info(f"No unprocessed items found for {instance_name}. All available items have been processed.")
return False
items_processed = 0
processing_done = False
# Select items to search based on configuration
if random_missing:
whisparr_logger.info(f"Randomly selecting up to {hunt_missing_items} missing items.")
items_to_search = random.sample(missing_items, min(len(missing_items), hunt_missing_items))
items_to_search = random.sample(unprocessed_items, min(len(unprocessed_items), hunt_missing_items))
else:
whisparr_logger.info(f"Selecting the first {hunt_missing_items} missing items (sorted by title).")
# Sort by title for consistent ordering if not random
missing_items.sort(key=lambda x: x.get("title", ""))
items_to_search = missing_items[:hunt_missing_items]
unprocessed_items.sort(key=lambda x: x.get("title", ""))
items_to_search = unprocessed_items[:hunt_missing_items]
whisparr_logger.info(f"Selected {len(items_to_search)} missing items to search.")
@@ -160,6 +177,11 @@ def process_missing_items(
search_command_id = whisparr_api.item_search(api_url, api_key, api_timeout, [item_id], api_version)
if search_command_id:
whisparr_logger.info(f"Triggered search command {search_command_id}. Assuming success for now.")
# Add item ID to processed list
add_processed_id("whisparr", instance_name, str(item_id))
whisparr_logger.debug(f"Added item ID {item_id} to processed list for {instance_name}")
items_processed += 1
processing_done = True
+25 -3
View File
@@ -13,6 +13,7 @@ from typing import List, Dict, Any, Set, Callable
from src.primary.utils.logger import get_logger
from src.primary.apps.whisparr import api as whisparr_api
from src.primary.stats_manager import increment_stat
from src.primary.stateful_manager import is_processed, add_processed_id
# Get logger for the app
whisparr_logger = get_logger("whisparr")
@@ -37,6 +38,7 @@ def process_cutoff_upgrades(
# Extract necessary settings
api_url = app_settings.get("api_url")
api_key = app_settings.get("api_key")
instance_name = app_settings.get("instance_name", "Whisparr Default")
api_timeout = app_settings.get("api_timeout", 90) # Default timeout
monitored_only = app_settings.get("monitored_only", True)
skip_item_refresh = app_settings.get("skip_item_refresh", False)
@@ -78,18 +80,33 @@ def process_cutoff_upgrades(
whisparr_logger.info(f"Found {len(upgrade_eligible_data)} items eligible for quality upgrade.")
# Filter out already processed items using stateful management
unprocessed_items = []
for item in upgrade_eligible_data:
item_id = str(item.get("id"))
if not is_processed("whisparr", instance_name, item_id):
unprocessed_items.append(item)
else:
whisparr_logger.debug(f"Skipping already processed item ID: {item_id}")
whisparr_logger.info(f"Found {len(unprocessed_items)} unprocessed items out of {len(upgrade_eligible_data)} total items eligible for quality upgrade.")
if not unprocessed_items:
whisparr_logger.info(f"No unprocessed items found for {instance_name}. All available items have been processed.")
return False
items_processed = 0
processing_done = False
# Select items to upgrade based on configuration
if random_upgrades:
whisparr_logger.info(f"Randomly selecting up to {hunt_upgrade_items} items for quality upgrade.")
items_to_upgrade = random.sample(upgrade_eligible_data, min(len(upgrade_eligible_data), hunt_upgrade_items))
items_to_upgrade = random.sample(unprocessed_items, min(len(unprocessed_items), hunt_upgrade_items))
else:
whisparr_logger.info(f"Selecting the first {hunt_upgrade_items} items for quality upgrade (sorted by title).")
# Sort by title for consistent ordering if not random
upgrade_eligible_data.sort(key=lambda x: x.get("title", ""))
items_to_upgrade = upgrade_eligible_data[:hunt_upgrade_items]
unprocessed_items.sort(key=lambda x: x.get("title", ""))
items_to_upgrade = unprocessed_items[:hunt_upgrade_items]
whisparr_logger.info(f"Selected {len(items_to_upgrade)} items for quality upgrade.")
@@ -138,6 +155,11 @@ def process_cutoff_upgrades(
search_command_id = whisparr_api.item_search(api_url, api_key, api_timeout, [item_id], api_version)
if search_command_id:
whisparr_logger.info(f"Triggered search command {search_command_id}. Assuming success for now.")
# Add item ID to processed list
add_processed_id("whisparr", instance_name, str(item_id))
whisparr_logger.debug(f"Added item ID {item_id} to processed list for {instance_name}")
items_processed += 1
processing_done = True
+2 -1
View File
@@ -5,5 +5,6 @@
"check_for_updates": true,
"enable_notifications": false,
"notification_level": "info",
"local_access_bypass": false
"local_access_bypass": false,
"stateful_management_hours": 168
}
+399
View File
@@ -0,0 +1,399 @@
#!/usr/bin/env python3
"""
Stateful Manager for Huntarr
Handles storing and retrieving processed media IDs to prevent reprocessing
"""
import os
import json
import time
import pathlib
import datetime
import logging
from typing import Dict, Any, List, Optional, Set
# Create logger for stateful_manager
stateful_logger = logging.getLogger("stateful_manager")
# Constants
STATEFUL_DIR = pathlib.Path(os.getenv("STATEFUL_DIR", "/config/stateful"))
LOCK_FILE = STATEFUL_DIR / "lock.json"
DEFAULT_HOURS = 168 # Default 7 days (168 hours)
# Ensure the stateful directory exists
try:
STATEFUL_DIR.mkdir(parents=True, exist_ok=True)
stateful_logger.info(f"Stateful directory created/confirmed at {STATEFUL_DIR}")
except Exception as e:
stateful_logger.error(f"Error creating stateful directory: {e}")
# Create app directories
APP_TYPES = ["sonarr", "radarr", "lidarr", "readarr", "whisparr"]
for app_type in APP_TYPES:
(STATEFUL_DIR / app_type).mkdir(exist_ok=True)
def initialize_lock_file() -> None:
"""Initialize the lock file with the current timestamp if it doesn't exist."""
# Ensure directory exists - we don't need to log this again
try:
STATEFUL_DIR.mkdir(parents=True, exist_ok=True)
except Exception as e:
stateful_logger.error(f"Error creating stateful directory: {e}")
if not LOCK_FILE.exists():
try:
current_time = int(time.time())
# Get the expiration hours setting
try:
from src.primary.settings_manager import get_setting
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
except Exception as e:
stateful_logger.error(f"Error getting stateful hours setting, using default: {e}")
hours = DEFAULT_HOURS
expires_at = current_time + (hours * 3600)
with open(LOCK_FILE, 'w') as f:
json.dump({
"created_at": current_time,
"expires_at": expires_at
}, f, indent=2)
stateful_logger.info(f"Initialized lock file at {LOCK_FILE} with expiration in {hours} hours")
except Exception as e:
stateful_logger.error(f"Error initializing lock file: {e}")
def get_lock_info() -> Dict[str, Any]:
"""Get the current lock information."""
initialize_lock_file()
try:
with open(LOCK_FILE, 'r') as f:
lock_info = json.load(f)
# Validate the structure and ensure required fields exist
if not isinstance(lock_info, dict):
raise ValueError("Lock info is not a dictionary")
if "created_at" not in lock_info:
lock_info["created_at"] = int(time.time())
if "expires_at" not in lock_info or lock_info["expires_at"] is None:
# Recalculate expiration if missing
from src.primary.settings_manager import get_setting
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
lock_info["expires_at"] = lock_info["created_at"] + (hours * 3600)
# Save the updated info
with open(LOCK_FILE, 'w') as f:
json.dump(lock_info, f, indent=2)
return lock_info
except Exception as e:
stateful_logger.error(f"Error reading lock file: {e}")
# Return default values if there's an error
current_time = int(time.time())
try:
from src.primary.settings_manager import get_setting
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
except:
hours = DEFAULT_HOURS
expires_at = current_time + (hours * 3600)
return {
"created_at": current_time,
"expires_at": expires_at
}
def update_lock_expiration(hours: int = None) -> None:
"""Update the lock expiration based on the hours setting."""
if hours is None:
from src.primary.settings_manager import get_setting
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
lock_info = get_lock_info()
created_at = lock_info.get("created_at", int(time.time()))
expires_at = created_at + (hours * 3600)
lock_info["expires_at"] = expires_at
try:
with open(LOCK_FILE, 'w') as f:
json.dump(lock_info, f, indent=2)
stateful_logger.info(f"Updated lock expiration to {datetime.datetime.fromtimestamp(expires_at)}")
except Exception as e:
stateful_logger.error(f"Error updating lock expiration: {e}")
def reset_stateful_management() -> bool:
"""
Reset the stateful management system by:
1. Creating a new lock file with current timestamp
2. Deleting all stored processed IDs
Returns:
bool: True if reset was successful, False otherwise
"""
try:
# Create new lock file
current_time = int(time.time())
with open(LOCK_FILE, 'w') as f:
json.dump({
"created_at": current_time,
"expires_at": None # Will be updated later
}, f, indent=2)
# Delete all stored IDs
for app_type in APP_TYPES:
app_dir = STATEFUL_DIR / app_type
if app_dir.exists():
for json_file in app_dir.glob("*.json"):
try:
json_file.unlink()
stateful_logger.debug(f"Deleted {json_file}")
except Exception as e:
stateful_logger.error(f"Error deleting {json_file}: {e}")
# Update expiration
update_lock_expiration()
stateful_logger.info("Successfully reset stateful management")
return True
except Exception as e:
stateful_logger.error(f"Error resetting stateful management: {e}")
return False
def check_expiration() -> bool:
"""
Check if the stateful management has expired.
Returns:
bool: True if expired, False otherwise
"""
lock_info = get_lock_info()
expires_at = lock_info.get("expires_at")
# If expires_at is None, update it based on settings
if expires_at is None:
update_lock_expiration()
lock_info = get_lock_info()
expires_at = lock_info.get("expires_at")
current_time = int(time.time())
if current_time >= expires_at:
stateful_logger.info("Stateful management has expired, resetting...")
reset_stateful_management()
return True
return False
def get_processed_ids(app_type: str, instance_name: str) -> Set[str]:
"""
Get the set of processed media IDs for a specific app instance.
Args:
app_type: The type of app (sonarr, radarr, etc.)
instance_name: The name of the instance
Returns:
Set[str]: Set of processed media IDs
"""
if app_type not in APP_TYPES:
stateful_logger.warning(f"Unknown app type: {app_type}")
return set()
# Check if expiration has occurred
if check_expiration():
# If expired, we've reset everything, so return empty set
return set()
# Create safe filename from instance name
safe_instance_name = "".join([c if c.isalnum() else "_" for c in instance_name])
file_path = STATEFUL_DIR / app_type / f"{safe_instance_name}.json"
if not file_path.exists():
return set()
try:
with open(file_path, 'r') as f:
data = json.load(f)
# Convert list to set for faster lookups
return set(data.get("processed_ids", []))
except Exception as e:
stateful_logger.error(f"Error reading processed IDs for {instance_name}: {e}")
return set()
def add_processed_id(app_type: str, instance_name: str, media_id: str) -> bool:
"""
Add a media ID to the processed list for a specific app instance.
Args:
app_type: The type of app (sonarr, radarr, etc.)
instance_name: The name of the instance
media_id: The ID of the processed media
Returns:
bool: True if successful, False otherwise
"""
if app_type not in APP_TYPES:
stateful_logger.warning(f"Unknown app type: {app_type}")
return False
# Create safe filename from instance name
safe_instance_name = "".join([c if c.isalnum() else "_" for c in instance_name])
file_path = STATEFUL_DIR / app_type / f"{safe_instance_name}.json"
# Get existing processed IDs
processed_ids = list(get_processed_ids(app_type, instance_name))
# Add the new ID if it's not already there
if media_id not in processed_ids:
processed_ids.append(media_id)
try:
with open(file_path, 'w') as f:
json.dump({
"processed_ids": processed_ids,
"last_updated": int(time.time())
}, f, indent=2)
stateful_logger.debug(f"Added media ID {media_id} to {file_path}")
return True
except Exception as e:
stateful_logger.error(f"Error adding media ID {media_id} to {file_path}: {e}")
return False
def is_processed(app_type: str, instance_name: str, media_id: str) -> bool:
"""
Check if a media ID has already been processed.
Args:
app_type: The type of app (sonarr, radarr, etc.)
instance_name: The name of the instance
media_id: The ID of the media to check
Returns:
bool: True if already processed, False otherwise
"""
processed_ids = get_processed_ids(app_type, instance_name)
return media_id in processed_ids
def get_stateful_management_info() -> Dict[str, Any]:
"""
Get information about the current stateful management status.
Returns:
Dict[str, Any]: Information about stateful management
"""
from src.primary.settings_manager import get_setting
try:
lock_info = get_lock_info()
created_at = lock_info.get("created_at", int(time.time()))
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
# Calculate expiration if not set
expires_at = lock_info.get("expires_at")
if expires_at is None:
expires_at = created_at + (hours * 3600)
# Update lock file with expiration
with open(LOCK_FILE, 'w') as f:
lock_info["expires_at"] = expires_at
json.dump(lock_info, f, indent=2)
# Get processed counts per app
app_counts = {}
for app_type in APP_TYPES:
app_dir = STATEFUL_DIR / app_type
app_counts[app_type] = {}
if app_dir.exists():
for json_file in app_dir.glob("*.json"):
instance_name = json_file.stem
try:
with open(json_file, 'r') as f:
data = json.load(f)
app_counts[app_type][instance_name] = len(data.get("processed_ids", []))
except Exception as e:
stateful_logger.error(f"Error reading processed IDs for {instance_name}: {e}")
app_counts[app_type][instance_name] = 0
# Format dates
try:
created_date = datetime.datetime.fromtimestamp(created_at).strftime("%Y-%m-%d %H:%M:%S")
expires_date = datetime.datetime.fromtimestamp(expires_at).strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
stateful_logger.error(f"Error formatting dates: {e}")
created_date = "Invalid date"
expires_date = "Invalid date"
result = {
"created_at": created_at,
"created_date": created_date,
"expires_at": expires_at,
"expires_date": expires_date,
"hours": hours,
"days": round(hours / 24, 1),
"app_counts": app_counts
}
stateful_logger.debug(f"Returning stateful info: {result}")
return result
except Exception as e:
stateful_logger.error(f"Error in get_stateful_management_info: {e}", exc_info=True)
# Return a fallback response that won't break the UI
current_time = int(time.time())
return {
"error": str(e),
"created_at": current_time,
"created_date": "Error loading data",
"expires_at": current_time + (DEFAULT_HOURS * 3600),
"expires_date": "Error loading data",
"hours": DEFAULT_HOURS,
"days": round(DEFAULT_HOURS / 24, 1),
"app_counts": {}
}
def initialize_stateful_system():
"""Perform a complete initialization of the stateful management system."""
stateful_logger.info("Initializing stateful management system")
# Ensure all required directories exist
try:
STATEFUL_DIR.mkdir(parents=True, exist_ok=True)
for app_type in APP_TYPES:
(STATEFUL_DIR / app_type).mkdir(exist_ok=True)
stateful_logger.info(f"Stateful directory structure created at {STATEFUL_DIR}")
except Exception as e:
stateful_logger.error(f"Failed to create stateful directories: {e}")
# Initialize the lock file with proper expiration
try:
initialize_lock_file()
# Update expiration time
from src.primary.settings_manager import get_setting
hours = get_setting("general", "stateful_management_hours", DEFAULT_HOURS)
update_lock_expiration(hours)
stateful_logger.info(f"Stateful lock file initialized with {hours} hour expiration")
except Exception as e:
stateful_logger.error(f"Failed to initialize lock file: {e}")
# Check for existing processed IDs
try:
total_ids = 0
for app_type in APP_TYPES:
app_dir = STATEFUL_DIR / app_type
if app_dir.exists():
files = list(app_dir.glob("*.json"))
total_ids += len(files)
if total_ids > 0:
stateful_logger.info(f"Found {total_ids} existing processed ID files")
else:
stateful_logger.info("No existing processed ID files found")
except Exception as e:
stateful_logger.error(f"Failed to check for existing processed IDs: {e}")
stateful_logger.info("Stateful management system initialization complete")
# Initialize the stateful system on module import
initialize_stateful_system()
+139
View File
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""
Stateful Management API Routes
Handles API endpoints for stateful management
"""
from flask import Blueprint, jsonify, request, Response
import json
from src.primary.stateful_manager import (
get_stateful_management_info,
reset_stateful_management,
update_lock_expiration
)
from src.primary.utils.logger import get_logger
# Create logger
stateful_logger = get_logger("stateful")
# Create blueprint
stateful_api = Blueprint('stateful_api', __name__)
@stateful_api.route('/info', methods=['GET'])
def get_stateful_info():
"""Get information about the stateful management system."""
try:
info = get_stateful_management_info()
stateful_logger.debug(f"Stateful info: {info}")
# Ensure we're returning a valid JSON structure with required data
if 'created_date' not in info or 'expires_date' not in info:
stateful_logger.warning("Missing date information in stateful info")
# Add default values if they don't exist
if 'created_at' in info:
try:
from datetime import datetime
created_date = datetime.fromtimestamp(info['created_at']).strftime("%Y-%m-%d %H:%M:%S")
info['created_date'] = created_date
except Exception as e:
stateful_logger.error(f"Error formatting created_date: {e}")
info['created_date'] = "Invalid timestamp"
else:
info['created_date'] = "Unknown"
if 'expires_at' in info:
try:
from datetime import datetime
expires_date = datetime.fromtimestamp(info['expires_at']).strftime("%Y-%m-%d %H:%M:%S")
info['expires_date'] = expires_date
except Exception as e:
stateful_logger.error(f"Error formatting expires_date: {e}")
info['expires_date'] = "Invalid timestamp"
else:
info['expires_date'] = "Unknown"
# Add CORS headers to allow access from frontend
response = Response(json.dumps(info))
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
stateful_logger.debug("Returning stateful info with proper headers")
return response
except Exception as e:
stateful_logger.error(f"Error getting stateful info: {e}", exc_info=True)
# Return error response with proper headers
error_data = {"error": str(e), "created_date": "Error", "expires_date": "Error"}
response = Response(json.dumps(error_data), status=500)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
@stateful_api.route('/reset', methods=['POST'])
def reset_stateful():
"""Reset the stateful management system."""
try:
success = reset_stateful_management()
if success:
# Add CORS headers to allow access from frontend
response = Response(json.dumps({"success": True, "message": "Stateful management reset successfully"}))
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
else:
# Add CORS headers to allow access from frontend
response = Response(json.dumps({"success": False, "message": "Failed to reset stateful management"}), status=500)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
except Exception as e:
stateful_logger.error(f"Error resetting stateful management: {e}")
# Return error response with proper headers
error_data = {"error": str(e)}
response = Response(json.dumps(error_data), status=500)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
@stateful_api.route('/update-expiration', methods=['POST'])
def update_expiration():
"""Update the stateful management expiration time."""
try:
hours = request.json.get('hours')
if hours is None or not isinstance(hours, int) or hours <= 0:
stateful_logger.error(f"Invalid hours value for update-expiration: {hours}")
# Return error response with proper headers
error_data = {"success": False, "message": f"Invalid hours value: {hours}. Must be a positive integer."}
response = Response(json.dumps(error_data), status=400)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
updated = update_lock_expiration(hours)
if updated:
# Get updated info
info = get_stateful_management_info()
# Add CORS headers to allow access from frontend
response_data = {
"success": True,
"message": f"Expiration updated to {hours} hours",
"expires_at": info.get("expires_at"),
"expires_date": info.get("expires_date")
}
response = Response(json.dumps(response_data))
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
else:
# Add CORS headers to allow access from frontend
response = Response(json.dumps({"success": False, "message": "Failed to update expiration"}), status=500)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
except Exception as e:
stateful_logger.error(f"Error updating expiration: {e}", exc_info=True)
# Return error response with proper headers
error_data = {"success": False, "message": f"Error updating expiration: {str(e)}"}
response = Response(json.dumps(error_data), status=500)
response.headers['Content-Type'] = 'application/json'
response.headers['Access-Control-Allow-Origin'] = '*'
return response
+4
View File
@@ -38,6 +38,9 @@ from src.primary.routes.common import common_bp
# Import blueprints for each app from the centralized blueprints module
from src.primary.apps.blueprints import sonarr_bp, radarr_bp, lidarr_bp, readarr_bp, whisparr_bp, swaparr_bp
# Import stateful blueprint
from src.primary.stateful_routes import stateful_api
# Disable Flask default logging
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
@@ -58,6 +61,7 @@ app.register_blueprint(lidarr_bp, url_prefix='/api/lidarr')
app.register_blueprint(readarr_bp, url_prefix='/api/readarr')
app.register_blueprint(whisparr_bp, url_prefix='/api/whisparr')
app.register_blueprint(swaparr_bp, url_prefix='/api/swaparr')
app.register_blueprint(stateful_api, url_prefix='/api/stateful')
# Register the authentication check to run before requests
app.before_request(authenticate_request)