import json
import logging
import random
import socket

from . import configuration

logger = logging.getLogger(__name__)


def signal_send(recipients: list[str], message: str) -> None:
    """Send *message* to every entry in *recipients* over the Signal socket.

    One Unix-domain stream connection to ``configuration.SIGNAL_SOCKET`` is
    opened and a JSON-RPC 2.0 ``send`` request is written for each recipient.
    No response is read back from the socket.

    Args:
        recipients: Destination identifiers. An entry starting with ``"G:"``
            is addressed as a group (the prefix is stripped and the rest is
            sent as ``groupId``); any other entry is sent as ``userId``.
        message: Text placed in the ``message`` parameter of every request.

    Raises:
        OSError: If connecting to the socket or writing a request fails.
    """

    def to_dest(recipient: str) -> dict[str, str]:
        """Return the JSON-RPC addressing parameter for *recipient*."""
        if recipient.startswith("G:"):
            return {"groupId": recipient[2:]}
        return {"userId": recipient}

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as signal_json:
        signal_json.connect(configuration.SIGNAL_SOCKET)
        for recipient in recipients:
            payload = {
                "jsonrpc": "2.0",
                "method": "send",
                "params": {"message": message, **to_dest(recipient)},
                # Request id is a random integer in [0, 2**24].
                "id": random.randint(0, (1 << 24)),
            }
            # sendall() keeps writing until the whole request is on the wire;
            # a bare send() may transmit only part of the buffer and would
            # silently truncate the JSON document.
            # NOTE(review): requests are written back-to-back with no
            # delimiter -- confirm whether the peer expects newline framing.
            signal_json.sendall(json.dumps(payload).encode("utf-8"))