1
0
Fork 0

Merge pull request #2787 from WesleyAC/celery-clear-queues-form

Add form to remove tasks from Celery
This commit is contained in:
Mouse Reeve 2023-04-07 06:17:50 -07:00 committed by GitHub
commit 5895524a25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 0 deletions

View file

@ -1,10 +1,13 @@
""" celery status """
import json
from django.contrib.auth.decorators import login_required, permission_required
from django.http import HttpResponse
from django.template.response import TemplateResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.http import require_GET
from django import forms
import redis
from celerywyrm import settings
@ -46,14 +49,61 @@ class CeleryStatus(View):
queues = None
errors.append(err)
form = ClearCeleryForm()
data = {
"stats": stats,
"active_tasks": active_tasks,
"queues": queues,
"form": form,
"errors": errors,
}
return TemplateResponse(request, "settings/celery.html", data)
def post(self, request):
"""Submit form to clear queues"""
form = ClearCeleryForm(request.POST)
if form.is_valid():
if len(celery.control.ping()) != 0:
return HttpResponse(
"Refusing to delete tasks while Celery worker is active"
)
pipeline = r.pipeline()
for queue in form.cleaned_data["queues"]:
for task in r.lrange(queue, 0, -1):
task_json = json.loads(task)
if task_json["headers"]["task"] in form.cleaned_data["tasks"]:
pipeline.lrem(queue, 0, task)
results = pipeline.execute()
return HttpResponse(f"Deleted {sum(results)} tasks")
class ClearCeleryForm(forms.Form):
"""Form to clear queues"""
queues = forms.MultipleChoiceField(
label="Queues",
choices=[
(LOW, "Low prioirty"),
(MEDIUM, "Medium priority"),
(HIGH, "High priority"),
(IMPORTS, "Imports"),
(BROADCAST, "Broadcasts"),
],
widget=forms.CheckboxSelectMultiple,
)
tasks = forms.MultipleChoiceField(
label="Tasks", choices=[], widget=forms.CheckboxSelectMultiple
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
celery.loader.import_default_modules()
self.fields["tasks"].choices = sorted(
[(k, k) for k in celery.tasks.keys() if not k.startswith("celery.")]
)
@require_GET
# pylint: disable=unused-argument