import os
import re
import hashlib
import sys
import traceback
import asyncio
import base64
import time

import slixmpp
from slixmpp.componentxmpp import ComponentXMPP
from slixmpp.types import PresenceArgs
import slixmpp.stanza
import slixmpp.plugins.xep_0084.stanza


class GatewayXMPP(ComponentXMPP):
    """XMPP component that bridges configured MUC rooms with the plexus bus.

    Users announced by other gateways are mirrored as "puppets": JIDs of the
    form ``<hash>@<component jid>/mirror`` that join every relevant MUC and
    send messages on the remote user's behalf. Messages seen in the MUCs from
    native XMPP occupants are published back onto the plexus.
    """

    def __init__(self, plexus, settings, gateway_name, gateway_data):
        """Set up state, XMPP handlers/plugins and plexus subscriptions.

        plexus       -- pub/sub bus; stored in the module-level ``PLEXUS``.
        settings     -- object whose ``ROOMS`` mapping is read on session start.
        gateway_name -- name of this gateway as used in ``settings.ROOMS``.
        gateway_data -- dict with the keys "jid", "secret", "server", "port".
        """
        ComponentXMPP.__init__(
            self,
            gateway_data["jid"],
            gateway_data["secret"],
            gateway_data["server"],
            gateway_data["port"],
        )

        global PLEXUS
        PLEXUS = plexus

        self.settings = settings
        self.pfrom = gateway_data["jid"]
        self.gateway_name = gateway_name

        # MUC JID -> gateway entry from settings.ROOMS (filled on session start)
        self.relevant_mucs = {}
        self.room_name_to_muc = {}
        self.muc_to_room_name = {}

        # identification tuple -> puppet state dict
        self.my_puppets = {}

        # Two-way maps between bridge message ids and the various XMPP ids
        self.xdm_id_to_base_id = {}
        self.base_id_to_xdm_id = {}
        self.xdm_id_to_stanza_id = {}
        self.stanza_id_to_xdm_id = {}
        self.xdm_id_to_origin_id = {}
        self.origin_id_to_xdm_id = {}

        # Native user avatars
        self.avatar_cache = {}

        self.xmpp_ready = False

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("groupchat_message", self.message)
        self.add_event_handler("groupchat_presence", self.groupchat_presence)

        self.register_handler(slixmpp.xmlstream.handler.Callback(
            "nickname_conflict",
            slixmpp.xmlstream.matcher.StanzaPath("presence"),
            self.nickname_conflict))
        # Each handler gets its own name; this one previously reused
        # "nickname_conflict", which made the two indistinguishable by name.
        self.register_handler(slixmpp.xmlstream.handler.Callback(
            "iq_funker",
            slixmpp.xmlstream.matcher.StanzaPath("iq"),
            self.iq_funker))

        self.register_plugin('xep_0030')  # Service Discovery
        self.register_plugin('xep_0004')  # Data Forms
        self.register_plugin('xep_0060')  # PubSub
        self.register_plugin('xep_0163')  # PEP
        self.register_plugin('xep_0199')  # XMPP Ping
        self.register_plugin('xep_0045')  # MUC
        self.register_plugin('xep_0359')  # Stanza IDs
        self.register_plugin('xep_0308')  # Message correction
        self.register_plugin('xep_0424')  # Message retraction
        self.register_plugin('xep_0084')  # user avatar

        def new_user(identification, nick_name):
            """Create the puppet record for a user announced on the plexus."""
            if identification[0] == "xmpp" and identification[1] == self.gateway_name:
                # Ignore ours
                return

            digest = hashlib.sha256(str(identification).encode()).hexdigest()[:24]
            puppet_data = {
                "identification": identification,
                "jid": f"{digest}@{self.pfrom}/mirror",
                # A puppet may have a different nick per MUC because of conflicts
                "nicknames": {
                    muc_jid: {"state": "joining", "nick": nick_name}
                    for muc_jid in self.relevant_mucs
                },
                # Messages are added and executed as separate steps
                "queue": [],
                "avatar": {"state": "ready", "desired": None, "timeout": 0},
            }
            self.my_puppets[identification] = puppet_data
        PLEXUS.sub("new_user", new_user)

        def message(msg_unique_id, room_name, nick_name, avatar, gateway_type,
                    gateway_name, unique_id, body, attachments, was_edit):
            """Queue a message (or edit) from another gateway on its puppet."""
            if gateway_type == "xmpp" and gateway_name == self.gateway_name:
                # Ignore ours
                return

            muc_jid = self.room_name_to_muc[room_name]
            puppet_data = self.my_puppets[(gateway_type, gateway_name, unique_id)]

            if len(attachments):
                body = str(body) + "\n\n" + "\n".join([str(a) for a in attachments])

            if avatar and (puppet_data["avatar"]["desired"] is None):
                puppet_data["avatar"]["desired"] = avatar
                puppet_data["avatar"]["state"] = "waiting"
                puppet_data["avatar"]["timeout"] = time.time_ns()

            def queue_callback():
                kwargs = {
                    "mto": muc_jid,
                    "mbody": body,
                    "mtype": "groupchat",
                    "mfrom": puppet_data["jid"],
                }
                xmpp_msg = self.make_message(**kwargs)

                if msg_unique_id in self.xdm_id_to_base_id:
                    # This message is an edit of an older message
                    xmpp_msg["replace"]["id"] = self.xdm_id_to_base_id[msg_unique_id]
                else:
                    # New message
                    origin_id = msg_unique_id.hex()
                    self.xdm_id_to_base_id[msg_unique_id] = xmpp_msg["id"]
                    self.base_id_to_xdm_id[xmpp_msg["id"]] = msg_unique_id
                    self.xdm_id_to_origin_id[msg_unique_id] = origin_id
                    self.origin_id_to_xdm_id[origin_id] = msg_unique_id
                    xmpp_msg["origin_id"]["id"] = origin_id

                xmpp_msg.send()
            puppet_data["queue"].append(queue_callback)

            # If the user changed their nickname, try to update it on the puppet.
            # Only do so once the puppet has settled in the room: while a join
            # is still in flight the state machine owns the nick, and the
            # message is queued regardless. (This used to be an assert, which
            # raised out of the handler in exactly that situation.)
            # NOTE(review): after a conflict the stored nick carries the
            # " (real)" suffix, so every later message re-attempts the plain
            # nick -- confirm whether that retry is intended.
            nn = puppet_data["nicknames"][muc_jid]
            if nick_name != nn["nick"] and nn["state"] == "joined":
                nn["state"] = "joining"
                nn["nick"] = nick_name
        PLEXUS.sub("message", message)

        def message_delete(msg_unique_id, room_name, gateway_type, gateway_name, unique_id):
            """Queue a retraction for a message deleted on another gateway."""
            if gateway_type == "xmpp" and gateway_name == self.gateway_name:
                # Ignore ours
                return

            muc_jid = self.room_name_to_muc[room_name]
            puppet_data = self.my_puppets[(gateway_type, gateway_name, unique_id)]

            def queue_callback():
                # Prefer the origin id, fall back to the stanza id
                retractee_id = self.xdm_id_to_origin_id.get(msg_unique_id)
                if retractee_id is None:
                    retractee_id = self.xdm_id_to_stanza_id.get(msg_unique_id)
                if retractee_id is None:
                    # We never learned an XMPP id for this message; nothing to retract
                    return
                self.plugin["xep_0424"].send_retraction(
                    mfrom = puppet_data["jid"],
                    mto = muc_jid,
                    mtype = "groupchat",
                    id = retractee_id)
            puppet_data["queue"].append(queue_callback)
        PLEXUS.sub("message_delete", message_delete)

        PLEXUS.sub("ready", lambda: self.connect())

        asyncio.get_event_loop().create_task(self.step_puppets())

    # Puppets work as asynchronous state machines in the XMPP gateway,
    # because setting them up requires a few "handshakes" such as making
    # sure they've successfully reserved a nickname in each room.
    # If a puppet has not successfully joined all rooms, messages are
    # stored in a queue before being sent.

    def is_puppet_ready(self, puppet_data):
        """Return True when the avatar is "ready" and every nick is "joined"."""
        return puppet_data["avatar"]["state"] == "ready" and all(
            nn["state"] == "joined" for nn in puppet_data["nicknames"].values())

    async def step_puppets(self):
        """Advance every puppet's state machine, forever, every 0.2 seconds.

        A ready puppet runs at most one queued callback per tick. A puppet
        that is not ready gets its pending avatar wait resolved and a join
        presence sent for each room whose nick is "joining" (a "failed" nick
        is retried with " (real)" appended).
        """
        while True:
            for puppet_data in self.my_puppets.values():
                puppet_jid = puppet_data["jid"]

                if self.is_puppet_ready(puppet_data):
                    if len(puppet_data["queue"]) > 0:
                        cb = puppet_data["queue"].pop(0)
                        try:
                            cb()
                        except Exception:
                            # One bad callback must not end this task, or
                            # every puppet would stop sending.
                            traceback.print_exc()
                else:
                    # We are still looking for nicknames OR trying to setup an avatar
                    avatar = puppet_data["avatar"]
                    if avatar["state"] == "waiting" and time.time_ns() >= avatar["timeout"]:
                        # No avatar upload is implemented here; just stop waiting
                        avatar["state"] = "ready"

                    for muc_jid, nn in puppet_data["nicknames"].items():
                        if nn["state"] == "failed":
                            nn["nick"] = nn["nick"] + " (real)"
                            nn["state"] = "joining"

                        if nn["state"] == "joining":
                            stanz = self.plugin["xep_0045"].make_join_stanza(
                                muc_jid,
                                nick = nn["nick"],
                                presence_options = PresenceArgs(pstatus = "", pshow = "chat", pfrom = puppet_jid))
                            stanz.send()
                            nn["state"] = "waiting"

            await asyncio.sleep(0.2)

    def session_start(self, ev):
        """Record and join every MUC in settings.ROOMS that names this gateway."""
        for room_name, room_gateways in self.settings.ROOMS.items():
            for gateway in room_gateways:
                if "xmpp" in gateway and gateway["xmpp"] == self.gateway_name:
                    muc_jid = gateway["jid"]
                    self.relevant_mucs[muc_jid] = gateway
                    self.room_name_to_muc[room_name] = muc_jid
                    self.muc_to_room_name[muc_jid] = room_name

                    self.plugin["xep_0045"].join_muc(muc_jid, gateway["nick"], pfrom = self.pfrom)

        self.xmpp_ready = True

    def message(self, msg):
        """Publish a groupchat message from a native occupant onto the plexus.

        Messages not addressed to the component, from rooms we do not bridge,
        or sent under one of our puppets' nicks are ignored. A correction is
        published as an edit of the already-known message id; anything else
        gets a fresh random id that is recorded against the XMPP ids.
        """
        if msg["to"].full != self.pfrom:
            return

        sender = msg["from"]
        room_jid = sender.bare

        if room_jid not in self.relevant_mucs:
            return

        # Puppets with no entry for this room are skipped instead of raising
        # KeyError on the lookup.
        puppet_nicks = {
            p["nicknames"][room_jid]["nick"]
            for p in self.my_puppets.values()
            if room_jid in p["nicknames"]
        }
        if sender.resource in puppet_nicks:
            return

        # NOTE(review): any room occupant can stop the whole process with
        # this phrase. Kept as-is; consider removing or restricting it.
        if "DIE DIE DIE" in msg["body"]:
            sys.exit(0)

        room_name = self.muc_to_room_name[room_jid]
        sender_id = (room_jid, sender.resource)
        # A message can arrive before any presence from this occupant was
        # seen; treat that as "no avatar" rather than raising KeyError.
        # TODO: first try loading the avatar before sending the message.
        avatar = self.avatar_cache.get(sender.full)

        if msg["replace"]["id"]:
            if msg["replace"]["id"] not in self.base_id_to_xdm_id:
                # Too late
                return
            msg_unique_id = self.base_id_to_xdm_id[msg["replace"]["id"]]
            PLEXUS.pub("message", msg_unique_id, room_name, f"{sender.resource}",
                       avatar, "xmpp", self.gateway_name, sender_id,
                       msg["body"], [], True)
            return

        msg_unique_id = os.urandom(16)
        self.xdm_id_to_base_id[msg_unique_id] = msg["id"]
        self.base_id_to_xdm_id[msg["id"]] = msg_unique_id
        self.xdm_id_to_stanza_id[msg_unique_id] = msg["stanza_id"]["id"]
        self.stanza_id_to_xdm_id[msg["stanza_id"]["id"]] = msg_unique_id

        PLEXUS.pub("message", msg_unique_id, room_name, f"{sender.resource}",
                   avatar, "xmpp", self.gateway_name, sender_id,
                   msg["body"], [], False)

    def groupchat_presence(self, presence):
        """Announce a native MUC occupant as a new user and request its avatar.

        Presences from rooms we do not bridge, from our own puppets, not
        addressed to the component, or from the component's own room nick are
        ignored.
        """
        room_jid = presence["from"].bare
        if room_jid not in self.relevant_mucs:
            return

        # Access to the true JID is why the xdm bridge must be a moderator in the room
        item_stanza = presence.xml.find(".//{http://jabber.org/protocol/muc#user}item")
        if item_stanza is not None:
            jid = item_stanza.attrib.get("jid")
            if jid and re.match(f".*?@{re.escape(self.pfrom)}/mirror", jid):
                # Ignore ours
                return

        if presence["to"] != self.pfrom:
            return

        nick = presence["from"].resource
        if nick == self.relevant_mucs[room_jid]["nick"]:
            return

        occupant = presence["from"].full
        # Placeholder until (and unless) the avatar data arrives
        self.avatar_cache[occupant] = None

        def got_avatar(stanza):
            data = stanza.xml.find(".//{urn:xmpp:avatar:data}data")
            if data is not None:
                self.avatar_cache[occupant] = base64.standard_b64decode("".join(data.itertext()))

        self.plugin["xep_0060"].get_items(
            jid = occupant,
            node = "urn:xmpp:avatar:data",
            ifrom = self.pfrom,
            callback = got_avatar,
            max_items = 1)

        PLEXUS.pub("new_user", ("xmpp", self.gateway_name, (room_jid, nick)), nick)

    def iq_funker(self, iq):
        """Debug hook: print every IQ stanza that reaches the component."""
        print("GOT IQ", iq)

    def nickname_conflict(self, presence):
        """Move a puppet's per-room nick state to "failed" or "joined".

        FAILURE CASE: the presence contains a <conflict/> condition from the
        xmpp-stanzas namespace and is addressed to a puppet JID; the nick for
        the sending room is marked "failed".

        SUCCESS CASE: the presence is addressed to the component itself and
        carries a muc#user <x><item/></x>; any puppet nick in that room that
        is "waiting" and equals the occupant's nick is marked "joined".
        """
        room_jid = presence["from"].bare

        err = presence.xml.find(".//{urn:ietf:params:xml:ns:xmpp-stanzas}conflict/..")
        if err is not None:
            for puppet_data in self.my_puppets.values():
                if puppet_data["jid"] == presence["to"].full:
                    nn = puppet_data["nicknames"].get(room_jid)
                    if nn is not None:
                        nn["state"] = "failed"
        elif presence["to"].full == self.pfrom and presence.xml.find(".//{http://jabber.org/protocol/muc#user}x/{http://jabber.org/protocol/muc#user}item") is not None:
            for puppet_data in self.my_puppets.values():
                nn = puppet_data["nicknames"].get(room_jid)
                if nn is not None and presence["from"].resource == nn["nick"] and nn["state"] == "waiting":
                    nn["state"] = "joined"