mirror of
https://git.sr.ht/~coasteen/dotfiles
synced 2025-11-04 14:47:38 +01:00
78 lines
2.1 KiB
Text
78 lines
2.1 KiB
Text
|
|
#!/usr/bin/env python3
|
||
|
|
import os
|
||
|
|
import subprocess
|
||
|
|
import time
|
||
|
|
from pathlib import Path
|
||
|
|
import threading
|
||
|
|
import json
|
||
|
|
import tempfile
|
||
|
|
import urllib.request
|
||
|
|
import websocket
|
||
|
|
import signal
|
||
|
|
|
||
|
|
URL = "https://radio.animebits.moe/stream/stream128.ogg"
|
||
|
|
CONF = Path.home() / ".config" / "radiorc"
|
||
|
|
PID = Path.home() / ".config" / "radio-pid"
|
||
|
|
|
||
|
|
with open(CONF) as f:
|
||
|
|
VOL = f.read().strip()
|
||
|
|
|
||
|
|
def notify(title, image_url=None):
|
||
|
|
icon_path = None
|
||
|
|
if image_url:
|
||
|
|
try:
|
||
|
|
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
|
||
|
|
urllib.request.urlretrieve(image_url, tmp.name)
|
||
|
|
icon_path = tmp.name
|
||
|
|
except:
|
||
|
|
icon_path = None
|
||
|
|
args = ["notify-send", title]
|
||
|
|
if icon_path:
|
||
|
|
args += ["-i", icon_path]
|
||
|
|
subprocess.run(args)
|
||
|
|
|
||
|
|
def ws_listener():
|
||
|
|
def on_message(ws, message):
|
||
|
|
data = json.loads(message)
|
||
|
|
if "data" in data and "title" in data["data"]:
|
||
|
|
title = data["data"]["title"]
|
||
|
|
image = data["data"].get("album_cover")
|
||
|
|
notify(title, image)
|
||
|
|
|
||
|
|
def on_error(ws, error): pass
|
||
|
|
def on_close(ws, close_status_code, close_msg):
|
||
|
|
time.sleep(1)
|
||
|
|
run_ws()
|
||
|
|
def on_open(ws): pass
|
||
|
|
|
||
|
|
def run_ws():
|
||
|
|
ws = websocket.WebSocketApp(
|
||
|
|
"wss://radio.animebits.moe/api/events/basic",
|
||
|
|
on_message=on_message,
|
||
|
|
on_error=on_error,
|
||
|
|
on_close=on_close,
|
||
|
|
on_open=on_open,
|
||
|
|
)
|
||
|
|
ws.run_forever()
|
||
|
|
|
||
|
|
run_ws()
|
||
|
|
|
||
|
|
threading.Thread(target=ws_listener, daemon=True).start()
|
||
|
|
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
print("Playing now...")
|
||
|
|
subprocess.run(["notify-send", "Playing now..."])
|
||
|
|
proc = subprocess.Popen(["mpv", "--no-video", f"--volume={VOL}", URL], preexec_fn=os.setsid)
|
||
|
|
with open(PID, "w") as f:
|
||
|
|
f.write(str(os.getpgid(proc.pid)))
|
||
|
|
proc.wait()
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
subprocess.run(["notify-send", "Stopping now..."])
|
||
|
|
print("\nStopping now...")
|
||
|
|
break
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Something broke... trying again... Error: {e}")
|
||
|
|
time.sleep(2)
|
||
|
|
|