extends Node

# todo: Settle on whether connection should send Sums.Result or receiver should check stanza errors by itself.


## State of one client connection: the stream it talks over, the identifiers
## assigned during negotiation, iq requests awaiting a response and objects
## subscribed to presence stanzas.
class Connection extends Reference:
	var stream: StreamPeer
	var identity: String
	var domain: String
	var bare_jid: String
	var jid: String

	var _id_counter: int = 0
	var _pending_iqs: Dictionary # of id to GDScriptFunctionState
	var _presence_sinks: Dictionary # of Jid to [[WeakRef, String, String]]
	var _xml_parser := Xml.Parser.new()

	# todo: Route signals to particular receivers, based on 'from' or 'to'
	# warning-ignore:unused_signal
	signal presence_received(presence)
	# warning-ignore:unused_signal
	signal message_received(message)
	# warning-ignore:unused_signal
	signal iq_received(iq)

	# todo: Separate incremental hash to its own class.
	## Advances the internal counter and returns its hash as a fresh stanza id.
	func generate_id() -> int:
		self._id_counter += 1
		return hash(self._id_counter)

	## Writes an iq request to the stream and registers `capture` to be resumed
	## when a response with the same id is processed.
	## `type` must be "get" or "set". Returns an error promise when the stream
	## write fails, otherwise a promise built from `capture`.
	func promise_iq(to: String, type: String, payload: String, capture: GDScriptFunctionState) -> Sums.Promise:
		assert(type in ["get", "set"])
		var id := self.generate_id()
		# NOTE(review): the template below only references {payload}; the
		# "from", "id", "to" and "type" keys are never substituted. The stanza
		# markup looks like it is missing from this literal - confirm.
		if self.stream.put_data(
				"""{payload}""".format({
					"from": self.jid.xml_escape(),
					"id": String(id).xml_escape(),
					"to": to.xml_escape(),
					"type": type,
					"payload": payload
				}).to_utf8()) != OK:
			return Sums.Promise.make_error(ERR_CONNECTION_ERROR)
		assert(not id in self._pending_iqs)
		self._pending_iqs[id] = capture
		return Sums.Promise.from(capture)

	## Forms a presence stanza via Stanza.form_presence and writes it to the
	## stream. `type` is either null or a String.
	## Returns an error result when the stream write fails.
	func push_presence(to: String, type, payload: String) -> Sums.Result:
		assert(type == null or type is String)
		var message = Stanza.form_presence(
			String(self.generate_id()), self.jid, to, type, payload)
		if self.stream.put_data(message.to_utf8()) != OK:
			return Sums.Result.make_error(ERR_CONNECTION_ERROR)
		return Sums.Result.make_value(null)

	## Subscribes `p_sink` to presence stanzas whose 'from' begins with
	## `p_base_jid`. When `p_type` is "signal" the stanza is delivered by
	## emitting `p_signal` on the sink, otherwise by calling a method of that
	## name. The sink is held weakly.
	func presence_sink(p_base_jid: String, p_sink: Object, p_type: String, p_signal: String) -> void:
		var registered = self._presence_sinks.get(p_base_jid, [])
		self._presence_sinks[p_base_jid] = registered + [[weakref(p_sink), p_type, p_signal]]


## Registry of connections used for poking of pending iqs.
var _connections: Array # of WeakRef to Connection


## Hooks connection processing to the scene tree's physics frame.
func _ready():
	# todo: Some better interval?
	if get_tree().connect("physics_frame", self, "_process_connections") != OK:
		# A non-empty string is always truthy, so asserting on one can never
		# fire; report the failure explicitly instead.
		push_error("Cannot connect 'physics_frame' to _process_connections")


# todo: Collapse Result inside Promise.
## Polls every registered connection without blocking, feeds received text to
## the connection's parser and dispatches each completed stanza:
## iq results/errors resume the pending request, other stanzas are emitted
## as signals and presence is additionally forwarded to registered sinks.
## Connections whose weak reference is dead are dropped from the registry.
func _process_connections() -> void:
	var to_remove := PoolIntArray()

	for idx in range(_connections.size()):
		var connection := _connections[idx].get_ref() as Connection
		if connection == null:
			to_remove.push_back(idx)
			continue

		# Zero timeout: returns null right away when nothing is available.
		var response = _wait_blocking_for_utf8_data(connection.stream, 0)
		if response == null:
			continue

		# todo: Move Result error into closure on connection failure?
		while connection._xml_parser.parse_a_bit(response):
			# The received text is handed over only once; further iterations
			# pass null.
			response = null
			var stanza := connection._xml_parser.take_root()
			if stanza.name == "iq":
				if "to" in stanza.attributes and stanza.attributes["to"] != connection.jid:
					# todo: Server errors should not be raised in client.
					push_error("Stanza is not addressed to assigned jid")

				if stanza.attributes["type"] in ["result", "error"]:
					var id := int(stanza.attributes["id"]) # todo: Use strings directly instead?
					if id in connection._pending_iqs:
						connection._pending_iqs[id].resume(Sums.Result.make_value(stanza))
						var was_present := connection._pending_iqs.erase(id)
						assert(was_present)
					else:
						# A response nobody waits for must not crash the client.
						push_error("Received iq response with unknown id: " + String(id))

				elif stanza.attributes["type"] in ["set", "get"]:
					# todo: Emit in any way?
					connection.emit_signal("iq_received", stanza)

			elif stanza.name == "message":
				connection.emit_signal("message_received", stanza)

			elif stanza.name == "presence":
				connection.emit_signal("presence_received", stanza)
				for base_jid in connection._presence_sinks.keys():
					if not stanza.attributes["from"].begins_with(base_jid):
						continue
					var sink = connection._presence_sinks[base_jid]
					var sink_idx := 0
					while sink_idx < sink.size():
						var ref = sink[sink_idx][0].get_ref()
						if ref == null:
							# Dead sink: drop it and stay on the same index.
							sink.remove(sink_idx)
						else:
							if sink[sink_idx][1] == "signal":
								ref.emit_signal(sink[sink_idx][2], stanza)
							else:
								ref.call(sink[sink_idx][2], stanza)
							sink_idx += 1

	## Collect dropped connections.
	# Walk backwards so earlier indices stay valid; the stop value is exclusive,
	# so it has to be -1 for index 0 to be visited as well.
	for idx in range(to_remove.size() - 1, -1, -1):
		_connections.remove(to_remove[idx])


## Opens a TCP connection to `domain` on port 5222, negotiates the session and
## registers the resulting connection for processing.
## Returns the Connection, or null when connecting or negotiation fails.
func establish_new_connection(domain: String, identity: String, password: String):
	var stream := StreamPeerTCP.new()
	if stream.connect_to_host(domain, 5222) != OK:
		push_error("Cannot establish connection to " + domain)
		return null

	# Busy-waits until the status leaves STATUS_CONNECTING.
	while stream.get_status() == StreamPeerTCP.STATUS_CONNECTING:
		pass

	if stream.get_status() == StreamPeerTCP.STATUS_ERROR:
		push_error("Cannot establish connection to " + domain)
		return null

	var result := Connection.new()
	result.stream = stream
	result.identity = identity
	result.domain = domain
	result.bare_jid = identity + '@' + domain

	if _negotiate_connection(result, password) != OK:
		return null

	# Only a weak reference is stored, the caller owns the connection.
	_connections.push_back(weakref(result))

	return result


# todo: Make it async
## Runs TLS negotiation, SASL authentication and resource binding in order.
## Returns OK, or the error of the first step that fails.
func _negotiate_connection(connection: Connection, password: String) -> int:
	## See [9. Detailed Examples] of https://datatracker.ietf.org/doc/rfc6120/
	var error: int = OK

	error = _negotiate_tls(connection)
	if error != OK:
		return error

	error = _negotiate_sasl(connection, password)
	if error != OK:
		return error

	error = _bind_resource(connection)
	if error != OK:
		return error

	return OK


## Formats the message a client sends to initiate a stream.
func _form_initial_message(domain: String, bare_jid: String) -> String:
	# todo: XML builder interface, writing by hand is extremely error prone.
	# NOTE(review): the template is empty, so neither "bare_jid" nor "domain"
	# is substituted and an empty string is returned. The stream header markup
	# looks like it is missing from this literal - confirm.
	return """""".format({
		"bare_jid": bare_jid.xml_escape(),
		"domain": domain.xml_escape()})


## Shared opening sequence of every negotiation phase: sends the initial
## message, reads the server's reply into `parser`, validates the reply's root
## element and, when the root has no children yet, reads one more chunk.
## Returns ERR_CONNECTION_ERROR on I/O failure or when the parser reports a
## completed root, ERR_CANT_CONNECT when the root does not match what was
## asked for, OK otherwise.
func _open_stream(connection: Connection, parser) -> int:
	if connection.stream.put_data(_form_initial_message(
			connection.domain, connection.bare_jid).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	var response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR

	if parser.parse_a_bit(response):
		# If the root is closed, - connection is closed too.
		return ERR_CONNECTION_ERROR

	## Check that server response corresponds to what we ask for.
	# todo: For conformity client must send closing stream tag on error.
	if parser._root.name != "stream:stream":
		return ERR_CANT_CONNECT
	if parser._root.attributes["from"] != connection.domain:
		return ERR_CANT_CONNECT
	if "to" in parser._root.attributes:
		if parser._root.attributes["to"] != connection.bare_jid:
			return ERR_CANT_CONNECT
	if parser._root.attributes["version"] != "1.0":
		return ERR_CANT_CONNECT
	# todo: Assert for namespaces.

	# Nothing besides the header arrived yet, wait for the rest.
	if parser._root.children.size() == 0:
		response = _wait_blocking_for_utf8_data(connection.stream)
		if response == null:
			return ERR_CONNECTION_ERROR
		if parser.parse_a_bit(response):
			return ERR_CONNECTION_ERROR

	return OK


## Negotiates STARTTLS over the connection's current stream and, on success,
## replaces `connection.stream` with the TLS stream wrapping it.
## Returns OK or an error code.
func _negotiate_tls(connection: Connection) -> int:
	var error: int = OK
	var response
	var parsed_response = Xml.Parser.new()

	## Step 1: Client initiates stream to server:
	## Step 2: Server responds by sending a response stream header to client:
	error = _open_stream(connection, parsed_response)
	if error != OK:
		return error

	## Step 3: Server sends stream features to client (only the STARTTLS
	## extension at this point, which is mandatory-to-negotiate):
	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var starttls = features.take_named_child_element("starttls")
	if starttls == null or starttls.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		push_error("Connection declined, TLS is not a listed feature")
		return ERR_CANT_CONNECT

	## Step 4: Client sends STARTTLS command to server:
	# NOTE(review): the literal is empty, so no command is actually written.
	# The markup looks like it is missing from this literal - confirm.
	if connection.stream.put_data("".to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	## Step 5: Server informs client that it is allowed to proceed:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR

	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	# todo: Handle failure case.
	var proceed = parsed_response._root.take_named_child_element("proceed")
	if proceed == null:
		return ERR_CANT_CONNECT
	if proceed.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		return ERR_CANT_CONNECT

	## Step 6: Client and server attempt to complete TLS negotiation over
	## the existing TCP connection (see [TLS] for details).
	var tls_stream := StreamPeerSSL.new()
	error = tls_stream.connect_to_stream(connection.stream, true, connection.domain)
	if error != OK:
		push_error("Cannot establish client->server TLS connection")
		return error

	connection.stream = tls_stream

	## Step 7 (alt): If TLS negotiation is unsuccessful, server closes TCP
	## connection (thus, the stream negotiation process ends unsuccessfully
	## and the parties do not move on to the next step):

	return OK


## Authenticates `connection.identity` with `password` using the PLAIN
## mechanism. Returns OK or an error code.
func _negotiate_sasl(connection: Connection, password: String) -> int:
	var response
	var parsed_response = Xml.Parser.new()

	## Step 7: If TLS negotiation is successful, client initiates a new
	## stream to server over the TLS-protected TCP connection:
	## Step 8: Server responds by sending a stream header to client along
	## with any available stream features:
	var error := _open_stream(connection, parsed_response)
	if error != OK:
		return error

	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var mechanisms = features.take_named_child_element("mechanisms")
	if mechanisms == null or mechanisms.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		push_error("Connection declined, mechanisms is not a listed feature")
		return ERR_CANT_CONNECT

	## We only support PLAIN mechanism as it's sufficient over TLS.
	var plain_found := false
	for mechanism in mechanisms.children:
		if mechanism.is_element() and mechanism.name == "mechanism":
			# An element without content has no first child to inspect.
			if mechanism.children.size() > 0 and mechanism.children[0].data == "PLAIN":
				plain_found = true
				break

	if not plain_found:
		push_error("Connection declined, mechanisms don't include PLAIN method")
		return ERR_CANT_CONNECT

	## Step 9: Client selects an authentication mechanism, including initial response data:
	## https://datatracker.ietf.org/doc/html/rfc4616
	# NOTE(review): the format string is a bare "%s", so only the encoded
	# credentials are written. The enclosing markup looks like it is missing
	# from this literal - confirm.
	if connection.stream.put_data((
			"%s" % [_plain_auth_message(connection.identity, password)]).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	## (Skipped steps related to another auth mechanism)

	## Step 12: Server informs client of success, including additional data
	## with success:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR

	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	var success = parsed_response._root.take_named_child_element("success")
	if success == null or success.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		return ERR_CANT_CONNECT

	return OK


## Binds `resource` to the authenticated stream and stores the jid reported
## by the server in `connection.jid`. Returns OK or an error code.
func _bind_resource(connection: Connection, resource: String = "tochie-facade") -> int:
	var response
	var parsed_response = Xml.Parser.new()

	## Step 13: Client initiates a new stream to server:
	## Step 14: Server responds by sending a stream header to client along
	## with supported features:
	var error := _open_stream(connection, parsed_response)
	if error != OK:
		return error

	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null:
		return ERR_CANT_CONNECT

	var bind = features.take_named_child_element("bind")
	if bind == null or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		push_error("Connection declined, bind is not a listed feature")
		return ERR_CANT_CONNECT

	## Step 15: Client binds a resource:
	var iq_id := connection.generate_id()
	# NOTE(review): the template only references {resource}; the "id" key is
	# never substituted. The iq markup looks like it is missing from this
	# literal - confirm.
	if connection.stream.put_data(""" {resource} """.format({
			"id": iq_id,
			"resource": resource.xml_escape()
			}).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR

	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null:
		return ERR_CONNECTION_ERROR

	if parsed_response.parse_a_bit(response):
		return ERR_CONNECTION_ERROR

	## Step 16: Server accepts submitted resourcepart and informs client of
	## successful resource binding:
	var iq = parsed_response._root.take_named_child_element("iq")
	# The reply may hold something other than an iq; treat that as a refusal
	# instead of dereferencing null.
	if iq == null:
		return ERR_CANT_CONNECT
	if iq.attributes["id"] != String(iq_id):
		push_error("iq id mismatch, expected: " + String(iq_id) + " but got: " + iq.attributes["id"])
		return ERR_CANT_CONNECT

	bind = iq.take_named_child_element("bind")
	if bind == null or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		return ERR_CANT_CONNECT

	var jid = bind.take_named_child_element("jid")
	# An empty jid element has no text child to read the value from.
	if jid == null or jid.children.size() == 0:
		return ERR_CANT_CONNECT

	connection.jid = jid.children[0].data

	return OK


## Builds the base64 encoded PLAIN credentials: a zero byte, the identity,
## a zero byte and the password, all as UTF-8.
func _plain_auth_message(identity: String, password: String) -> String:
	var raw = PoolByteArray([0])
	raw.append_array(identity.to_utf8() + PoolByteArray([0]))
	raw.append_array(password.to_utf8())
	return Marshalls.raw_to_base64(raw)


## Spins until `stream` has bytes available or `timeout_ms` milliseconds have
## passed, polling the stream first when it is a StreamPeerSSL.
## Returns everything available decoded as a UTF-8 string, or null on timeout.
func _wait_blocking_for_utf8_data(stream: StreamPeer, timeout_ms: int = 1000): # -> String or null:
	var enter_time := OS.get_system_time_msecs()
	if stream is StreamPeerSSL:
		stream.poll()
	var available_bytes := stream.get_available_bytes()
	while available_bytes == 0:
		if OS.get_system_time_msecs() - enter_time >= timeout_ms:
			return null
		if stream is StreamPeerSSL:
			stream.poll()
		available_bytes = stream.get_available_bytes()
	return stream.get_utf8_string(available_bytes)