refactor PostData class

* Split out SearchForm class into the searching module
* Split out WatchState class into the index module
This commit is contained in:
simon 2021-09-22 11:43:38 +07:00
parent aa1b9f4d3b
commit ecae40d502
3 changed files with 226 additions and 176 deletions
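For orientation, a minimal sketch of the refactored call flow, assuming only the module paths visible in the imports further down in this diff; the IDs and query string are placeholders:

# Illustrative sketch, not part of the diff.
from home.src.index import WatchState
from home.src.searching import SearchForm

# watched-state handling now lives in the index module
WatchState("video_id_placeholder").mark_as_watched()

# as-you-type channel search now lives in the searching module
results = SearchForm().search_channels("linux")  # -> {"results": [...]}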


@@ -15,7 +15,7 @@ import requests
import yt_dlp as youtube_dl
from bs4 import BeautifulSoup
from home.src.config import AppConfig
from home.src.helper import DurationConverter, clean_string
from home.src.helper import DurationConverter, clean_string, process_url_list
class YoutubeChannel:
@@ -363,6 +363,78 @@ class YoutubeVideo:
print(response.text)
class WatchState:
"""handle watched checkbox for videos and channels"""
CONFIG = AppConfig().config
ES_URL = CONFIG["application"]["es_url"]
HEADERS = {"Content-type": "application/json"}
def __init__(self, youtube_id):
self.youtube_id = youtube_id
self.stamp = int(datetime.now().strftime("%s"))
def mark_as_watched(self):
"""update es with new watched value"""
url_type = self.dedect_type()
if url_type == "video":
self.mark_vid_watched()
elif url_type == "channel":
self.mark_channel_watched()
print(f"marked {self.youtube_id} as watched")
def dedect_type(self):
"""find youtube id type"""
url_process = process_url_list([self.youtube_id])
url_type = url_process[0]["type"]
return url_type
def mark_vid_watched(self):
"""change watched status of single video"""
url = self.ES_URL + "/ta_video/_update/" + self.youtube_id
data = {
"doc": {"player": {"watched": True, "watched_date": self.stamp}}
}
payload = json.dumps(data)
request = requests.post(url, data=payload, headers=self.HEADERS)
if not request.ok:
print(request.text)
def mark_channel_watched(self):
"""change watched status of every video in channel"""
es_url = self.ES_URL
headers = self.HEADERS
youtube_id = self.youtube_id
# create pipeline
data = {
"description": youtube_id,
"processors": [
{"set": {"field": "player.watched", "value": True}},
{"set": {"field": "player.watched_date", "value": self.stamp}},
],
}
payload = json.dumps(data)
url = f"{es_url}/_ingest/pipeline/{youtube_id}"
request = requests.put(url, data=payload, headers=headers)
if not request.ok:
print(request.text)
raise ValueError("failed to post ingest pipeline")
# apply pipeline
must_list = [
{"term": {"channel.channel_id": {"value": youtube_id}}},
{"term": {"player.watched": {"value": False}}},
]
data = {"query": {"bool": {"must": must_list}}}
payload = json.dumps(data)
url = f"{es_url}/ta_video/_update_by_query?pipeline={youtube_id}"
request = requests.post(url, data=payload, headers=headers)
if not request.ok:
print(request.text)
def index_new_video(youtube_id, missing_vid=False):
"""combine video and channel classes for new video index"""
vid_handler = YoutubeVideo(youtube_id)

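A hedged usage sketch of the new WatchState class: one entry point covers both ID types, with the type check deciding between a single-document update and the ingest-pipeline plus _update_by_query path for channels. Both IDs below are placeholders:

# Illustrative only; placeholder IDs, not real YouTube IDs.
from home.src.index import WatchState

WatchState("video_id_placeholder").mark_as_watched()    # video: POST to /ta_video/_update/<id>
WatchState("channel_id_placeholder").mark_as_watched()  # channel: ingest pipeline + _update_by_query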

@@ -182,6 +182,40 @@ class SearchHandler:
return hit
class SearchForm:
"""build query from search form data"""
CONFIG = AppConfig().config
ES_URL = CONFIG["application"]["es_url"]
def search_channels(self, search_query):
"""fancy searching channels as you type"""
url = self.ES_URL + "/ta_channel/_search"
data = {
"size": 10,
"query": {
"multi_match": {
"query": search_query,
"type": "bool_prefix",
"fields": [
"channel_name.search_as_you_type",
"channel_name._2gram",
"channel_name._3gram",
],
}
},
}
look_up = SearchHandler(url, data, cache=False)
search_results = look_up.get_data()
return {"results": search_results}
@staticmethod
def search_videos():
"""searching for videos"""
# TBD placeholder for now
return False
class Pagination:
"""
figure out the pagination based on page size and total_hits

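And a short sketch of how the new SearchForm is meant to be called, assuming SearchHandler.get_data() returns the processed hits as elsewhere in this module; the query string is an example:

# Illustrative only.
from home.src.searching import SearchForm

response = SearchForm().search_channels("lin")
# response == {"results": <hits from SearchHandler.get_data()>}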

@@ -6,10 +6,8 @@ Functionality:
import json
import urllib.parse
from datetime import datetime
from time import sleep
import requests
from django.http import JsonResponse
from django.shortcuts import redirect, render
from django.utils.http import urlencode
@@ -22,7 +20,8 @@ from home.src.helper import (
process_url_list,
set_message,
)
from home.src.searching import Pagination, SearchHandler
from home.src.index import WatchState
from home.src.searching import Pagination, SearchForm, SearchHandler
from home.tasks import (
download_pending,
download_single,
@@ -450,7 +449,7 @@ def process(request):
if request.method == "POST":
post_dict = json.loads(request.body.decode())
post_handler = PostData(post_dict)
if post_handler.to_do:
if post_handler.to_exec:
task_result = post_handler.run_task()
return JsonResponse(task_result)
@@ -458,183 +457,128 @@ def process(request):
class PostData:
"""generic post handler from process route"""
CONFIG = AppConfig().config
ES_URL = CONFIG["application"]["es_url"]
VALID_KEYS = [
"watched",
"rescan_pending",
"ignore",
"dl_pending",
"unsubscribe",
"sort_order",
"hide_watched",
"show_subed_only",
"channel-search",
"video-search",
"dlnow",
"manual-import",
"db-backup",
"db-restore",
]
"""
map frontend http post values to backend funcs
hand over long-running tasks to celery
"""
def __init__(self, post_dict):
self.post_dict = post_dict
self.to_do = self.validate()
def validate(self):
"""validate the post_dict"""
to_do = []
for key, value in self.post_dict.items():
if key in self.VALID_KEYS:
task_item = {"task": key, "status": value}
print(task_item)
to_do.append(task_item)
else:
print(key + " not a valid key")
return to_do
self.to_exec, self.exec_val = list(post_dict.items())[0]
def run_task(self):
"""run through the tasks to do"""
for item in self.to_do:
task = item["task"]
if task == "watched":
youtube_id = item["status"]
self.parse_watched(youtube_id)
elif task == "rescan_pending":
print("rescan subscribed channels")
update_subscribed.delay()
elif task == "ignore":
print("ignore video")
handler = PendingList()
ignore_list = item["status"]
handler.ignore_from_pending([ignore_list])
elif task == "dl_pending":
print("download pending")
download_pending.delay()
elif task == "unsubscribe":
channel_id_unsub = item["status"]
print("unsubscribe from " + channel_id_unsub)
ChannelSubscription().change_subscribe(
channel_id_unsub, channel_subscribed=False
)
elif task == "sort_order":
sort_order = item["status"]
set_message("sort_order", sort_order, expire=False)
elif task == "hide_watched":
hide_watched = bool(int(item["status"]))
print(item["status"])
set_message("hide_watched", hide_watched, expire=False)
elif task == "show_subed_only":
show_subed_only = bool(int(item["status"]))
print(show_subed_only)
set_message("show_subed_only", show_subed_only, expire=False)
elif task == "channel-search":
search_query = item["status"]
print("searching for: " + search_query)
search_results = self.search_channels(search_query)
return search_results
elif task == "video-search":
search_query = item["status"]
print("searching for: " + search_query)
search_results = self.search_videos(search_query)
return search_results
elif task == "dlnow":
youtube_id = item["status"]
print("downloading: " + youtube_id)
download_single.delay(youtube_id=youtube_id)
elif task == "manual-import":
print("starting manual import")
run_manual_import.delay()
elif task == "db-backup":
print("backing up database")
run_backup.delay()
elif task == "db-restore":
print("restoring index from backup zip")
run_restore_backup.delay()
"""execute and return task result"""
to_exec = self.exec_map()
task_result = to_exec()
return task_result
def exec_map(self):
"""map dict key and return function to execute"""
exec_map = {
"watched": self.watched,
"rescan_pending": self.rescan_pending,
"ignore": self.ignore,
"dl_pending": self.dl_pending,
"unsubscribe": self.unsubscribe,
"sort_order": self.sort_order,
"hide_watched": self.hide_watched,
"show_subed_only": self.show_subed_only,
"dlnow": self.dlnow,
"manual-import": self.manual_import,
"db-backup": self.db_backup,
"db-restore": self.db_restore,
"channel-search": self.channel_search,
}
return exec_map[self.to_exec]
def watched(self):
"""mark as watched"""
WatchState(self.exec_val).mark_as_watched()
return {"success": True}
def search_channels(self, search_query):
"""fancy searching channels as you type"""
url = self.ES_URL + "/ta_channel/_search"
data = {
"size": 10,
"query": {
"multi_match": {
"query": search_query,
"type": "bool_prefix",
"fields": [
"channel_name.search_as_you_type",
"channel_name._2gram",
"channel_name._3gram",
],
}
},
}
look_up = SearchHandler(url, data, cache=False)
search_results = look_up.get_data()
return {"results": search_results}
@staticmethod
def rescan_pending():
"""look for new items in subscribed channels"""
print("rescan subscribed channels")
update_subscribed.delay()
return {"success": True}
def search_videos(self, search_query):
"""fancy searching videos as you type"""
url = self.ES_URL + "/ta_video/_search"
data = {
"size": 10,
"query": {
"multi_match": {
"query": search_query,
"type": "bool_prefix",
"fields": [
"title.search_as_you_type",
"title._2gram",
"title._3gram",
],
}
},
}
look_up = SearchHandler(url, data, cache=False)
search_results = look_up.get_data()
return {"results": search_results}
def ignore(self):
"""ignore from download queue"""
print("ignore video")
handler = PendingList()
ignore_list = self.exec_val
handler.ignore_from_pending([ignore_list])
return {"success": True}
def parse_watched(self, youtube_id):
"""marked as watched based on id type"""
es_url = self.ES_URL
id_type = process_url_list([youtube_id])[0]["type"]
stamp = int(datetime.now().strftime("%s"))
if id_type == "video":
stamp = int(datetime.now().strftime("%s"))
url = self.ES_URL + "/ta_video/_update/" + youtube_id
source = {
"doc": {"player": {"watched": True, "watched_date": stamp}}
}
request = requests.post(url, json=source)
if not request.ok:
print(request.text)
elif id_type == "channel":
headers = {"Content-type": "application/json"}
data = {
"description": youtube_id,
"processors": [
{"set": {"field": "player.watched", "value": True}},
{"set": {"field": "player.watched_date", "value": stamp}},
],
}
payload = json.dumps(data)
url = es_url + "/_ingest/pipeline/" + youtube_id
request = requests.put(url, data=payload, headers=headers)
if not request.ok:
print(request.text)
# apply pipeline
must_list = [
{"term": {"channel.channel_id": {"value": youtube_id}}},
{"term": {"player.watched": {"value": False}}},
]
data = {"query": {"bool": {"must": must_list}}}
payload = json.dumps(data)
url = f"{es_url}/ta_video/_update_by_query?pipeline={youtube_id}"
request = requests.post(url, data=payload, headers=headers)
if not request.ok:
print(request.text)
@staticmethod
def dl_pending():
"""start the download queue"""
print("download pending")
download_pending.delay()
return {"success": True}
def unsubscribe(self):
"""unsubscribe from channel"""
channel_id_unsub = self.exec_val
print("unsubscribe from " + channel_id_unsub)
ChannelSubscription().change_subscribe(
channel_id_unsub, channel_subscribed=False
)
return {"success": True}
def sort_order(self):
"""change the sort between published to downloaded"""
sort_order = self.exec_val
set_message("sort_order", sort_order, expire=False)
return {"success": True}
def hide_watched(self):
"""toggle if to show watched vids or not"""
hide_watched = bool(int(self.exec_val))
print(f"hide watched: {hide_watched}")
set_message("hide_watched", hide_watched, expire=False)
return {"success": True}
def show_subed_only(self):
"""show or hide subscribed channels only on channels page"""
show_subed_only = bool(int(self.exec_val))
print(f"show subed only: {show_subed_only}")
set_message("show_subed_only", show_subed_only, expire=False)
return {"success": True}
def dlnow(self):
"""start downloading single vid now"""
youtube_id = self.exec_val
print("downloading: " + youtube_id)
download_single.delay(youtube_id=youtube_id)
return {"success": True}
@staticmethod
def manual_import():
"""run manual import from settings page"""
print("starting manual import")
run_manual_import.delay()
return {"success": True}
@staticmethod
def db_backup():
"""backup es to zip from settings page"""
print("backing up database")
run_backup.delay()
return {"success": True}
@staticmethod
def db_restore():
"""restore es zip from settings page"""
print("restoring index from backup zip")
run_restore_backup.delay()
return {"success": True}
def channel_search(self):
"""search for channel name as_you_type"""
search_query = self.exec_val
print("searching for: " + search_query)
search_results = SearchForm().search_channels(search_query)
return search_results
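Taken together, the new PostData contract looks like the sketch below: the process route posts a single-key dict, __init__ unpacks the first key/value pair into to_exec and exec_val, and exec_map() picks the handler. The payloads are examples, not taken from the frontend code:

# Illustrative only; example payloads.
post_handler = PostData({"watched": "video_id_placeholder"})
print(post_handler.to_exec, post_handler.exec_val)  # watched video_id_placeholder
result = post_handler.run_task()                    # {"success": True}

# search tasks return results instead of a success flag
search = PostData({"channel-search": "lin"}).run_task()  # {"results": [...]}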