track dl failure, implement retry, continue from cache, #752

This commit is contained in:
Simon 2024-07-06 16:15:33 +02:00
parent 963d952dfa
commit 1fcea860c8
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 26 additions and 13 deletions

View File

@ -50,9 +50,10 @@ class VideoDownloader(DownloaderBase):
self.obs = False
self._build_obs()
def run_queue(self, auto_only=False) -> int:
def run_queue(self, auto_only=False) -> tuple[int, int]:
"""setup download queue in redis loop until no more items"""
downloaded = 0
failed = 0
while True:
video_data = self._get_next(auto_only)
if self.task.is_stopped() or not video_data:
@ -66,6 +67,7 @@ class VideoDownloader(DownloaderBase):
success = self._dl_single_vid(youtube_id, channel_id)
if not success:
failed += 1
continue
self._notify(video_data, "Add video metadata to index", progress=1)
@ -82,7 +84,7 @@ class VideoDownloader(DownloaderBase):
# post processing
DownloadPostProcess(self.task).run()
return downloaded
return downloaded, failed
def _notify(self, video_data, message, progress=False):
"""send progress notification to task"""
@ -153,6 +155,7 @@ class VideoDownloader(DownloaderBase):
"continuedl": True,
"writethumbnail": False,
"noplaylist": True,
"color": "no_color",
}
def _build_obs_user(self):
@ -219,12 +222,6 @@ class VideoDownloader(DownloaderBase):
self._set_overwrites(obs, channel_id)
dl_cache = os.path.join(self.CACHE_DIR, "download")
# check if already in cache to continue from there
all_cached = ignore_filelist(os.listdir(dl_cache))
for file_name in all_cached:
if youtube_id in file_name:
obs["outtmpl"] = os.path.join(dl_cache, file_name)
success, message = YtWrap(obs, self.config).download(youtube_id)
if not success:
self._handle_error(youtube_id, message)

View File

@ -7,6 +7,7 @@ Functionality:
"""
from celery import Task, shared_task
from celery.exceptions import Retry
from home.src.download.queue import PendingList
from home.src.download.subscriptions import (
SubscriptionHandler,
@ -114,7 +115,13 @@ def update_subscribed(self):
return None
@shared_task(name="download_pending", bind=True, base=BaseTask)
@shared_task(
name="download_pending",
bind=True,
base=BaseTask,
max_retries=3,
default_retry_delay=10,
)
def download_pending(self, auto_only=False):
"""download latest pending videos"""
manager = TaskManager()
@ -124,11 +131,20 @@ def download_pending(self, auto_only=False):
return None
manager.init(self)
downloader = VideoDownloader(task=self)
videos_downloaded = downloader.run_queue(auto_only=auto_only)
try:
downloader = VideoDownloader(task=self)
downloaded, failed = downloader.run_queue(auto_only=auto_only)
if videos_downloaded:
return f"downloaded {videos_downloaded} video(s)."
if failed:
print(f"[task][{self.name}] Videos failed, retry.")
self.send_progress("Videos failed, retry.")
raise self.retry()
except Retry as exc:
raise exc
if downloaded:
return f"downloaded {downloaded} video(s)."
return None