extends Node
## XMPP client service node.
##
## Opens a TCP connection to an XMPP server, upgrades it with STARTTLS,
## authenticates with SASL PLAIN and binds a resource (the step numbers in the
## comments refer to "9. Detailed Examples" of RFC 6120). Once per physics
## frame every live connection is polled for incoming stanzas, which are either
## handed to the coroutine waiting for them (iq results/errors) or re-emitted
## as signals on the Connection object.

# NOTE(review): both query constants are empty strings. They look like XML
# payloads whose markup was lost (presumably the XEP-0030 disco#info and
# disco#items <query/> elements) - confirm against the original file. They are
# left untouched because their use is not visible in this file.
const disco_info_queury := ""
const disco_items_queury := ""

## Upper bound for the blocking wait on the initial TCP connect.
const CONNECT_TIMEOUT_MS := 5000


## State of a single negotiated client-to-server stream.
class Connection extends Reference:
	var stream: StreamPeer
	var identity: String
	var domain: String
	var bare_jid: String
	var jid: String

	var _id_counter: int = 0
	var _pending_iqs: Dictionary # of id to GDScriptFunctionState
	var _pending_tail_iqs: Array # of GDScriptFunctionState
	var _xml_parser := Xml.Parser.new()

	# warning-ignore:unused_signal
	signal presence_received(presence)
	# warning-ignore:unused_signal
	signal message_received(message)
	# warning-ignore:unused_signal
	signal iq_received(iq)

	## Returns a fresh stanza id: the hash of an incrementing counter.
	func generate_id() -> int:
		self._id_counter += 1
		return hash(self._id_counter)

	## Observable result of an iq coroutine.
	## `is_done` flips to true once the watched coroutine completes (or at
	## once for an error promise); `is_ok` tells which of the two happened and
	## `value` holds the coroutine result or the error code.
	class Promise extends Reference:
		var is_done: bool = false
		var is_ok: bool
		var value = null

		## Creates a promise fulfilled by the "completed" signal of `capture`.
		static func from(capture: GDScriptFunctionState) -> Promise:
			var result := Promise.new()
			if capture.connect("completed", result, "result_arrived") != OK:
				# The original `assert("Bruh")` asserted a non-empty string,
				# which is always truthy, so the failure went unnoticed.
				push_error("Cannot observe completion of iq coroutine")
				assert(false)
			return result

		## Creates an already finished, failed promise carrying `error`.
		static func make_error(error: int) -> Promise:
			var result := Promise.new()
			result.is_done = true
			result.is_ok = false
			result.value = error
			return result

		## Signal handler: stores the coroutine result and marks success.
		func result_arrived(p_result) -> void:
			self.is_done = true
			self.is_ok = true
			self.value = p_result

	## Writes an <iq/> stanza to the stream and registers `capture` to be
	## resumed with the matching response. Shared by promise_iq and push_iq.
	## Returns OK or ERR_CONNECTION_ERROR when the write fails.
	func _send_iq(to: String, action: String, payload: String,
			capture: GDScriptFunctionState) -> int: # Error
		assert(action in ["get", "set"])
		var id := self.generate_id()
		# NOTE(review): the template in the reviewed text contained only
		# "{payload}" although from/id/to/action were passed to format(); the
		# <iq/> envelope was reconstructed per RFC 6120 - confirm.
		var stanza := """<iq from='{from}' id='{id}' to='{to}' type='{action}'>{payload}</iq>""".format({
				"from": self.jid.xml_escape(),
				"id": id,
				"to": to.xml_escape(),
				"action": action,
				"payload": payload })
		if self.stream.put_data(stanza.to_utf8()) != OK:
			return ERR_CONNECTION_ERROR
		assert(not (id in self._pending_iqs))
		self._pending_iqs[id] = capture
		return OK

	## Sends an iq and returns a Promise tracking completion of `capture`.
	## On a write failure an already failed promise is returned instead.
	func promise_iq(to: String, action: String, payload: String,
			capture: GDScriptFunctionState) -> Promise:
		if self._send_iq(to, action, payload, capture) != OK:
			return Promise.make_error(ERR_CONNECTION_ERROR)
		return Promise.from(capture)

	## Sends an iq; `capture` is resumed with the response stanza.
	## Returns OK or ERR_CONNECTION_ERROR.
	func push_iq(to: String, action: String, payload: String,
			capture: GDScriptFunctionState) -> int: # Error
		return self._send_iq(to, action, payload, capture)


## Registry of connections used for poking of pending iqs.
var _connections: Array # of WeakRef to Connection


## Returns true when every promise in `promises` is done (also for an empty
## array).
static func are_promises_done(promises: Array) -> bool:
	for promise in promises:
		assert(promise is Connection.Promise)
		if not promise.is_done:
			return false
	return true


## Yields once and returns the resulting function state as is.
static func yield_as_is():
	return yield()


func _ready():
	# todo: Some better interval?
	if get_tree().connect("physics_frame", self, "_process_connections") != OK:
		# Was `assert("Bruh")`, which can never fail.
		push_error("Cannot subscribe to physics_frame, connections won't be processed")
		assert(false)


## Polls every registered connection: parses incoming data, dispatches complete
## stanzas, resumes tail coroutines and forgets connections that were freed.
func _process_connections() -> void:
	var to_remove := PoolIntArray()
	for idx in range(_connections.size()):
		var connection := _connections[idx].get_ref() as Connection
		if connection == null:
			to_remove.push_back(idx)
			continue

		# Zero timeout: only take what is already buffered.
		var response = _wait_blocking_for_utf8_data(connection.stream, 0)
		if response == null:
			continue

		# The first call feeds the new data, subsequent calls pass null to
		# keep draining what the parser already has.
		while connection._xml_parser.parse_a_bit(response):
			response = null
			var stanza := connection._xml_parser.take_root()
			if stanza.node_name == "iq":
				_dispatch_iq(connection, stanza)
			elif stanza.node_name == "message":
				connection.emit_signal("message_received", stanza)
			elif stanza.node_name == "presence":
				connection.emit_signal("presence_received", stanza)

		# NOTE(review): indentation was lost in the reviewed text; this pass is
		# placed after the stanza loop, i.e. it runs only on frames where data
		# arrived for the connection - confirm against the original layout.
		var to_remove_tails := PoolIntArray()
		for tail_idx in range(connection._pending_tail_iqs.size()):
			var result = connection._pending_tail_iqs[tail_idx].resume()
			if not result is GDScriptFunctionState or not result.is_valid():
				# assert(result == null, "Ignored result of iq subroutine: " + "%s" % result)
				to_remove_tails.push_back(tail_idx)
			else:
				connection._pending_tail_iqs[tail_idx] = result

		# Remove back to front so earlier indices stay valid. The stop value
		# is -1 because range() excludes it; the previous stop of 0 never
		# removed the entry stored at position 0.
		for i in range(to_remove_tails.size() - 1, -1, -1):
			connection._pending_tail_iqs.remove(to_remove_tails[i])

	## Collect dropped connections (same back-to-front removal as above).
	for i in range(to_remove.size() - 1, -1, -1):
		_connections.remove(to_remove[i])


## Routes one received <iq/> stanza: results and errors resume the coroutine
## registered under the stanza id, get/set requests are emitted as
## "iq_received". Malformed or unsolicited stanzas are reported and dropped
## instead of aborting the frame callback.
func _dispatch_iq(connection: Connection, stanza) -> void:
	var attributes = stanza.attributes
	if "to" in attributes and attributes["to"] != connection.jid:
		push_error("Stanza is not addressed to assigned jid")

	if not ("type" in attributes):
		push_error("Received iq stanza without type attribute")
		return

	if attributes["type"] in ["result", "error"]:
		if not ("id" in attributes):
			push_error("Received iq response without id attribute")
			return
		var id := int(attributes["id"]) # todo: Use strings directly instead?
		if not (id in connection._pending_iqs):
			push_error("Received iq response with unknown id: " + str(attributes["id"]))
			return
		var result = connection._pending_iqs[id].resume(stanza)
		# A coroutine that yielded again is kept and resumed in later passes.
		if result is GDScriptFunctionState and result.is_valid():
			connection._pending_tail_iqs.push_back(result)
		# elif result != null:
		# 	assert(false, "Ignored result of iq subroutine: " + "%s" % result)
		var was_present := connection._pending_iqs.erase(id)
		assert(was_present)
	elif attributes["type"] in ["set", "get"]:
		connection.emit_signal("iq_received", stanza)


## Connects to `domain` on port 5222 and negotiates TLS, SASL PLAIN and
## resource binding. Blocks while doing so.
## Returns the ready Connection, or null on any failure.
func establish_new_connection(domain: String, identity: String, password: String) -> Connection:
	var stream := StreamPeerTCP.new()
	if stream.connect_to_host(domain, 5222) != OK:
		push_error("Cannot establish connection to " + domain)
		return null

	# Bounded wait: previously this loop could spin forever.
	var deadline := OS.get_ticks_msec() + CONNECT_TIMEOUT_MS
	while stream.get_status() == StreamPeerTCP.STATUS_CONNECTING:
		if OS.get_ticks_msec() >= deadline:
			break
	# Anything but an established connection (error, timeout, none) is fatal.
	if stream.get_status() != StreamPeerTCP.STATUS_CONNECTED:
		push_error("Cannot establish connection to " + domain)
		return null

	var result := Connection.new()
	result.stream = stream
	result.identity = identity
	result.domain = domain
	result.bare_jid = identity + '@' + domain

	if _negotiate_connection(result, password) != OK:
		return null

	# Only a weak reference is kept: dropping the Connection elsewhere makes
	# _process_connections forget it.
	_connections.push_back(weakref(result))
	return result


# todo: Make it async
## Runs the negotiation phases in order and returns the first error, or OK.
func _negotiate_connection(connection: Connection, password: String) -> int:
	## See [9. Detailed Examples] of https://datatracker.ietf.org/doc/rfc6120/
	var error: int = OK

	error = _negotiate_tls(connection)
	if error != OK:
		return error

	error = _negotiate_sasl(connection, password)
	if error != OK:
		return error

	error = _bind_resource(connection)
	if error != OK:
		return error

	return OK


## Builds the opening stream header sent at the start of every stream.
func _form_initial_message(domain: String, bare_jid: String) -> String:
	# todo: XML builder interface, writing by hand is extremely error prone.
	# NOTE(review): the template was an empty string in the reviewed text even
	# though bare_jid/domain were passed to format(); the header below was
	# reconstructed from the RFC 6120 examples - confirm.
	return """<?xml version='1.0'?><stream:stream from='{bare_jid}' to='{domain}' version='1.0' xml:lang='en' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>""".format({
			"bare_jid": bare_jid.xml_escape(),
			"domain": domain.xml_escape()})


## Opens a new stream on `connection`: sends the stream header, reads and
## validates the server header and makes sure the stream features were fed to
## `parsed_response` (an Xml.Parser). Shared by all three negotiation phases.
## Returns OK, ERR_CONNECTION_ERROR (I/O problem or stream closed) or
## ERR_CANT_CONNECT (unexpected header).
func _open_stream(connection: Connection, parsed_response) -> int:
	if connection.stream.put_data(_form_initial_message(
			connection.domain, connection.bare_jid).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	var response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR # If <stream/> is closed, - connection is closed too.

	## Check that server response corresponds to what we ask for.
	# todo: For conformity client must send closing stream tag on error.
	var root = parsed_response._root
	if root == null:
		return ERR_CANT_CONNECT
	if root.node_name != "stream:stream":
		return ERR_CANT_CONNECT
	if not ("from" in root.attributes) or root.attributes["from"] != connection.domain:
		return ERR_CANT_CONNECT
	if "to" in root.attributes:
		if root.attributes["to"] != connection.bare_jid:
			return ERR_CANT_CONNECT
	if not ("version" in root.attributes) or root.attributes["version"] != "1.0":
		return ERR_CANT_CONNECT
	# todo: Assert for namespaces.

	# The features may arrive separately from the header.
	if parsed_response._root.children.size() == 0:
		response = _wait_blocking_for_utf8_data(connection.stream)
		if response == null:
			return ERR_CONNECTION_ERROR
		if parsed_response.parse_a_bit(response):
			return ERR_CONNECTION_ERROR

	return OK


## Steps 1-6: negotiates STARTTLS and replaces `connection.stream` with the
## TLS stream on success.
func _negotiate_tls(connection: Connection) -> int:
	var error: int = OK
	var response
	var parsed_response = Xml.Parser.new()

	## Step 1: Client initiates stream to server.
	## Step 2: Server responds by sending a response stream header to client.
	error = _open_stream(connection, parsed_response)
	if error != OK:
		return error

	## Step 3: Server sends stream features to client (only the STARTTLS
	## extension at this point, which is mandatory-to-negotiate):
	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var starttls = features.take_named_child_element("starttls")
	if starttls == null or starttls.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		push_error("Connection declined, TLS is not a listed feature")
		return ERR_CANT_CONNECT

	## Step 4: Client sends STARTTLS command to server:
	# NOTE(review): this literal was empty in the reviewed text; restored per
	# RFC 6120 section 5.4.2.1 - confirm.
	if connection.stream.put_data(
			"<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>".to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	## Step 5: Server informs client that it is allowed to proceed:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	# todo: Handle failure case.
	var proceed = parsed_response._root.take_named_child_element("proceed")
	if proceed == null:
		return ERR_CANT_CONNECT
	if proceed.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		return ERR_CANT_CONNECT

	## Step 6: Client and server attempt to complete TLS negotiation over
	## the existing TCP connection (see [TLS] for details).
	var tls_stream := StreamPeerSSL.new()
	error = tls_stream.connect_to_stream(connection.stream, true, connection.domain)
	if error != OK:
		push_error("Cannot establish client->server TLS connection")
		return error

	connection.stream = tls_stream

	## Step 7 (alt): If TLS negotiation is unsuccessful, server closes TCP
	## connection (thus, the stream negotiation process ends unsuccessfully
	## and the parties do not move on to the next step):
	return OK


## Steps 7-12: authenticates over the TLS stream using SASL PLAIN.
func _negotiate_sasl(connection: Connection, password: String) -> int:
	var response
	var parsed_response = Xml.Parser.new()

	## Step 7: If TLS negotiation is successful, client initiates a new
	## stream to server over the TLS-protected TCP connection.
	## Step 8: Server responds by sending a stream header to client along
	## with any available stream features.
	var error := _open_stream(connection, parsed_response)
	if error != OK:
		return error

	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var mechanisms = features.take_named_child_element("mechanisms")
	if mechanisms == null or mechanisms.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		push_error("Connection declined, mechanisms is not a listed feature")
		return ERR_CANT_CONNECT

	## We only support PLAIN mechanism as it's sufficient over TLS.
	var plain_found := false
	for mechanism in mechanisms.children:
		if not mechanism.is_element() or mechanism.node_name != "mechanism":
			continue
		# An empty <mechanism/> has no text child to inspect.
		if mechanism.children.size() > 0 and mechanism.children[0].data == "PLAIN":
			plain_found = true
			break

	if not plain_found:
		push_error("Connection declined, mechanisms don't include PLAIN method")
		return ERR_CANT_CONNECT

	## Step 9: Client selects an authentication mechanism, including initial response data:
	## https://datatracker.ietf.org/doc/html/rfc4616
	# NOTE(review): only "%s" was left of this template in the reviewed text;
	# the <auth/> element was restored per RFC 6120 section 6.4.2 - confirm.
	if connection.stream.put_data((
			"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>"
			% [_plain_auth_message(connection.identity, password)]).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	## (Skipped steps related to another auth mechanism)

	## Step 12: Server informs client of success, including additional data
	## with success:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	var success = parsed_response._root.take_named_child_element("success")
	if success == null or success.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		return ERR_CANT_CONNECT

	return OK


## Steps 13-16: binds `resource` and stores the full JID assigned by the
## server in `connection.jid`.
func _bind_resource(connection: Connection, resource: String = "tochie-facade") -> int:
	var response
	var parsed_response = Xml.Parser.new()

	## Step 13: Client initiates a new stream to server.
	## Step 14: Server responds by sending a stream header to client along
	## with supported features.
	var error := _open_stream(connection, parsed_response)
	if error != OK:
		return error

	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var bind = features.take_named_child_element("bind")
	if bind == null or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		push_error("Connection declined, bind is not a listed feature")
		return ERR_CANT_CONNECT

	## Step 15: Client binds a resource:
	var iq_id := connection.generate_id()
	# NOTE(review): only "{resource}" was left of this template in the reviewed
	# text although "id" was passed to format(); the request was restored per
	# RFC 6120 section 7.7.1 - confirm.
	if connection.stream.put_data("""<iq id='{id}' type='set'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'><resource>{resource}</resource></bind></iq>""".format({
			"id": iq_id,
			"resource": resource.xml_escape() }).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	## Step 16: Server accepts submitted resourcepart and informs client of
	## successful resource binding:
	var iq = parsed_response._root.take_named_child_element("iq")
	# Every other element lookup is null-checked; this one was not.
	if iq == null or not ("id" in iq.attributes):
		return ERR_CANT_CONNECT
	if iq.attributes["id"] != String(iq_id):
		push_error("iq id mismatch, expected: " + String(iq_id) + " but got: " + iq.attributes["id"])
		return ERR_CANT_CONNECT

	bind = iq.take_named_child_element("bind")
	if bind == null or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		return ERR_CANT_CONNECT

	var jid = bind.take_named_child_element("jid")
	if jid == null or jid.children.size() == 0:
		return ERR_CANT_CONNECT

	connection.jid = jid.children[0].data

	return OK


## Builds the base64 SASL PLAIN initial response: NUL, identity, NUL, password
## (the leading NUL stands for an empty authorization identity, RFC 4616).
func _plain_auth_message(identity: String, password: String) -> String:
	var raw = PoolByteArray([0])
	raw.append_array(identity.to_utf8() + PoolByteArray([0]))
	raw.append_array(password.to_utf8())
	return Marshalls.raw_to_base64(raw)


## Busy-waits until `stream` has bytes available and returns all of them
## decoded as UTF-8, or null when nothing arrived within `timeout_ms`
## (a timeout of 0 only checks what is already buffered).
func _wait_blocking_for_utf8_data(stream: StreamPeer, timeout_ms: int = 1000): # -> String or null:
	# Monotonic ticks instead of wall-clock time, so a system clock adjustment
	# can neither cut the wait short nor stretch it.
	var enter_time := OS.get_ticks_msec()
	# A TLS stream has to be polled to make progress.
	if stream is StreamPeerSSL:
		stream.poll()
	var available_bytes := stream.get_available_bytes()
	while available_bytes == 0:
		if OS.get_ticks_msec() - enter_time >= timeout_ms:
			return null
		if stream is StreamPeerSSL:
			stream.poll()
		available_bytes = stream.get_available_bytes()
	return stream.get_utf8_string(available_bytes)