import sqlite3 from datetime import datetime class Today_Queue_Status: queue:str="queue" waiting:str="waiting" failed_search:str="failed search" search_found:str="search found" downloading:str="downloading" failed_download:str="failed download" downloaded:str="downloaded" ## finished importing:str="importing" failed_import:str="failed import" imported:str="imported" completed:str="completed" ## finished uploading:str="uploading" uploaded:str="uploaded" fail_upload:str="failed upload" class sqlite_db: def __init__(self,db_path, **kwargs): super().__init__(**kwargs) self.db_path=db_path def find_title_config_db(self,title): try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # Enables dictionary access cursor = conn.cursor() cursor.execute("SELECT * FROM watchlist WHERE Title = ?", (title,)) title_data = cursor.fetchone() conn.close() return dict(title_data) if title_data is not None else None except Exception: return def add_today_queue(self,queue,is_clear_queue:bool=False,is_clear_title:bool=False): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if is_clear_queue: cursor.execute("DROP TABLE IF EXISTS today_queue") conn.commit() if is_clear_title: cursor.execute("DELETE FROM today_queue WHERE title = ? AND season = ? AND episode = ?", (queue[0]['title'], queue[0]['season'], queue[0]['episode'])) conn.commit() cursor.execute(""" CREATE TABLE IF NOT EXISTS today_queue ( title TEXT, season INTEGER, episode INTEGER, start_timestamp INTEGER, status TEXT DEFAULT queue ) """) cursor.executemany(""" INSERT OR IGNORE INTO today_queue (title, season, episode, start_timestamp ) VALUES (:title, :season, :episode, :start_timestamp) """, queue) conn.commit() conn.close() def add_download_history(self,queue,is_clear_queue:bool=False,is_clear_title:bool=False): # print(queue) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if is_clear_queue: cursor.execute("DROP TABLE IF EXISTS download_history") conn.commit() if is_clear_title: cursor.execute("DELETE FROM today_queue WHERE title = ? AND season = ? AND episode = ?", (queue[0]['title'], queue[0]['season'], queue[0]['episode'])) conn.commit() cursor.execute(""" CREATE TABLE IF NOT EXISTS download_history ( title TEXT, season INTEGER, episode INTEGER, start_timestamp INTEGER, status TEXT DEFAULT queue ) """) cursor.executemany(""" INSERT OR IGNORE INTO download_history (title, season, episode, start_timestamp) VALUES (:title, :season, :episode, :start_timestamp) """, queue) conn.commit() conn.close() def get_today_queue(self): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM today_queue") today_queue = cursor.fetchall() conn.close() return[dict(x) for x in today_queue] if today_queue is not None else None def get_watchlist(self): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM watchlist") watchlist = cursor.fetchall() conn.close() return [dict(x) for x in watchlist] if watchlist is not None else None def get_schedule(self): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM schedule") schedule = cursor.fetchall() conn.close() return [dict(x) for x in schedule] if schedule is not None else None def add_watchlist(self, entry): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT OR IGNORE INTO watchlist (ID, Service, Title, if_dub, url, url_org, audio_lang, sub_lang, quality, codec, range, audio_channel, title_lang, org_lang, season) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (entry['ID'], entry['Service'], entry['Title'], entry['if_dub'], entry['url'], entry['url_org'], entry['audio_lang'], entry['sub_lang'], entry['quality'], entry['codec'], entry['range'], entry['audio_channel'], entry['title_lang'], entry['org_lang'], entry['season'])) conn.commit() conn.close() def update_download_status(self, title, season, episode, status): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE today_queue SET status = ? WHERE title = ? AND season = ? AND episode = ? """, (status, title, season, episode)) conn.commit() conn.close() def update_download_history_status(self, title, season, episode, status): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE download_history SET status = ? WHERE title = ? AND season = ? AND episode = ? """, (status, title, season, episode)) conn.commit() conn.close() def get_download_status(self, title, season,episode): # print('test') conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT status FROM today_queue WHERE title = ? AND season = ? AND episode = ? """, (title, season, episode)) status = cursor.fetchone() conn.close() # print('testss') return dict(status) if status is not None else None # def get_overwrite_schedule(self) -> list[dict]: # overwrite_entries = [] # weekday_map = { # 'Monday': 1, # 'Tuesday': 2, # 'Wednesday': 3, # 'Thursday': 4, # 'Friday': 5, # 'Saturday': 6, # 'Sunday': 7 # } # conn = sqlite3.connect(self.db_path) # conn.row_factory = sqlite3.Row # cursor = conn.cursor() # cursor.execute(""" # SELECT * FROM schedule # WHERE title = ? # """, (entry['title'],)) # overwrite_schedule = cursor.fetchone() # conn.close() # overwrite_schedule = dict(overwrite_schedule) if overwrite_schedule is not None else {} # dt=datetime.fromtimestamp(entry['start_timestamp']) # if overwrite_schedule.get('air_time'): # iso_time = datetime.fromisoformat(f"{dt.year}-{dt.month:02}-{dt.day:02}T{overwrite_schedule['air_time'][:2]:02}:{overwrite_schedule['air_time'][2:]:02}:{dt.second:02}") # dt_overwrite = int(iso_time.timestamp()) # entry['start_timestamp'] = dt_overwrite # if overwrite_schedule.get('offset') and overwrite_schedule.get('offset') != 0: # entry['episode'] += overwrite_schedule['offset'] # if overwrite_schedule.get('day_of_week'): # for dow in overwrite_schedule.get('day_of_week').split(','): # day_of_week = dow # if isinstance(day_of_week, str): # day_of_week = weekday_map.get(day_of_week, None) # if day_of_week is None: # continue # current_day = dt.isoweekday() # days_difference = (day_of_week - current_day) # if days_difference == 0: # continue # # print(days_difference) # print(entry['title'],entry['episode']) # # print(datetime.fromtimestamp(entry['start_timestamp'])) # # print(datetime.fromtimestamp(entry['start_timestamp'])) # dt = datetime.fromtimestamp(entry['start_timestamp']).replace(hour=int(overwrite_schedule['air_time'][:2]), minute=int(overwrite_schedule['air_time'][2:]), second=0, microsecond=0) + timedelta(days=days_difference) # # print(dt) # entry['start_timestamp'] = int(dt.timestamp()) # print(datetime.fromtimestamp(entry['start_timestamp'])) # # print(datetime.today().strftime('%Y-%m-%d')) # # print(datetime.fromtimestamp(entry['start_timestamp'])) # if datetime.today().strftime('%Y-%m-%d') != datetime.fromtimestamp(entry['start_timestamp']).strftime('%Y-%m-%d') : # continue # overwrite_entries.append(entry) # return overwrite_entries def add_overwrite_schedule(self, entry) -> str: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM schedule WHERE title = ?", (entry['title'],)) title= cursor.fetchone() title = dict(title) if title is not None else None if title == entry: conn.close() return 'No changes made, entry already exists.' cursor.execute(""" INSERT INTO schedule (title, air_time, day_of_week, offset) VALUES (?, ?, ?, ?) ON CONFLICT(title) DO UPDATE SET air_time=excluded.air_time, day_of_week=excluded.day_of_week, offset=excluded.offset """, (entry['title'], entry['air_time'], entry['day_of_week'], entry['offset'])) conn.commit() conn.close() return 'Entry added or updated successfully.' def get_today_schedule(self): weekday_map = { 'Monday': 1, 'Tuesday': 2, 'Wednesday': 3, 'Thursday': 4, 'Friday': 5, 'Saturday': 6, 'Sunday': 7 } schedule=self.get_schedule() watchlist=self.get_watchlist() today_list=[] for entry in schedule: result = next((item for item in watchlist if item['Title'] == entry['title']), None) # print(entry) if entry['last_ep'] >= entry['end_ep']: continue if not entry['day_of_week']: continue for dow in entry['day_of_week'].split(','): if datetime.today().isoweekday() != weekday_map[dow.strip()]: continue if entry['last_date'] == datetime.now().date().strftime('%Y-%m-%d'): continue timestamp = int(datetime.now().replace(hour=int(entry['air_time'][:2]), minute=int(entry['air_time'][2:]), second=0, microsecond=0).timestamp()) for i in range(entry['multi_release']): detail ={ "title": result['Title'], "season": int(result['season']) if isinstance(result['season'], int) else 1, "episode": ((int(entry['last_ep'])) if entry['last_ep'] is not None else 0) + +i+1, "sonarr_id": result['ID'], "air_time": entry['air_time'], "day_of_week": entry['day_of_week'], "offset": entry['offset'], "start_timestamp":timestamp + (60*(i*5)) } # print(detail) today_list.append(detail) today_list = sorted(today_list, key=lambda x: x["start_timestamp"]) return today_list def update_schedule_episode(self, title, episode): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE schedule SET last_ep = ?, last_date = ? WHERE title = ? AND last_ep = ? """, (episode, datetime.now().date().strftime('%Y-%m-%d'), title, episode-1)) conn.commit() conn.close() def get_show_by_date(self,date): weekday_map = { 1: 'Monday', 2: 'Tuesday', 3: 'Wednesday', 4: 'Thursday', 5: 'Friday', 6: 'Saturday', 7: 'Sunday' } # if isinstance(date, int): # date = list(weekday_map.keys())[list(weekday_map.values()).index(date)] # date = (datetime.now() + timedelta((weekday_map[date] - datetime.now().isoweekday()) % 7)).strftime('%Y-%m-%d') conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # print(weekday_map[date]) cursor.execute(""" SELECT * FROM schedule WHERE day_of_week LIKE ? """, (f"%{weekday_map[date]}%",)) shows = cursor.fetchall() conn.close() return [dict(x) for x in shows] if shows is not None else None def get_show_by_title(self,title): # if isinstance(date, int): # date = list(weekday_map.keys())[list(weekday_map.values()).index(date)] # date = (datetime.now() + timedelta((weekday_map[date] - datetime.now().isoweekday()) % 7)).strftime('%Y-%m-%d') conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # print(weekday_map[date]) cursor.execute(""" SELECT * FROM schedule WHERE title = ? """, (title,)) shows = cursor.fetchall() conn.close() return [dict(x) for x in shows] if shows is not None else None def get_torrent_detail(self,title): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT * FROM torrent WHERE title = ? """, (title,)) torrent = cursor.fetchone() conn.close() # print('testss') return dict(torrent) if torrent is not None else None def update_torrent_detail(self, title,qbit_name, episode): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE torrent SET last_ep = ?, qbit_name = ? WHERE title = ? """, (episode,qbit_name,title)) conn.commit() conn.close()