diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py
index 85b44b0e..15fde1d6 100644
--- a/tubearchivist/api/views.py
+++ b/tubearchivist/api/views.py
@@ -12,7 +12,7 @@ from home.src.index.generic import Pagination
from home.src.index.reindex import ReindexProgress
from home.src.index.video import SponsorBlock, YoutubeVideo
from home.src.ta.config import AppConfig
-from home.src.ta.ta_redis import RedisArchivist, RedisQueue
+from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.task_manager import TaskCommand, TaskManager
from home.src.ta.urlparser import Parser
from home.tasks import (
@@ -38,8 +38,8 @@ class ApiBaseView(APIView):
authentication_classes = [SessionAuthentication, TokenAuthentication]
permission_classes = [IsAuthenticated]
- search_base = False
- data = False
+ search_base = ""
+ data = ""
def __init__(self):
super().__init__()
@@ -436,12 +436,9 @@ class DownloadApiView(ApiBaseView):
return Response({"message": message}, status=404)
print(f"{video_id}: change status to {item_status}")
+ PendingInteract(video_id, item_status).update_status()
if item_status == "priority":
- PendingInteract(youtube_id=video_id).prioritize()
- download_pending.delay(from_queue=False)
- else:
- PendingInteract(video_id, item_status).update_status()
- RedisQueue(queue_name="dl_queue").clear_item(video_id)
+ download_pending.delay(auto_only=True)
return Response(request.data)
@@ -494,6 +491,7 @@ class DownloadApiListView(ApiBaseView):
def post(request):
"""add list of videos to download queue"""
data = request.data
+ auto_start = bool(request.GET.get("autostart"))
try:
to_add = data["data"]
except KeyError:
@@ -510,7 +508,7 @@ class DownloadApiListView(ApiBaseView):
print(message)
return Response({"message": message}, status=400)
- extrac_dl.delay(youtube_ids)
+ extrac_dl.delay(youtube_ids, auto_start=auto_start)
return Response(data)
diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py
index 80c6d9c4..e204bf21 100644
--- a/tubearchivist/config/management/commands/ta_startup.py
+++ b/tubearchivist/config/management/commands/ta_startup.py
@@ -44,6 +44,7 @@ class Command(BaseCommand):
self._mig_snapshot_check()
self._mig_set_vid_type()
self._mig_set_streams()
+ self._mig_set_autostart()
def _sync_redis_state(self):
"""make sure redis gets new config.json values"""
@@ -236,3 +237,34 @@ class Command(BaseCommand):
if idx % 100 == 0:
self.stdout.write(f" progress {idx}/{total}")
+
+ def _mig_set_autostart(self):
+        """migration: update from 0.3.5 to 0.3.6, set auto_start to false"""
+ self.stdout.write("[MIGRATION] set default download auto_start")
+ data = {
+ "query": {
+ "bool": {"must_not": [{"exists": {"field": "auto_start"}}]}
+ },
+ "script": {"source": "ctx._source['auto_start'] = false"},
+ }
+ path = "ta_download/_update_by_query"
+ response, status_code = ElasticWrap(path).post(data=data)
+ if status_code == 200:
+ updated = response.get("updated", 0)
+ if not updated:
+ self.stdout.write(
+ " no videos needed updating in ta_download"
+ )
+
+ self.stdout.write(
+ self.style.SUCCESS(
+ f" ✓ {updated} videos updated in ta_download"
+ )
+ )
+ return
+
+ message = " 🗙 ta_download auto_start update failed"
+ self.stdout.write(self.style.ERROR(message))
+ self.stdout.write(response)
+ sleep(60)
+ raise CommandError(message)
diff --git a/tubearchivist/home/config.json b/tubearchivist/home/config.json
index f2fa66eb..5c91fd15 100644
--- a/tubearchivist/home/config.json
+++ b/tubearchivist/home/config.json
@@ -12,14 +12,13 @@
"grid_items": 3
},
"subscriptions": {
- "auto_search": false,
"auto_download": false,
"channel_size": 50,
"live_channel_size": 50,
- "shorts_channel_size": 50
+ "shorts_channel_size": 50,
+ "auto_start": false
},
"downloads": {
- "limit_count": false,
"limit_speed": false,
"sleep_interval": 3,
"autodelete_days": false,
diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py
index 3309f519..006b7a04 100644
--- a/tubearchivist/home/src/download/queue.py
+++ b/tubearchivist/home/src/download/queue.py
@@ -19,7 +19,6 @@ from home.src.index.video_constants import VideoTypeEnum
from home.src.index.video_streams import DurationConverter
from home.src.ta.config import AppConfig
from home.src.ta.helper import is_shorts
-from home.src.ta.ta_redis import RedisQueue
class PendingIndex:
@@ -113,20 +112,14 @@ class PendingInteract:
_, _ = ElasticWrap(path).post(data=data)
def update_status(self):
- """update status field of pending item"""
- data = {"doc": {"status": self.status}}
- path = f"ta_download/_update/{self.youtube_id}"
- _, _ = ElasticWrap(path).post(data=data)
+ """update status of pending item"""
+ if self.status == "priority":
+ data = {"doc": {"status": "pending", "auto_start": True}}
+ else:
+ data = {"doc": {"status": self.status}}
- def prioritize(self):
- """prioritize pending item in redis queue"""
- pending_video, _ = self.get_item()
- vid_type = pending_video.get("vid_type", VideoTypeEnum.VIDEOS.value)
- to_add = {
- "youtube_id": pending_video["youtube_id"],
- "vid_type": vid_type,
- }
- RedisQueue(queue_name="dl_queue").add_priority(to_add)
+ path = f"ta_download/_update/{self.youtube_id}/?refresh=true"
+ _, _ = ElasticWrap(path).post(data=data)
def get_item(self):
"""return pending item dict"""
@@ -236,7 +229,7 @@ class PendingList(PendingIndex):
# match vid_type later
self._add_video(video_id, VideoTypeEnum.UNKNOWN)
- def add_to_pending(self, status="pending"):
+ def add_to_pending(self, status="pending", auto_start=False):
"""add missing videos to pending list"""
self.get_channels()
bulk_list = []
@@ -252,7 +245,13 @@ class PendingList(PendingIndex):
if not video_details:
continue
- video_details["status"] = status
+ video_details.update(
+ {
+ "status": status,
+ "auto_start": auto_start,
+ }
+ )
+
action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
bulk_list.append(json.dumps(action))
bulk_list.append(json.dumps(video_details))
@@ -274,7 +273,7 @@ class PendingList(PendingIndex):
# add last newline
bulk_list.append("\n")
query_str = "\n".join(bulk_list)
- _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
+ _, _ = ElasticWrap("_bulk?refresh=true").post(query_str, ndjson=True)
def _notify_add(self, idx, total):
"""send notification for adding videos to download queue"""
diff --git a/tubearchivist/home/src/download/subscriptions.py b/tubearchivist/home/src/download/subscriptions.py
index b006f845..6325cb4f 100644
--- a/tubearchivist/home/src/download/subscriptions.py
+++ b/tubearchivist/home/src/download/subscriptions.py
@@ -175,10 +175,7 @@ class PlaylistSubscription:
def process_url_str(self, new_playlists, subscribed=True):
"""process playlist subscribe form url_str"""
- data = {
- "query": {"match_all": {}},
- "sort": [{"published": {"order": "desc"}}],
- }
+ data = {"query": {"match_all": {}}, "_source": ["youtube_id"]}
all_indexed = IndexPaginate("ta_video", data).get_results()
all_youtube_ids = [i["youtube_id"] for i in all_indexed]
@@ -284,6 +281,7 @@ class SubscriptionScanner:
def __init__(self, task=False):
self.task = task
self.missing_videos = False
+ self.auto_start = AppConfig().config["subscriptions"].get("auto_start")
def scan(self):
"""scan channels and playlists"""
diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py
index 2a57d034..700eee51 100644
--- a/tubearchivist/home/src/download/yt_dlp_handler.py
+++ b/tubearchivist/home/src/download/yt_dlp_handler.py
@@ -6,14 +6,13 @@ functionality:
- move to archive
"""
-import json
import os
import shutil
from datetime import datetime
from home.src.download.queue import PendingList
from home.src.download.subscriptions import PlaylistSubscription
-from home.src.download.yt_dlp_base import CookieHandler, YtWrap
+from home.src.download.yt_dlp_base import YtWrap
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.index.channel import YoutubeChannel
from home.src.index.comments import CommentList
@@ -22,7 +21,6 @@ from home.src.index.video import YoutubeVideo, index_new_video
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import clean_string, ignore_filelist
-from home.src.ta.ta_redis import RedisQueue
class DownloadPostProcess:
@@ -159,114 +157,77 @@ class VideoDownloader:
self.channels = set()
self.videos = set()
- def run_queue(self):
+ def run_queue(self, auto_only=False):
"""setup download queue in redis loop until no more items"""
- self._setup_queue()
- queue = RedisQueue(queue_name="dl_queue")
-
- limit_queue = self.config["downloads"]["limit_count"]
- if limit_queue:
- queue.trim(limit_queue - 1)
-
+ self._get_overwrites()
while True:
- youtube_data = queue.get_next()
- if self.task.is_stopped() or not youtube_data:
- queue.clear()
+ video_data = self._get_next(auto_only)
+ if self.task.is_stopped() or not video_data:
break
- youtube_data = json.loads(youtube_data)
- youtube_id = youtube_data.get("youtube_id")
-
- tmp_vid_type = youtube_data.get(
- "vid_type", VideoTypeEnum.VIDEOS.value
- )
- video_type = VideoTypeEnum(tmp_vid_type)
- print(f"{youtube_id}: Downloading type: {video_type}")
+ youtube_id = video_data.get("youtube_id")
+ print(f"{youtube_id}: Downloading video")
+ self._notify(video_data, "Validate download format")
success = self._dl_single_vid(youtube_id)
if not success:
continue
- if self.task:
- self.task.send_progress(
- [
- f"Processing video {youtube_id}",
- "Add video metadata to index.",
- ]
- )
+ self._notify(video_data, "Add video metadata to index")
vid_dict = index_new_video(
youtube_id,
video_overwrites=self.video_overwrites,
- video_type=video_type,
+ video_type=VideoTypeEnum(video_data["vid_type"]),
)
self.channels.add(vid_dict["channel"]["channel_id"])
self.videos.add(vid_dict["youtube_id"])
- if self.task:
- self.task.send_progress(
- [
- f"Processing video {youtube_id}",
- "Move downloaded file to archive.",
- ]
- )
-
+ self._notify(video_data, "Move downloaded file to archive")
self.move_to_archive(vid_dict)
-
- if queue.has_item():
- message = "Continue with next video."
- else:
- message = "Download queue is finished."
-
- if self.task:
- self.task.send_progress([message])
-
self._delete_from_pending(youtube_id)
# post processing
self._add_subscribed_channels()
DownloadPostProcess(self).run()
- def _setup_queue(self):
- """setup required and validate"""
- if self.config["downloads"]["cookie_import"]:
- valid = CookieHandler(self.config).validate()
- if not valid:
- return
+ def _notify(self, video_data, message):
+ """send progress notification to task"""
+ if not self.task:
+ return
+ typ = VideoTypeEnum(video_data["vid_type"]).value.rstrip("s").title()
+ title = video_data.get("title")
+ self.task.send_progress([f"Processing {typ}: {title}", message])
+
+ def _get_next(self, auto_only):
+ """get next item in queue"""
+ must_list = [{"term": {"status": {"value": "pending"}}}]
+ if auto_only:
+ must_list.append({"term": {"auto_start": {"value": True}}})
+
+ data = {
+ "size": 1,
+ "query": {"bool": {"must": must_list}},
+ "sort": [
+ {"auto_start": {"order": "desc"}},
+ {"timestamp": {"order": "asc"}},
+ ],
+ }
+ path = "ta_download/_search"
+ response, _ = ElasticWrap(path).get(data=data)
+ if not response["hits"]["hits"]:
+ return False
+
+ return response["hits"]["hits"][0]["_source"]
+
+ def _get_overwrites(self):
+ """get channel overwrites"""
pending = PendingList()
pending.get_download()
pending.get_channels()
self.video_overwrites = pending.video_overwrites
- def add_pending(self):
- """add pending videos to download queue"""
- if self.task:
- self.task.send_progress(["Scanning your download queue."])
-
- pending = PendingList()
- pending.get_download()
- to_add = [
- json.dumps(
- {
- "youtube_id": i["youtube_id"],
- # Using .value in default val to match what would be
- # decoded when parsing json if not set
- "vid_type": i.get("vid_type", VideoTypeEnum.VIDEOS.value),
- }
- )
- for i in pending.all_pending
- ]
- if not to_add:
- # there is nothing pending
- print("download queue is empty")
- if self.task:
- self.task.send_progress(["Download queue is empty."])
-
- return
-
- RedisQueue(queue_name="dl_queue").add_list(to_add)
-
def _progress_hook(self, response):
"""process the progress_hooks from yt_dlp"""
progress = False
@@ -426,7 +387,7 @@ class VideoDownloader:
@staticmethod
def _delete_from_pending(youtube_id):
"""delete downloaded video from pending index if its there"""
- path = f"ta_download/_doc/{youtube_id}"
+ path = f"ta_download/_doc/{youtube_id}?refresh=true"
_, _ = ElasticWrap(path).delete()
def _add_subscribed_channels(self):
diff --git a/tubearchivist/home/src/es/index_mapping.json b/tubearchivist/home/src/es/index_mapping.json
index 66411594..c7482043 100644
--- a/tubearchivist/home/src/es/index_mapping.json
+++ b/tubearchivist/home/src/es/index_mapping.json
@@ -357,6 +357,9 @@
},
"vid_type": {
"type": "keyword"
+ },
+ "auto_start": {
+ "type": "boolean"
}
},
"expected_set": {
diff --git a/tubearchivist/home/src/frontend/forms.py b/tubearchivist/home/src/frontend/forms.py
index e06fda87..a643fb85 100644
--- a/tubearchivist/home/src/frontend/forms.py
+++ b/tubearchivist/home/src/frontend/forms.py
@@ -107,7 +107,6 @@ class ApplicationSettingsForm(forms.Form):
subscriptions_shorts_channel_size = forms.IntegerField(
required=False, min_value=0
)
- downloads_limit_count = forms.IntegerField(required=False)
downloads_limit_speed = forms.IntegerField(required=False)
downloads_throttledratelimit = forms.IntegerField(required=False)
downloads_sleep_interval = forms.IntegerField(required=False)
diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py
index 21964689..1251891f 100644
--- a/tubearchivist/home/tasks.py
+++ b/tubearchivist/home/tasks.py
@@ -25,6 +25,7 @@ from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate
from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.task_manager import TaskManager
+from home.src.ta.urlparser import Parser
CONFIG = AppConfig().config
REDIS_HOST = os.environ.get("REDIS_HOST")
@@ -171,14 +172,16 @@ def update_subscribed(self):
return
manager.init(self)
- missing_videos = SubscriptionScanner(task=self).scan()
+ handler = SubscriptionScanner(task=self)
+ missing_videos = handler.scan()
+ auto_start = handler.auto_start
if missing_videos:
print(missing_videos)
- extrac_dl.delay(missing_videos)
+ extrac_dl.delay(missing_videos, auto_start=auto_start)
@shared_task(name="download_pending", bind=True, base=BaseTask)
-def download_pending(self, from_queue=True):
+def download_pending(self, auto_only=False):
"""download latest pending videos"""
manager = TaskManager()
if manager.is_pending(self):
@@ -187,19 +190,24 @@ def download_pending(self, from_queue=True):
return
manager.init(self)
- downloader = VideoDownloader(task=self)
- if from_queue:
- downloader.add_pending()
- downloader.run_queue()
+ VideoDownloader(task=self).run_queue(auto_only=auto_only)
@shared_task(name="extract_download", bind=True, base=BaseTask)
-def extrac_dl(self, youtube_ids):
+def extrac_dl(self, youtube_ids, auto_start=False):
"""parse list passed and add to pending"""
TaskManager().init(self)
- pending_handler = PendingList(youtube_ids=youtube_ids, task=self)
+ if isinstance(youtube_ids, str):
+ to_add = Parser(youtube_ids).parse()
+ else:
+ to_add = youtube_ids
+
+ pending_handler = PendingList(youtube_ids=to_add, task=self)
pending_handler.parse_url_list()
- pending_handler.add_to_pending()
+ pending_handler.add_to_pending(auto_start=auto_start)
+
+ if auto_start:
+ download_pending.delay(auto_only=True)
@shared_task(bind=True, name="check_reindex", base=BaseTask)
diff --git a/tubearchivist/home/templates/home/downloads.html b/tubearchivist/home/templates/home/downloads.html
index 35ed3d91..59f4f039 100644
--- a/tubearchivist/home/templates/home/downloads.html
+++ b/tubearchivist/home/templates/home/downloads.html
@@ -20,11 +20,12 @@
Add to download queue
Current download limit: {{ config.downloads.limit_count }}
- Limit the number of videos getting downloaded on every run. 0 (zero) to deactivate.
Current download speed limit in KB/s: {{ config.downloads.limit_speed }}
Limit download speed. 0 (zero) to deactivate, e.g. 1000 (1MB/s). Speeds are in KB/s. Setting takes effect on new download jobs or application restart.