You asked for my Python script, but now I can’t seem to load that comment to reply directly to it. Anyway, here’s the script; I haven’t bothered to upload the repo anywhere. I’m sure it isn’t perfect, but it works fine for me. The action for opening Evolution when you click the tray icon is specific to Hyprland, so it will probably need to be modified to suit your needs.
You asked for my Python script, but now I can’t seem to load that comment to reply directly to it. Anyway, here’s the script; I haven’t bothered to upload the repo anywhere. I’m sure it isn’t perfect, but it works fine for me. The action for opening Evolution when you click the tray icon is specific to Hyprland, so it will probably need to be modified to suit your needs.
# Tray indicator for unread Evolution mail.
#
# Watches the folders.db files under ~/.cache/evolution/mail with inotify and
# switches the tray icon/tooltip depending on how many unread INBOX rows exist.
import asyncio
import concurrent.futures
import logging
import signal
import sqlite3
import sys
from pathlib import Path
from subprocess import run

import pkg_resources
from inotify_simple import INotify, flags
from PySimpleGUIQt import SystemTray

menu_def = ["BLANK", ["Exit"]]
empty_icon = pkg_resources.resource_filename(
    "evolution_tray", "resources/inbox-empty.svg"
)
full_icon = pkg_resources.resource_filename(
    "evolution_tray", "resources/inbox-full.svg"
)
inotify = INotify()
tray = SystemTray(filename=empty_icon, menu=menu_def, tooltip="Inbox empty")

# Keep asyncio's own debug chatter out of the output; everything else is
# logged to stdout at DEBUG level through the root logger.
logging.getLogger("asyncio").setLevel(logging.WARNING)
handler = logging.StreamHandler(sys.stdout)
logger = logging.getLogger()
logger.setLevel("DEBUG")
logger.addHandler(handler)


def handle_menu_events():
    """Process tray events forever.

    "Exit" raises SIGTERM against this process; activating the icon asks
    Hyprland (via ``hyprctl``) to launch Evolution. This function never
    returns.
    """
    while True:
        menu_item = tray.read()
        if menu_item == "Exit":
            signal.raise_signal(signal.SIGTERM)
        elif menu_item == "__ACTIVATED__":
            # Hyprland-specific launcher; adapt for other compositors.
            run(["hyprctl", "dispatch", "exec", "evolution"])
            logger.info("Opened evolution")


def get_all_databases():
    """Return every ``folders.db`` found below ~/.cache/evolution/mail."""
    cache_path = Path.home() / ".cache" / "evolution" / "mail"
    return list(cache_path.glob("**/folders.db"))


def check_unread() -> int:
    """Count unread INBOX rows across all databases and refresh the tray.

    Databases that cannot be queried (for example because they have no
    INBOX table or are locked) are skipped and logged instead of aborting
    the whole check.

    Returns:
        The total number of unread messages found.
    """
    unread = 0
    for db in get_all_databases():
        conn = sqlite3.connect(db)
        try:
            cursor = conn.cursor()
            cursor.execute("select count(*) read from INBOX where read == 0")
            unread += cursor.fetchone()[0]
        except sqlite3.Error as exc:
            # Best effort: only database-level failures are tolerated, so
            # KeyboardInterrupt/SystemExit and programming errors still surface.
            logger.debug("Skipping %s: %s", db, exc)
        finally:
            conn.close()

    if unread > 0:
        tray.update(filename=full_icon, tooltip=f"{unread} unread emails")
    else:
        tray.update(filename=empty_icon, tooltip="Inbox empty")
    return unread


def watch_inbox():
    """Block on inotify and re-count unread mail whenever a database changes."""
    while True:
        for database in get_all_databases():
            inotify.add_watch(database, mask=flags.MODIFY)
        # read() blocks until at least one event arrives.
        while inotify.read():
            logger.info("New mail")
            logger.info("%s new emails", check_unread())


def _log_watcher_exit(future):
    """Log the exception, if any, that ended the inbox watcher thread."""
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.error("Inbox watcher stopped unexpectedly", exc_info=exc)


async def main():
    """Start the inbox watcher in a worker thread and run the tray loop.

    ``handle_menu_events`` is a plain blocking function, not a coroutine, so
    it is called directly rather than being handed to ``create_task``: the
    original call blocked inside the argument expression and the surrounding
    ``asyncio.wait``/``gather`` were never reached.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    check_unread()

    watcher = executor.submit(watch_inbox)
    # Without this a crash in the watcher thread would go unnoticed, because
    # the tray loop below never yields to collect the result.
    watcher.add_done_callback(_log_watcher_exit)

    # NOTE(review): the tray loop stays on the calling thread, as it effectively
    # did before; confirm PySimpleGUIQt requires tray.read() on the main thread.
    handle_menu_events()


def entrypoint():
    """Restore default SIGINT/SIGTERM handling and run ``main``."""
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    try:
        asyncio.run(main())
    except Exception as e:
        logger.exception(e)


if __name__ == "__main__":
    entrypoint()