From f0a7869888c5247eea4ab5a4f0a5ac659a613969 Mon Sep 17 00:00:00 2001 From: mid <> Date: Sun, 28 Dec 2025 09:51:55 +0200 Subject: [PATCH] Big ass update --- gateway_discord/__init__.py | 44 ++++++-- gateway_matrix/__init__.py | 214 ++++++++++++++++++++++++++++++++++++ gateway_xmpp/__init__.py | 65 +++++++++-- main.py | 16 ++- 4 files changed, 320 insertions(+), 19 deletions(-) create mode 100644 gateway_matrix/__init__.py diff --git a/gateway_discord/__init__.py b/gateway_discord/__init__.py index 10bb1d3..b6ae1c1 100644 --- a/gateway_discord/__init__.py +++ b/gateway_discord/__init__.py @@ -1,5 +1,5 @@ import os, re, hashlib, sys, traceback -import asyncio +import asyncio, aiohttp import nextcord class GatewayDiscord(nextcord.Client): @@ -28,9 +28,14 @@ class GatewayDiscord(nextcord.Client): self.active_users = {} self.new_user_queue = [] + + self.avatar_cache_url = {} + self.avatar_cache = {} + self.webhook_avatars = {} def new_user(identification, nick_name): if identification[0] == "discord" and identification[1] == self.gateway_name: + # Ignore ours return self.new_user_queue.append(identification) @@ -39,7 +44,7 @@ class GatewayDiscord(nextcord.Client): self.handle_new_user_queue() PLEXUS.sub("new_user", new_user) - def message(msg_unique_id, room_name, nick_name, gateway_type, gateway_name, unique_id, body, attachments, was_edit): + def message(msg_unique_id, room_name, nick_name, avatar, gateway_type, gateway_name, unique_id, body, attachments, was_edit): if gateway_type == "discord" and gateway_name == self.gateway_name: # Ignore ours return @@ -57,12 +62,22 @@ class GatewayDiscord(nextcord.Client): asyncio.ensure_future(wh.edit_message(self.xdm_id_to_discord_id[msg_unique_id], content = str(body))) else: - def on_done(fut): - wh_msg = fut.result() + async def aw(): + if avatar is None: + self.webhook_avatars[wh.id] = None + await wh.edit(avatar = None) + else: + current_avatar_hash = hashlib.sha256(avatar).hexdigest() + if self.webhook_avatars[wh.id] != current_avatar_hash: + self.webhook_avatars[wh.id] = current_avatar_hash + await wh.edit(avatar = avatar) + + wh_msg = await wh.send(content = str(body), username = nick_name, wait = True, allowed_mentions = nextcord.AllowedMentions(everyone = False)) + self.xdm_id_to_discord_id[msg_unique_id] = wh_msg.id self.discord_id_to_xdm_id[wh_msg.id] = msg_unique_id - asyncio.ensure_future(wh.send(content = str(body), username = nick_name, wait = True, allowed_mentions = nextcord.AllowedMentions(everyone = False))).add_done_callback(on_done) + asyncio.ensure_future(aw()) PLEXUS.sub("message", message) PLEXUS.sub("ready", lambda: asyncio.ensure_future(self.start(gateway_data["token"]))) @@ -77,6 +92,16 @@ class GatewayDiscord(nextcord.Client): self.active_users[member.id] = {} PLEXUS.pub("new_user", ("discord", self.gateway_name, member.id), member.display_name) + async def cache_avatar(self, memb): + avatar_url = memb.display_avatar.url + avatar_url = avatar_url.split("?")[0] + "?size=128" + if (memb.id not in self.avatar_cache_url) or (self.avatar_cache_url[memb.id] != avatar_url): + self.avatar_cache_url[memb.id] = avatar_url + + async with aiohttp.ClientSession() as session: + async with session.get(avatar_url) as resp: + self.avatar_cache[memb.id] = await resp.read() + async def on_message(self, message): if message.author.bot: # Ignore ours @@ -99,7 +124,9 @@ class GatewayDiscord(nextcord.Client): self.xdm_id_to_discord_id[msg_unique_id] = message.id self.discord_id_to_xdm_id[message.id] = msg_unique_id - PLEXUS.pub("message", msg_unique_id, room_name, message.author.display_name, "discord", self.gateway_name, message.author.id, message.clean_content, [a.url for a in message.attachments], False) + await self.cache_avatar(message.author) + + PLEXUS.pub("message", msg_unique_id, room_name, message.author.display_name, self.avatar_cache[message.author.id], "discord", self.gateway_name, message.author.id, message.clean_content, [a.url for a in message.attachments], False) async def on_message_edit(self, before, after): if before.author.bot: @@ -123,9 +150,11 @@ class GatewayDiscord(nextcord.Client): if before.id not in self.discord_id_to_xdm_id: return + await self.cache_avatar(after.author) + msg_unique_id = self.discord_id_to_xdm_id[after.id] room_name = self.pair_to_room_name[(guild_id, channel_id)] - PLEXUS.pub("message", msg_unique_id, room_name, after.author.display_name, "discord", self.gateway_name, after.author.id, after.clean_content, [a.url for a in after.attachments], True) + PLEXUS.pub("message", msg_unique_id, room_name, after.author.display_name, self.avatar_cache[after.author.id], "discord", self.gateway_name, after.author.id, after.clean_content, [a.url for a in after.attachments], True) async def on_message_delete(self, message): if message.author.bot: @@ -169,6 +198,7 @@ class GatewayDiscord(nextcord.Client): wh = await discord_channel.create_webhook(name = "xdm", reason = "XDM bridge") self.webhooks[gateway["channel"]] = wh + self.webhook_avatars[wh.id] = None self.room_name_to_pair[room_name] = (gateway["guild"], gateway["channel"]) self.pair_to_room_name[(gateway["guild"], gateway["channel"])] = room_name diff --git a/gateway_matrix/__init__.py b/gateway_matrix/__init__.py new file mode 100644 index 0000000..c6dcc57 --- /dev/null +++ b/gateway_matrix/__init__.py @@ -0,0 +1,214 @@ +import mautrix.appservice +import mautrix.types.event.message +import mautrix.types.event.type +import mautrix.api +import asyncio +import hashlib +import os +import asyncache, cachetools +import aiohttp +import re + +# For Matrix, dynamic thumbnails and unauthorized media should be both enabled. + +class GatewayMatrix: + def __init__(self, plexus, settings, gateway_name, gateway_data): + global PLEXUS + PLEXUS = plexus + + self.appserv = mautrix.appservice.AppService(server = gateway_data["server"], domain = gateway_data["domain"], verify_ssl = False, id = gateway_data["id"], as_token = gateway_data["as_token"], hs_token = gateway_data["hs_token"], bot_localpart = gateway_data["bot_localpart"], bridge_name = gateway_data["bridge_name"], loop = asyncio.get_event_loop()) + + self.settings = settings + + self.gateway_name = gateway_name + + self.bot_localpart = gateway_data["bot_localpart"] + self.domain = gateway_data["domain"] + + self.room_name_to_room_id = {} + self.room_id_to_room_name = {} + + self.xdm_id_to_matrix_id = {} + self.matrix_id_to_xdm_id = {} + + self.msg_unique_id_to_event_id = {} + self.event_id_to_msg_unique_id = {} + self.msg_original_sender = {} + + self.my_puppets = {} + + self.active_users = {} + + self.ready = False + + def new_user(identification, nick_name): + if identification[0] == "matrix" and identification[1] == self.gateway_name: + # Ignore ours + return + PLEXUS.sub("new_user", new_user) + + def message(msg_unique_id, room_name, nick_name, avatar, gateway_type, gateway_name, unique_id, body, attachments, was_edit): + if gateway_type == "matrix" and gateway_name == self.gateway_name: + # Ignore ours + return + + if len(attachments): + body = str(body) + "\n\n" + "\n".join([str(a) for a in attachments]) + + puppet = self.load_puppet((gateway_type, gateway_name, unique_id)) + + # Not a problem if this gets called multiple times + if room_name not in puppet["rooms"]: + async def enter_room(): + await puppet["intent"].ensure_joined(room_id = self.room_name_to_room_id[room_name], ignore_cache = True) + return True + puppet["queue"].append(enter_room) + + if was_edit: + async def q_msg(): + await puppet["intent"].send_message_event(room_id = self.room_name_to_room_id[room_name], event_type = mautrix.types.event.type.EventType.ROOM_MESSAGE, content = mautrix.types.event.message.TextMessageEventContent(msgtype = mautrix.types.event.message.MessageType.TEXT, format = mautrix.types.event.message.Format.HTML, formatted_body = body, relates_to = mautrix.types.event.message.RelatesTo(rel_type = mautrix.types.event.message.RelationType.REPLACE, event_id = self.msg_unique_id_to_event_id[msg_unique_id]))) + + return True + puppet["queue"].append(q_msg) + else: + async def q_msg(): + await puppet["intent"].set_displayname(displayname = nick_name, check_current = True) + + avatar_hash = hashlib.sha256(avatar).hexdigest() if (avatar is not None) else None + if puppet["last_avatar_hash"] != avatar_hash: + puppet["last_avatar_hash"] = avatar_hash + await puppet["intent"].set_avatar_url(await puppet["intent"].upload_media(data = avatar)) + + event_id = await puppet["intent"].send_message_event(room_id = self.room_name_to_room_id[room_name], event_type = mautrix.types.event.type.EventType.ROOM_MESSAGE, content = mautrix.types.event.message.TextMessageEventContent(msgtype = mautrix.types.event.message.MessageType.TEXT, format = mautrix.types.event.message.Format.HTML, formatted_body = body)) + + self.msg_unique_id_to_event_id[msg_unique_id] = event_id + self.event_id_to_msg_unique_id[event_id] = msg_unique_id + self.msg_original_sender[msg_unique_id] = (gateway_type, gateway_name, unique_id) + + return True + puppet["queue"].append(q_msg) + PLEXUS.sub("message", message) + + def message_delete(msg_unique_id, room_name, gateway_type, gateway_name, unique_id): + if gateway_type == "matrix" and gateway_name == self.gateway_name: + # Ignore ours + return + + puppet = self.load_puppet((gateway_type, gateway_name, unique_id)) + + async def q_redact(): + await puppet["intent"].redact(room_id = self.room_name_to_room_id[room_name], event_id = self.msg_unique_id_to_event_id[msg_unique_id]) + + return True + puppet["queue"].append(q_redact) + PLEXUS.sub("message_delete", message_delete) + + async def f(): + await self.appserv.start(host = "127.0.0.1", port = gateway_data["as_port"]) + + for room_name, room_gateways in self.settings.ROOMS.items(): + for gateway in room_gateways: + if "matrix" in gateway and gateway["matrix"] == self.gateway_name: + room_id = await self.get_room_id_from_alias(gateway["room_alias"]) + + self.room_name_to_room_id[room_name] = room_id + self.room_id_to_room_name[room_id] = room_name + + await self.appserv.intent.ensure_joined(room_id = room_id, ignore_cache = True) + + self.appserv.matrix_event_handler(self.matrix_event_handler) + + self.ready = True + + asyncio.get_event_loop().create_task(f()) + asyncio.get_event_loop().create_task(self.step_puppets()) + + async def matrix_event_handler(self, event): + if not event.sender or not event.room_id or event.room_id not in self.room_id_to_room_name: + return + + if re.match(re.compile(f"@{re.escape(self.bot_localpart)}[^:]*\\:{re.escape(self.domain)}"), event.sender): + return + + if str(event.type) == "m.room.message": + if str(event.content.msgtype) != "m.text": + return + + was_edit = event.content._relates_to and str(event.content._relates_to.rel_type) == "m.replace" + + if was_edit: + msg_unique_id = self.event_id_to_msg_unique_id[event.content._relates_to.event_id] + else: + msg_unique_id = os.urandom(16) + self.msg_unique_id_to_event_id[msg_unique_id] = event.event_id + self.event_id_to_msg_unique_id[event.event_id] = msg_unique_id + self.msg_original_sender[msg_unique_id] = ("matrix", self.gateway_name, event.sender) + + room_name = self.room_id_to_room_name[event.room_id] + nick_name = (await self.appserv.intent.get_displayname(event.sender)) or event.sender + avatar = await self.get_native_user_avatar(event.sender) + + body = event.content.formatted_body if event.content.format else event.content.body + attachments = [] + + if ("matrix", self.gateway_name, event.sender) not in self.active_users: + self.active_users[("matrix", self.gateway_name, event.sender)] = None + PLEXUS.pub("new_user", ("matrix", self.gateway_name, event.sender), nick_name) + + PLEXUS.pub("message", msg_unique_id, room_name, nick_name, avatar, "matrix", self.gateway_name, event.sender, body, attachments, was_edit) + elif str(event.type) == "m.room.redaction": + if event.redacts not in self.event_id_to_msg_unique_id: + return + if self.event_id_to_msg_unique_id[event.redacts] not in self.msg_original_sender: + return + + PLEXUS.pub("message_delete", self.event_id_to_msg_unique_id[event.redacts], self.room_id_to_room_name[event.room_id], *self.msg_original_sender[self.event_id_to_msg_unique_id[event.redacts]]) + + def load_puppet(self, identification): + if identification in self.my_puppets: + return self.my_puppets[identification] + + puppet_mxid = f"@_uw_bridge_{hashlib.sha256(str(identification).encode('UTF-8')).hexdigest()[:24]}:underware.dev" + + puppet_data = { + "mxid": puppet_mxid, + "rooms": {}, + "queue": [], + "last_avatar_hash": None, + } + + async def get_intent(): + puppet_data["intent"] = self.appserv.intent.user(puppet_mxid) + return True + puppet_data["queue"].append(get_intent) + + self.my_puppets[identification] = puppet_data + + return puppet_data + + async def step_puppets(self): + while not self.ready: + await asyncio.sleep(2) + + while True: + for xdm_id, puppet_data in self.my_puppets.items(): + if len(puppet_data["queue"]) > 0: + if await puppet_data["queue"][0](): + puppet_data["queue"].pop(0) + await asyncio.sleep(0.2) + + # I couldn't find this in Mautrix itself, so wtf + async def get_room_id_from_alias(self, alias): + return (await self.appserv.intent.api.request(method = mautrix.api.Method.GET, path = mautrix.api.PathBuilder(f"_matrix/client/v3/directory/room/{alias}")))["room_id"] + + @asyncache.cached(cachetools.TTLCache(256, 1800)) + async def get_native_user_avatar(self, mxid): + mxc_uri = await self.appserv.intent.get_avatar_url(mxid) + + if mxc_uri: + # https://mementomori.social/@rolle/113732824034474418 + async with self.appserv.http_session.get(str(self.appserv.intent.api.get_download_url(mxc_uri, download_type = "thumbnail")) + "?width=128&height=128&method=scale&allow_redirect=true") as resp: + return await resp.read() + + return None + diff --git a/gateway_xmpp/__init__.py b/gateway_xmpp/__init__.py index 34e101a..79abe3f 100644 --- a/gateway_xmpp/__init__.py +++ b/gateway_xmpp/__init__.py @@ -3,6 +3,9 @@ import asyncio import slixmpp from slixmpp.componentxmpp import ComponentXMPP from slixmpp.types import PresenceArgs +import slixmpp.stanza +import slixmpp.plugins.xep_0084.stanza +import base64, time class GatewayXMPP(ComponentXMPP): def __init__(self, plexus, settings, gateway_name, gateway_data): @@ -29,6 +32,9 @@ class GatewayXMPP(ComponentXMPP): self.xdm_id_to_origin_id = {} self.origin_id_to_xdm_id = {} + + # Native user avatars + self.avatar_cache = {} self.xmpp_ready = False @@ -36,15 +42,18 @@ class GatewayXMPP(ComponentXMPP): self.add_event_handler("groupchat_message", self.message) self.add_event_handler("groupchat_presence", self.groupchat_presence) self.register_handler(slixmpp.xmlstream.handler.Callback("nickname_conflict", slixmpp.xmlstream.matcher.StanzaPath("presence"), self.nickname_conflict)) + self.register_handler(slixmpp.xmlstream.handler.Callback("nickname_conflict", slixmpp.xmlstream.matcher.StanzaPath("iq"), self.iq_funker)) self.register_plugin('xep_0030') # Service Discovery self.register_plugin('xep_0004') # Data Forms self.register_plugin('xep_0060') # PubSub + self.register_plugin('xep_0163') # PEP self.register_plugin('xep_0199') # XMPP Ping self.register_plugin('xep_0045') # MUC self.register_plugin('xep_0359') # Stanza IDs self.register_plugin('xep_0308') # Message correction self.register_plugin('xep_0424') # Message retraction + self.register_plugin('xep_0084') # user avatar def new_user(identification, nick_name): if identification[0] == "xmpp" and identification[1] == self.gateway_name: @@ -59,12 +68,14 @@ class GatewayXMPP(ComponentXMPP): "nicknames": {muc_jid: {"state": "joining", "nick": nick_name} for muc_jid in self.relevant_mucs}, # Messages are added and executed as separate steps - "queue": [] + "queue": [], + + "avatar": {"state": "ready", "desired": None, "timeout": 0} } self.my_puppets[identification] = puppet_data PLEXUS.sub("new_user", new_user) - def message(msg_unique_id, room_name, nick_name, gateway_type, gateway_name, unique_id, body, attachments, was_edit): + def message(msg_unique_id, room_name, nick_name, avatar, gateway_type, gateway_name, unique_id, body, attachments, was_edit): if gateway_type == "xmpp" and gateway_name == self.gateway_name: # Ignore ours return @@ -75,6 +86,11 @@ class GatewayXMPP(ComponentXMPP): if len(attachments): body = str(body) + "\n\n" + "\n".join([str(a) for a in attachments]) + if avatar and (puppet_data["avatar"]["desired"] is None): + puppet_data["avatar"]["desired"] = avatar + puppet_data["avatar"]["state"] = "waiting" + puppet_data["avatar"]["timeout"] = time.time_ns() + def queue_callback(): kwargs = { "mto": muc_jid, "mbody": body, "mtype": "groupchat", "mfrom": puppet_data["jid"] @@ -93,9 +109,16 @@ class GatewayXMPP(ComponentXMPP): self.origin_id_to_xdm_id[msg_unique_id.hex()] = msg_unique_id xmpp_msg["origin_id"]["id"] = msg_unique_id.hex() - print(xmpp_msg) xmpp_msg.send() puppet_data["queue"].append(queue_callback) + + # If the user changed their nickname, try to update it on the puppet + if nick_name != puppet_data["nicknames"][muc_jid]["nick"]: + assert puppet_data["nicknames"][muc_jid]["state"] == "joined" + puppet_data["nicknames"][muc_jid]["state"] = "joining" + puppet_data["nicknames"][muc_jid]["nick"] = nick_name + + PLEXUS.sub("message", message) def message_delete(msg_unique_id, room_name, gateway_type, gateway_name, unique_id): @@ -124,18 +147,27 @@ class GatewayXMPP(ComponentXMPP): # sure they've successfully reserved a nickname in each room. # If a puppet has not successfully joined all rooms, messages are # stored in a queue before being sent. + + def is_puppet_ready(self, puppet_data): + return puppet_data["avatar"]["state"] == "ready" and all([nn["state"] == "joined" for nn in puppet_data["nicknames"].values()]) + async def step_puppets(self): while True: for puppet_data in self.my_puppets.values(): puppet_jid = puppet_data["jid"] - if all([nn["state"] == "joined" for nn in puppet_data["nicknames"].values()]): - # We are good for doing stuff - if len(puppet_data["queue"]): + if self.is_puppet_ready(puppet_data): + if len(puppet_data["queue"]) > 0: cb = puppet_data["queue"].pop(0) cb() else: - # We are still looking for nicknames + # We are still looking for nicknames OR trying to setup an avatar + + if puppet_data["avatar"]["state"] == "waiting" and time.time_ns() >= puppet_data["avatar"]["timeout"]: + pass + + puppet_data["avatar"]["state"] = "ready" + for muc_jid, nn in puppet_data["nicknames"].items(): if nn["state"] == "failed": nn["nick"] = nn["nick"] + " (real)" @@ -175,7 +207,7 @@ class GatewayXMPP(ComponentXMPP): return msg_unique_id = self.base_id_to_xdm_id[msg["replace"]["id"]] - PLEXUS.pub("message", msg_unique_id, self.muc_to_room_name[msg["from"].bare], f"{msg['from'].resource}", "xmpp", self.gateway_name, (msg["from"].bare, msg["from"].resource), msg["body"], [], True) + PLEXUS.pub("message", msg_unique_id, self.muc_to_room_name[msg["from"].bare], f"{msg['from'].resource}", self.avatar_cache[msg["from"].full], "xmpp", self.gateway_name, (msg["from"].bare, msg["from"].resource), msg["body"], [], True) return @@ -187,7 +219,10 @@ class GatewayXMPP(ComponentXMPP): self.xdm_id_to_stanza_id[msg_unique_id] = msg["stanza_id"]["id"] self.stanza_id_to_xdm_id[msg["stanza_id"]["id"]] = msg_unique_id - PLEXUS.pub("message", msg_unique_id, self.muc_to_room_name[msg["from"].bare], f"{msg['from'].resource}", "xmpp", self.gateway_name, (msg["from"].bare, msg["from"].resource), msg["body"], [], False) + PLEXUS.pub("message", msg_unique_id, self.muc_to_room_name[msg["from"].bare], f"{msg['from'].resource}", self.avatar_cache[msg["from"].full], "xmpp", self.gateway_name, (msg["from"].bare, msg["from"].resource), msg["body"], [], False) + + if msg["from"].full not in self.avatar_cache: + # First try loading avatar before sending message def groupchat_presence(self, presence): if presence["from"].bare not in self.relevant_mucs: @@ -197,14 +232,26 @@ class GatewayXMPP(ComponentXMPP): if item_stanza is not None: jid = item_stanza.attrib.get("jid") if jid and re.match(re.compile(f".*?@{re.escape(self.pfrom)}/mirror"), jid): + # Ignore ours return if presence["to"] != self.pfrom: return if presence["from"].resource == self.relevant_mucs[presence["from"].bare]["nick"]: return + + self.avatar_cache[presence["from"].full] = None + def got_avatar(stanza): + data = stanza.xml.find(".//{urn:xmpp:avatar:data}data") + if data is not None: + self.avatar_cache[presence["from"].full] = base64.standard_b64decode("".join(data.itertext())) + self.plugin["xep_0060"].get_items(jid = presence["from"].full, node = "urn:xmpp:avatar:data", ifrom = self.pfrom, callback = got_avatar, max_items = 1) + PLEXUS.pub("new_user", ("xmpp", self.gateway_name, (presence["from"].bare, presence["from"].resource)), presence["from"].resource) + def iq_funker(self, iq): + print("GOT IQ", iq) + def nickname_conflict(self, presence): # FAILURE CASE: # diff --git a/main.py b/main.py index ce17d94..99e5cfe 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,14 @@ import os, re, hashlib, sys, traceback import asyncio -import gateway_xmpp -import gateway_discord import settings +# Plexus is the main (pubsub) event bus that all gateways communicate through +# There are three main events all should keep track of: +# new_user: each gateway must emit this event for each native user before sending message events +# message: a native user has sent a message OR edited one. all gateways should relay the message to their own native channels +# message_delete: a native user has deleted a message. all gateways should delete the respective message in their own native channels class Plexus: def __init__(self): self.pubsub_callbacks = {} @@ -21,15 +24,22 @@ class Plexus: for callback in self.pubsub_callbacks[event_name]: try: callback(*args) - except e: + except: print(traceback.format_exc()) PLEXUS = Plexus() for name, data in settings.XMPPs.items(): + import gateway_xmpp data["handler"] = gateway_xmpp.GatewayXMPP(PLEXUS, settings, name, data) + for name, data in settings.DISCORDs.items(): + import gateway_discord data["handler"] = gateway_discord.GatewayDiscord(PLEXUS, settings, name, data) +for name, data in settings.MATRIXs.items(): + import gateway_matrix + data["handler"] = gateway_matrix.GatewayMatrix(PLEXUS, settings, name, data) + PLEXUS.pub("ready") asyncio.get_event_loop().run_forever()