mirror of
https://github.com/bugsink/bugsink.git
synced 2025-12-30 09:50:11 -06:00
Snappea: logging
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from copy import deepcopy
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
@@ -197,11 +198,30 @@ LOGGING['loggers']['bugsink'] = {
|
||||
"level": "INFO",
|
||||
"handlers": ["console"],
|
||||
}
|
||||
LOGGING['loggers']['snappea'] = {
|
||||
"level": "INFO",
|
||||
"handlers": ["console"],
|
||||
|
||||
# Snappea Logging
|
||||
LOGGING["formatters"]["snappea"] = {
|
||||
"format": "{asctime} - {threadName} - {levelname:7} - {message}",
|
||||
"style": "{",
|
||||
}
|
||||
|
||||
LOGGING["handlers"]["snappea"] = {
|
||||
"level": "DEBUG" if DEBUG else "INFO",
|
||||
"class": "logging.StreamHandler"
|
||||
}
|
||||
|
||||
LOGGING["handlers"]["snappea"]["formatter"] = "snappea"
|
||||
|
||||
LOGGING['loggers']['snappea'] = {
|
||||
"level": "DEBUG" if DEBUG else "INFO",
|
||||
"handlers": ["snappea"],
|
||||
}
|
||||
|
||||
# TODO sys.argv checking: how do I want to deal with it in my final config setup?
|
||||
if sys.argv[1:2] == ['runsnappea']:
|
||||
for logger in LOGGING['loggers'].values():
|
||||
logger["handlers"] = ["snappea"]
|
||||
|
||||
|
||||
# ###################### SERVER-MODE SETTINGS #################
|
||||
|
||||
|
||||
@@ -1,23 +1,22 @@
|
||||
import time
|
||||
import random
|
||||
import uuid
|
||||
import logging
|
||||
|
||||
from .decorators import shared_task
|
||||
|
||||
logger = logging.getLogger("snappea.foreman")
|
||||
# for the example tasks, we pick a non-snappea logger on purpose, to check that non-snappea logs are written in the
|
||||
# correct format when the snappea server is running (in general, logs inside tasks will have non-snappea loggers)
|
||||
logger = logging.getLogger("bugsink")
|
||||
|
||||
|
||||
@shared_task
|
||||
def example_worker():
|
||||
me = str(uuid.uuid4())
|
||||
logger.info("example worker started %s", me)
|
||||
def random_duration():
|
||||
logger.info("Starting something of a random duration")
|
||||
time.sleep(random.random() * 10)
|
||||
logger.info("example worker stopped %s", me)
|
||||
|
||||
|
||||
@shared_task
|
||||
def example_failing_worker():
|
||||
def failing_task():
|
||||
raise Exception("I am failing")
|
||||
|
||||
|
||||
|
||||
@@ -16,13 +16,20 @@ from .models import Task
|
||||
from .datastructures import Workers
|
||||
|
||||
|
||||
GRACEFUL_TIMEOUT = 10
|
||||
GRACEFUL_TIMEOUT = .1
|
||||
NUM_WORKERS = 4
|
||||
|
||||
|
||||
logger = logging.getLogger("snappea.foreman")
|
||||
|
||||
|
||||
def short_id(task_id):
    """Return a short, human-readable form of `task_id` for use in log lines, formatted as "ddd-ddd".

    We take the least-significant 6 digits for task IDs when displaying them (for humans). This leaves the ability to
    distinguish them meaningfully even when some tasks are processed a few orders of magnitude faster than others (not
    expected) while at the same time processing _very many_ of the fast tasks. Upside: more readable logs.

    `task_id` is rendered with the `06` format spec, i.e. zero-padded to at least 6 characters (assumes an integer
    ID -- TODO confirm against the Task model).
    """
    # zero-pad once, so that small IDs still yield two full groups of three characters
    padded = f"{task_id:06}"
    return f"{padded[-6:-3]}-{padded[-3:]}"
|
||||
|
||||
|
||||
class Foreman:
|
||||
"""
|
||||
The Foreman starts workers, as (threading.Thread) threads, based on snappea.Task objects it finds in the sqlite
|
||||
@@ -58,6 +65,8 @@ class Foreman:
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
threading.current_thread().name = "FOREMAN"
|
||||
|
||||
self.workers = Workers()
|
||||
self.stopping = False
|
||||
|
||||
@@ -72,7 +81,9 @@ class Foreman:
|
||||
|
||||
# Pid stuff
|
||||
pid = os.getpid()
|
||||
logger.info("Foreman created, my pid is %s", pid)
|
||||
|
||||
logger.info(" ========= SNAPPEA =========")
|
||||
logger.info("Startup: my pid is %s", pid)
|
||||
with open("/tmp/snappea.pid", "w") as f: # TODO configurable location
|
||||
f.write(str(pid))
|
||||
|
||||
@@ -85,21 +96,27 @@ class Foreman:
|
||||
self.worker_semaphore = threading.Semaphore(NUM_WORKERS)
|
||||
|
||||
def run_in_thread(self, task_id, function, *args, **kwargs):
|
||||
logger.info("run_in_thread: %s, %s.%s", task_id, function.__module__, function.__name__)
|
||||
# NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this
|
||||
# is something to keep an eye on
|
||||
logger.info(
|
||||
'Starting %s for "%s.%s" with %s, %s',
|
||||
short_id(task_id), function.__module__, function.__name__, args, kwargs)
|
||||
|
||||
def non_failing_function(*inner_args, **inner_kwargs):
|
||||
t0 = time.time()
|
||||
try:
|
||||
function(*inner_args, **inner_kwargs)
|
||||
except Exception as e:
|
||||
# Potential TODO: make this configurable / depend on our existing config in bugsink/settings.py
|
||||
logger.info("Worker exception: %s", str(e))
|
||||
logger.warning("Snappea caught Exception: %s", str(e))
|
||||
capture_exception(e)
|
||||
finally:
|
||||
logger.info("worker done: %s", task_id)
|
||||
logger.info("Worker done in %.3fs", time.time() - t0)
|
||||
self.workers.stopped(task_id)
|
||||
self.worker_semaphore.release()
|
||||
|
||||
worker_thread = threading.Thread(target=non_failing_function, args=args, kwargs=kwargs)
|
||||
worker_thread = threading.Thread(
|
||||
target=non_failing_function, args=args, kwargs=kwargs, name=f"{short_id(task_id)}")
|
||||
|
||||
# killing threads seems to be 'hard'(https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thre)
|
||||
# we can, however, set daemon=True which will ensure that an exit of the main thread is the end of the program
|
||||
@@ -113,7 +130,7 @@ class Foreman:
|
||||
# We set a flag and release a semaphore. The advantage is that we don't have to think about e.g. handle_sigint
|
||||
# being called while we're handling a previous call to it. The (slight) disadvantage is that we need to sprinkle
|
||||
# calls to check_for_stopping() in more locations (at least after every semaphore is acquired)
|
||||
logger.debug("Received SIGINT")
|
||||
logger.debug("Received SIGINT") # NOTE: calling logger in handle_xxx is probably a bad idea
|
||||
|
||||
if not self.stopping: # without this if-statement, repeated signals would extend the deadline
|
||||
self.stopping = True
|
||||
@@ -131,7 +148,7 @@ class Foreman:
|
||||
if len(pre_existing_wakeup_notifications) > 0:
|
||||
# We clear the wakeup_calls_dir on startup. Not strictly necessary because such files would be cleared in
|
||||
# the loop anyway, but it's more efficient to do it first.
|
||||
logger.info("Clearing %s pre-existing wakeup notifications", len(pre_existing_wakeup_notifications))
|
||||
logger.info("Startup: Clearing %s items from wakeup dir", len(pre_existing_wakeup_notifications))
|
||||
for filename in pre_existing_wakeup_notifications:
|
||||
os.remove(filename)
|
||||
|
||||
@@ -139,13 +156,15 @@ class Foreman:
|
||||
# during this time-period will simply "count up" the semaphore even though the work is already being done. This
|
||||
# is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into
|
||||
# deep sleep after.
|
||||
logger.info("Checking Task backlog")
|
||||
logger.info("Startup: Clearing Task backlog")
|
||||
while self.create_workers() > 0:
|
||||
pass
|
||||
|
||||
logger.info("Task backlog empty now, proceeding to main loop")
|
||||
logger.info("Startup: Task backlog empty now, proceeding to main loop")
|
||||
while True:
|
||||
logger.debug("Main loop: Waiting for wakeup call")
|
||||
for event in self.wakeup_calls.read():
|
||||
logger.debug("Main loop: Removing wakeup notification %s", event.name)
|
||||
# I think we can just do os.unlink(), without being afraid of an error either here or on the side where
|
||||
# we write the file. I don't have a link to the man page to back this up, but when running "many" calls
|
||||
# (using 2 processes with each simple tight loop, one creating the files and one deleting them, I did
|
||||
@@ -159,7 +178,7 @@ class Foreman:
|
||||
def create_workers(self):
|
||||
"""returns the number of workers created (AKA tasks done)"""
|
||||
|
||||
logger.debug("Querying for tasks")
|
||||
logger.debug("Main loop: Querying for tasks")
|
||||
# We get "a lot" of Tasks at once, rather than one by one. We assume (but did not test) this is more efficient
|
||||
# than getting the Tasks one by one. It also has consequences for the case where many Tasks (and thus
|
||||
# wakeup notifications) come in at the same time: in such cases, we may deal with more than one Task for a
|
||||
@@ -177,8 +196,8 @@ class Foreman:
|
||||
|
||||
task_i = -1
|
||||
for task_i, task in enumerate(tasks):
|
||||
logger.debug("Creating worker for with task", task.id)
|
||||
logger.debug("Checking (potentially waiting) for available worker slots")
|
||||
logger.debug("Main loop: Creating worker for with task %s", short_id(task.id))
|
||||
logger.debug("Main loop: Checking (potentially waiting) for available worker slots")
|
||||
self.worker_semaphore.acquire()
|
||||
self.check_for_stopping() # always check after .acquire()
|
||||
# TODO note on why no transactions are needed (it's just a single call anyway)
|
||||
@@ -194,18 +213,17 @@ class Foreman:
|
||||
task.delete()
|
||||
self.run_in_thread(task_id, function, *args, **kwargs)
|
||||
|
||||
if task_i == -1:
|
||||
# Seeing this is expected on-bootup (once after all Peas are dealt with, and once for each notification that
|
||||
# was received while clearing the initial backlog, but before we went into "sleeping" (wait for
|
||||
# notification) mode). See also above, starting with 'We get "a lot" of Tasks at once'
|
||||
logger.info("No task found")
|
||||
task_count = task_i + 1
|
||||
|
||||
return task_i + 1
|
||||
# Seeing "0 tasks handled" is expected, both on Startup and right before we go into "Wait for wakeup". See also
|
||||
# above, starting with 'We get "a lot" of Tasks at once'
|
||||
logger.debug("Main loop: %s tasks handled", task_count)
|
||||
return task_count
|
||||
|
||||
def check_for_stopping(self):
|
||||
if not self.stopping:
|
||||
return
|
||||
logger.info("Foreman stopping")
|
||||
logger.info("Stopping")
|
||||
|
||||
# Loop over all tasks, waiting for them to finish. If they don't finish in time (GRACEFUL_TIMEOUT), we'll kill
|
||||
# them with a system-exit.
|
||||
@@ -213,12 +231,17 @@ class Foreman:
|
||||
if worker_thread.is_alive():
|
||||
time_left = self.stop_deadline - time.time()
|
||||
if time_left > 0:
|
||||
logger.info("Waiting for worker %s", task_id)
|
||||
worker_thread.join(time_left)
|
||||
if worker_thread.is_alive():
|
||||
logger.info("Worker %s did not die before the wait was over", task_id)
|
||||
logger.info(
|
||||
"Stopping: %s did not die in %.1fs, proceeding to kill",
|
||||
short_id(task_id), GRACEFUL_TIMEOUT)
|
||||
else:
|
||||
logger.info("No time left to wait for worker %s", task_id) # it will be killed by system-exit
|
||||
# for the logs distinguishing between "explicit join targets" and "not dead yet" is irrelevant, we
|
||||
# use the same format.
|
||||
logger.info(
|
||||
"Stopping: %s did not die in %.1fs, proceeding to kill", short_id(task_id), GRACEFUL_TIMEOUT)
|
||||
|
||||
logger.info("Stopping: EXIT")
|
||||
|
||||
logger.info("Foreman exit")
|
||||
sys.exit()
|
||||
|
||||
Reference in New Issue
Block a user