232 lines
7.9 KiB
Python
232 lines
7.9 KiB
Python
|
||
import asyncio
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path

import discord
import requests
from discord import SyncWebhook
from dotenv import dotenv_values

# from rich.logging import RichHandler # optional
|
||
|
||
class logger:
    """Application logger with daily file rotation and optional notifications.

    Every message goes to the console and to ``<log_dir>/<app_name>.log``.
    Each logging call can additionally push the message to Gotify and/or
    queue a Discord embed that the ``worker`` coroutine delivers.
    """

    # Base address of the Gotify server that receives push notifications.
    GOTIFY_BASE_URL = "https://gotify.panitan.net"

    # `{}`-style format; `{levelname[0]}` keeps only the level's first letter.
    LOG_FORMAT = "{asctime} [{levelname[0]}] {name} : {message}"
    LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(
        self,
        app_name="app",
        log_dir="../log",
        gotify_config=None,
        discord_config=None,
        level=logging.DEBUG,
        use_utc=False,  # rotate at UTC midnight if True
        keep_days=7,  # retention
    ):
        """
        Continuous app logging with daily rotation.
        Current file name: <log_dir>/<app_name>.log
        Rotated backups: <app_name>.log.YYYY-MM-DD

        Args:
            app_name: Name of the underlying ``logging`` logger and stem of
                the log file.
            log_dir: Directory for the log files; created if missing.
            gotify_config: Gotify application token, either as a plain string
                or as a mapping with a ``"token"`` key. Falsy disables Gotify.
            discord_config: Stored as ``discord_channel_id`` (``None`` when
                falsy); not otherwise used by this class.
            level: Level applied to the logger and to both handlers.
            use_utc: Passed to the rotating handler as ``utc``.
            keep_days: Number of rotated backups to keep.
        """
        # ---- Ensure directory ----
        log_dir_path = Path(log_dir)
        log_dir_path.mkdir(parents=True, exist_ok=True)

        self.app_name = app_name
        self.log_dir_path = log_dir_path

        base_log_path = log_dir_path / f"{app_name}.log"

        formatter = logging.Formatter(self.LOG_FORMAT, self.LOG_DATE_FORMAT, "{")

        # ---- Create handlers ----
        # Console (basic StreamHandler; swap to RichHandler if you want pretty output)
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        console_handler.setFormatter(formatter)

        # Timed rotating file handler: rotates at midnight, keeps last N days
        file_handler = TimedRotatingFileHandler(
            filename=str(base_log_path),
            when="midnight",
            interval=1,
            backupCount=keep_days,
            encoding="utf-8",
            utc=use_utc,
        )
        # Suffix for rotated files (app.log.YYYY-MM-DD)
        file_handler.suffix = "%Y-%m-%d"
        file_handler.setLevel(level)
        file_handler.setFormatter(formatter)

        # ---- Configure a dedicated logger to avoid duplicating handlers ----
        self.logger = logging.getLogger(app_name)  # change name if needed
        self.logger.setLevel(level)

        # Drop handlers left by an earlier instance with the same name and
        # close them so their file descriptors are released.
        for stale in list(self.logger.handlers):
            self.logger.removeHandler(stale)
            stale.close()

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

        # Stop propagation to root to prevent double logging if root is configured elsewhere
        self.logger.propagate = False

        # ---- Instance state ----
        self.client = None
        self.worker_started = False
        self.queue = asyncio.Queue()

        # ---- Notification configs ----
        self.gotify_token = None
        self.url = None
        if gotify_config:
            if isinstance(gotify_config, str):
                self.gotify_token = gotify_config
            else:
                self.gotify_token = gotify_config.get("token")

            if self.gotify_token:
                self.url = f"{self.GOTIFY_BASE_URL}/message?token={self.gotify_token}"
            else:
                self.logger.warning("Gotify token missing in config.")

        # Always defined so later reads never raise AttributeError.
        self.discord_channel_id = discord_config if discord_config else None

        # Inform where we're logging
        self.logger.info(f"Logging to {base_log_path} (rotates daily, keep {keep_days} days).")

    # ---- Internal helpers ----
    def _log_lines(self, message, log_level):
        """Log ``message`` one record per non-empty line at ``log_level``.

        For ERROR records the active exception's traceback is attached, but
        only when an exception is actually being handled and only to the last
        line, so it is neither faked ("NoneType: None") nor repeated.
        """
        lines = [line for line in str(message).split("\n") if line]
        attach_trace = log_level == logging.ERROR and sys.exc_info()[0] is not None
        last_index = len(lines) - 1
        for index, line in enumerate(lines):
            self.logger.log(
                log_level,
                line,
                exc_info=attach_trace and index == last_index,
            )

    def _emit(self, parts, log_level, title, is_gotify, is_discord, image_url):
        """Join ``parts`` with spaces, log them, then run the requested notifiers."""
        message = " ".join(str(part) for part in parts)
        self._log_lines(message, log_level)
        try:
            if is_gotify:
                self.gotify(message, title, image_url)
            if is_discord:
                self.discord(is_discord)
        except Exception as exc:
            # Notifications are best-effort: never let them break the caller,
            # but leave a trace instead of failing silently.
            self.logger.warning(f"Notification dispatch failed: {exc}")

    @staticmethod
    def _gotify_message(msg, image_url=None):
        """Return the Gotify body, with ``image_url`` appended as a Markdown image."""
        if image_url:
            return f"{msg}\n\n![Image]({image_url})"
        return msg

    def _send_webhook(self, url, embed):
        """Post ``embed`` to one webhook URL (blocking call)."""
        SyncWebhook.from_url(url).send(embed=embed)

    # ---- Public log APIs ----
    def log(self, *message, is_gotify=False, is_discord: dict = None, image_url=None):
        """Log at INFO; optionally notify Gotify (title "Logging") and Discord.

        Args:
            *message: Parts converted with ``str`` and joined with spaces.
            is_gotify: Send the message to Gotify when true.
            is_discord: Config dict handed to ``discord`` when truthy.
            image_url: Optional image link added to the Gotify message.
        """
        self._emit(message, logging.INFO, "Logging", is_gotify, is_discord, image_url)

    def debug(self, *message, is_gotify=False, is_discord=None, image_url=None):
        """Log at DEBUG; same arguments as ``log`` (Gotify title "Debug")."""
        self._emit(message, logging.DEBUG, "Debug", is_gotify, is_discord, image_url)

    def error(self, *message, is_gotify=False, is_discord=None, image_url=None):
        """Log at ERROR; same arguments as ``log`` (Gotify title "Error")."""
        self._emit(message, logging.ERROR, "Error", is_gotify, is_discord, image_url)

    def warn(self, *message, is_gotify=False, is_discord=None, image_url=None):
        """Log at WARNING; same arguments as ``log`` (Gotify title "Warning")."""
        self._emit(message, logging.WARNING, "Warning", is_gotify, is_discord, image_url)

    # ---- Notifiers ----
    def gotify(self, msg, title, image_url=None):
        """Push ``msg`` with ``title`` to Gotify as a Markdown message.

        Does nothing beyond a warning when Gotify is not configured. Request
        failures, including HTTP error statuses, are logged and not raised.
        """
        if not self.url or not self.gotify_token:
            self.logger.warning("Gotify not configured; skipping notification.")
            return

        payload = {
            "message": self._gotify_message(msg, image_url),
            "title": title,
            "extras": {"client::display": {"contentType": "text/markdown"}},
        }

        try:
            response = requests.post(
                self.url,
                json=payload,
                headers={"X-Gotify-Key": self.gotify_token},
                timeout=10,
            )
            # requests does not raise on 4xx/5xx by itself.
            response.raise_for_status()
        except Exception as e:
            # The URL carries the token; keep it out of the log files.
            detail = str(e).replace(str(self.gotify_token), "***")
            self.logger.error(f"Gotify notification failed: {detail}")

    def discord(self, config: dict):
        """Queue a Discord embed for delivery by ``worker``.

        Args:
            config: Mapping with ``"channel"`` (object exposing an awaitable
                ``send``), ``"embed"`` and optional ``"web_hook_urls"``.

        The request is ignored unless both a channel and an embed are given.
        """
        channel = config.get("channel")
        embed = config.get("embed")
        web_hook_urls = config.get("web_hook_urls") or []

        if not (channel and embed):
            return

        try:
            if self.client is None:
                # Inside this method `discord` is the imported module, not
                # this method. NOTE(review): this fallback client is never
                # logged in by this class, so `worker` would wait on it
                # indefinitely; presumably the application assigns its own
                # running client to `self.client` first -- confirm.
                self.client = discord.Client(intents=discord.Intents.default())

            if not self.worker_started:
                self.client.loop.create_task(self.worker())
                self.worker_started = True

            self.queue.put_nowait((channel, embed, web_hook_urls))
        except Exception as e:
            self.logger.error(f"Discord notification failed: {e}")

    async def worker(self):
        """Deliver queued embeds forever once the client reports ready.

        Each job is sent to its channel and then to every webhook URL. A
        failure of one destination is logged and does not stop the others.
        """
        await self.client.wait_until_ready()
        loop = asyncio.get_running_loop()

        while True:
            channel, embed, web_hook_urls = await self.queue.get()
            try:
                try:
                    await channel.send(embed=embed)
                except Exception as e:
                    self.logger.error(f"Discord send error: {e}")

                for url in web_hook_urls:
                    try:
                        # SyncWebhook blocks; run it in a thread so the event
                        # loop stays responsive.
                        await loop.run_in_executor(None, self._send_webhook, url, embed)
                    except Exception as e:
                        self.logger.error(f"Discord send error: {e}")
            finally:
                self.queue.task_done()
|
||
if __name__ == "__main__":
    # Manual smoke test: build a logger from the settings in `.env` and emit
    # one message through the console, the log file and Gotify.
    env = dotenv_values(".env")  # parse the file once
    console = logger(
        app_name="scheduler",
        log_dir="./log",
        gotify_config=env.get("gotify_token"),
        # `.get` so a missing optional key does not abort the demo.
        discord_config=env.get("DISCORD_CHANNEL_ID"),
        level=logging.DEBUG,
    )
    console.log(
        "This is a test log message.",
        "blah",
        is_gotify=True,
        is_discord={"channel": None, "embed": None, "web_hook_urls": []},
    )