2025-02-12 11:16:07 +07:00

346 lines
11 KiB
Python

"""all task API views"""
from common.serializers import (
AsyncTaskResponseSerializer,
ErrorResponseSerializer,
)
from common.views_base import AdminOnly, ApiBaseView
from django.shortcuts import get_object_or_404
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.response import Response
from task.models import CustomPeriodicTask
from task.serializers import (
CustomPeriodicTaskSerializer,
TaskCreateDataSerializer,
TaskIDDataSerializer,
TaskNotificationPostSerializer,
TaskNotificationSerializer,
TaskResultSerializer,
)
from task.src.config_schedule import CrontabValidator, ScheduleBuilder
from task.src.notify import Notifications, get_all_notifications
from task.src.task_config import TASK_CONFIG
from task.src.task_manager import TaskCommand, TaskManager
class TaskListView(ApiBaseView):
"""resolves to /api/task/by-name/
GET: return a list of all stored task results
"""
permission_classes = [AdminOnly]
@extend_schema(responses=TaskResultSerializer(many=True))
def get(self, request):
"""get all stored task results"""
# pylint: disable=unused-argument
all_results = TaskManager().get_all_results()
serializer = TaskResultSerializer(all_results, many=True)
return Response(serializer.data)
class TaskNameListView(ApiBaseView):
"""resolves to /api/task/by-name/<task-name>/
GET: return a list of stored results of task
POST: start new background process
"""
permission_classes = [AdminOnly]
@extend_schema(
responses={
200: OpenApiResponse(TaskResultSerializer(many=True)),
404: OpenApiResponse(
ErrorResponseSerializer(), description="task name not found"
),
},
)
def get(self, request, task_name):
"""get stored task by name"""
# pylint: disable=unused-argument
if task_name not in TASK_CONFIG:
error = ErrorResponseSerializer({"error": "task name not found"})
return Response(error.data, status=404)
all_results = TaskManager().get_tasks_by_name(task_name)
serializer = TaskResultSerializer(all_results, many=True)
return Response(serializer.data)
@extend_schema(
responses={
200: OpenApiResponse(AsyncTaskResponseSerializer()),
400: OpenApiResponse(
ErrorResponseSerializer(), description="bad request"
),
404: OpenApiResponse(
ErrorResponseSerializer(), description="task name not found"
),
}
)
def post(self, request, task_name):
"""start new task without args"""
# pylint: disable=unused-argument
task_config = TASK_CONFIG.get(task_name)
if not task_config:
error = ErrorResponseSerializer({"error": "task name not found"})
return Response(error.data, status=404)
if not task_config.get("api_start"):
error = ErrorResponseSerializer(
{"error": "can not start task through this endpoint"}
)
return Response(error.data, status=404)
message = TaskCommand().start(task_name)
serializer = AsyncTaskResponseSerializer(message)
return Response(serializer.data)
class TaskIDView(ApiBaseView):
"""resolves to /api/task/by-id/<task-id>/
GET: return details of task id
POST: send command to task by id
"""
valid_commands = ["stop", "kill"]
permission_classes = [AdminOnly]
@extend_schema(
responses={
200: OpenApiResponse(TaskResultSerializer()),
404: OpenApiResponse(
ErrorResponseSerializer(), description="task not found"
),
},
)
def get(self, request, task_id):
"""get task by ID"""
# pylint: disable=unused-argument
task_result = TaskManager().get_task(task_id)
if not task_result:
error = ErrorResponseSerializer({"error": "task ID not found"})
return Response(error.data, status=404)
serializer = TaskResultSerializer(task_result)
return Response(serializer.data)
@extend_schema(
request=TaskIDDataSerializer(),
responses={
204: OpenApiResponse(description="task command sent"),
400: OpenApiResponse(
ErrorResponseSerializer(), description="bad request"
),
404: OpenApiResponse(
ErrorResponseSerializer(), description="task not found"
),
},
)
def post(self, request, task_id):
"""post command to task"""
data_serializer = TaskIDDataSerializer(data=request.data)
data_serializer.is_valid(raise_exception=True)
validated_data = data_serializer.validated_data
command = validated_data["command"]
task_result = TaskManager().get_task(task_id)
if not task_result:
error = ErrorResponseSerializer({"error": "task ID not found"})
return Response(error.data, status=404)
task_conf = TASK_CONFIG.get(task_result.get("name"))
if command == "stop":
if not task_conf.get("api_stop"):
error = ErrorResponseSerializer(
{"error": "task can not be stopped"}
)
return Response(error.data, status=400)
TaskCommand().stop(task_id)
if command == "kill":
if not task_conf.get("api_stop"):
error = ErrorResponseSerializer(
{"error": "task can not be killed"}
)
return Response(error.data, status=400)
TaskCommand().kill(task_id)
return Response(status=204)
class ScheduleListView(ApiBaseView):
"""resolves to /api/task/schedule/
GET: list all schedules
"""
permission_classes = [AdminOnly]
@extend_schema(
responses={
200: OpenApiResponse(CustomPeriodicTaskSerializer(many=True)),
},
)
def get(self, request):
"""get all schedules"""
tasks = CustomPeriodicTask.objects.all()
serializer = CustomPeriodicTaskSerializer(tasks, many=True)
return Response(serializer.data)
class ScheduleView(ApiBaseView):
"""resolves to /api/task/schedule/<task-name>/
POST: create/update schedule for task with config
- example: {"schedule": "0 0 *", "config": {"days": 90}}
DEL: delete schedule for task
"""
permission_classes = [AdminOnly]
@extend_schema(
responses={
200: OpenApiResponse(CustomPeriodicTaskSerializer()),
404: OpenApiResponse(
ErrorResponseSerializer(), description="schedule not found"
),
},
)
def get(self, request, task_name):
"""get single schedule by task_name"""
task = get_object_or_404(CustomPeriodicTask, name=task_name)
serializer = CustomPeriodicTaskSerializer(task)
return Response(serializer.data)
@extend_schema(
request=TaskCreateDataSerializer(),
responses={
200: OpenApiResponse(CustomPeriodicTaskSerializer()),
400: OpenApiResponse(
ErrorResponseSerializer(), description="bad request"
),
},
)
def post(self, request, task_name):
"""create/update schedule for task"""
data_serializer = TaskCreateDataSerializer(data=request.data)
data_serializer.is_valid(raise_exception=True)
validated_data = data_serializer.validated_data
cron_schedule = validated_data.get("schedule")
schedule_config = validated_data.get("config")
if not cron_schedule and not schedule_config:
error = ErrorResponseSerializer(
{"error": "expected schedule or config key"}
)
return Response(error.data, status=400)
try:
validator = CrontabValidator()
validator.validate_cron(cron_schedule)
validator.validate_config(task_name, schedule_config)
except ValueError as err:
error = ErrorResponseSerializer({"error": str(err)})
return Response(error.data, status=400)
task = ScheduleBuilder().update_schedule(
task_name, cron_schedule, schedule_config
)
message = f"update schedule for task {task_name}"
if schedule_config:
message += f" with config {schedule_config}"
print(message)
serializer = CustomPeriodicTaskSerializer(task)
return Response(serializer.data)
@extend_schema(
responses={
204: OpenApiResponse(description="schedule deleted"),
404: OpenApiResponse(
ErrorResponseSerializer(), description="schedule not found"
),
},
)
def delete(self, request, task_name):
"""delete schedule by task_name"""
task = get_object_or_404(CustomPeriodicTask, name=task_name)
_ = task.delete()
return Response(status=204)
class ScheduleNotification(ApiBaseView):
"""resolves to /api/task/notification/
GET: get all schedule notifications
POST: add notification url to task
DEL: delete notification
"""
@extend_schema(
responses=TaskNotificationSerializer(),
)
def get(self, request):
"""handle get request"""
serializer = TaskNotificationSerializer(get_all_notifications())
return Response(serializer.data)
@extend_schema(
request=TaskNotificationPostSerializer(),
responses={
200: OpenApiResponse(TaskNotificationSerializer()),
400: OpenApiResponse(
ErrorResponseSerializer(), description="bad request"
),
},
)
def post(self, request):
"""create notification"""
data_serializer = TaskNotificationPostSerializer(data=request.data)
data_serializer.is_valid(raise_exception=True)
validated_data = data_serializer.validated_data
task_name = validated_data["task_name"]
url = validated_data["url"]
if not url:
error = ErrorResponseSerializer({"error": "missing url"})
return Response(error.data, status=400)
Notifications(task_name).add_url(url)
serializer = TaskNotificationSerializer(get_all_notifications())
return Response(serializer.data)
@extend_schema(
request=TaskNotificationPostSerializer(),
responses={
204: OpenApiResponse(description="notification url deleted"),
400: OpenApiResponse(
ErrorResponseSerializer(), description="bad request"
),
},
)
def delete(self, request):
"""delete notification"""
data_serializer = TaskNotificationPostSerializer(data=request.data)
data_serializer.is_valid(raise_exception=True)
validated_data = data_serializer.validated_data
task_name = validated_data["task_name"]
url = validated_data.get("url")
if url:
Notifications(task_name).remove_url(url)
else:
Notifications(task_name).remove_task()
return Response(status=204)