From 2d970c59145d2d2f5bdaa42cbebd39dfca601323 Mon Sep 17 00:00:00 2001 From: veclav talica Date: Sun, 27 Aug 2023 01:52:21 +0500 Subject: [PATCH] tailed iqs, service discovery via promise array --- scenes/App.gd | 20 ++++++- scenes/Connections.gd | 132 +++++++++++++++++++++++++++++++----------- scenes/Xml.gd | 40 +++++++++---- 3 files changed, 147 insertions(+), 45 deletions(-) diff --git a/scenes/App.gd b/scenes/App.gd index 7029249..836a6ad 100644 --- a/scenes/App.gd +++ b/scenes/App.gd @@ -4,14 +4,32 @@ var _connection func _service_discovery(): var iq := yield() as Xml.XmlElement - print(iq.as_string()) + + var feature_promises := Array() + for item in iq.children[0].children: + feature_promises.push_back(_connection.promise_iq(item.attributes["jid"], "get", + "", + _connection.iq_as_is())) + + while not _connection.are_promises_done(feature_promises): + yield() + + for feature_promise in feature_promises: + if not feature_promise.is_ok: + push_error("Connection failed") + return + + var feature := feature_promise.value as Xml.XmlElement + print(feature.as_string()) func _ready(): _connection = $Connections.establish_new_connection("poto.cafe", "veclavtalica", "-") if _connection == null: push_error("Connection failed") + return if _connection.push_iq(_connection.domain, "get", "", _service_discovery()) != OK: push_error("Connection failed") + return diff --git a/scenes/Connections.gd b/scenes/Connections.gd index f635253..200ba14 100644 --- a/scenes/Connections.gd +++ b/scenes/Connections.gd @@ -8,6 +8,7 @@ class Connection extends Reference: var jid: String var _id_counter: int = 0 var _pending_iqs: Dictionary # of id to GDScriptFunctionState + var _pending_tail_iqs: Array # of GDScriptFunctionState var _xml_parser := Xml.Parser.new() # warning-ignore:unused_signal @@ -21,6 +22,48 @@ class Connection extends Reference: self._id_counter += 1 return hash(self._id_counter) + class Promise extends Reference: + var is_done: bool = false + var is_ok: bool + var value = null + + static func from(capture: GDScriptFunctionState) -> Promise: + var result := Promise.new() + if capture.connect("completed", result, "result_arrived") != OK: + assert("Bruh") + return result + + static func make_error(error: int) -> Promise: + var result := Promise.new() + result.is_done = true + result.is_ok = false + result.value = error + return result + + func result_arrived(p_result) -> void: + self.is_done = true + self.is_ok = true + self.value = p_result + + func promise_iq(to: String, action: String, payload: String, capture: GDScriptFunctionState) -> Promise: + assert(action in ["get", "set"]) + + var id := self.generate_id() + if self.stream.put_data( + """{payload}""".format({ + "from": self.jid.xml_escape(), + "id": id, + "to": to.xml_escape(), + "action": action, + "payload": payload + }).to_utf8()) != OK: + return Promise.make_error(ERR_CONNECTION_ERROR) + + assert(not id in self._pending_iqs) + self._pending_iqs[id] = capture + + return Promise.from(capture) + func push_iq(to: String, action: String, payload: String, capture: GDScriptFunctionState) -> int: # Error assert(action in ["get", "set"]) @@ -40,6 +83,16 @@ class Connection extends Reference: return OK + static func are_promises_done(promises: Array) -> bool: + for promise in promises: + assert(promise is Promise) + if not promise.is_done: + return false + return true + + static func iq_as_is(): + return yield() + ## Registry of connections used for poking of pending iqs. var _connections: Array # of WeakRef to Connection @@ -61,8 +114,10 @@ func _process_connections() -> void: if response == null: continue - if connection._xml_parser.parse_a_bit(response): - var stanza := connection._xml_parser.root + while connection._xml_parser.parse_a_bit(response): + response = null + + var stanza := connection._xml_parser.take_root() if stanza.node_name == "iq": if "to" in stanza.attributes and stanza.attributes["to"] != connection.jid: push_error("Stanza is not addressed to assigned jid") @@ -70,13 +125,12 @@ func _process_connections() -> void: if stanza.attributes["type"] in ["result", "error"]: var id := int(stanza.attributes["id"]) # todo: Use strings directly instead? var result = connection._pending_iqs[id].resume(stanza) - if result is GDScriptFunctionState: - connection._pending_iqs[id] = result - elif result != null: - assert("Ignored result of iq subroutine: " + result) - else: - var was_present := connection._pending_iqs.erase(id) - assert(was_present) + if result is GDScriptFunctionState and result.is_valid(): + connection._pending_tail_iqs.push_back(result) + # elif result != null: + # assert(false, "Ignored result of iq subroutine: " + "%s" % result) + var was_present := connection._pending_iqs.erase(id) + assert(was_present) elif stanza.attributes["type"] in ["set", "get"]: connection.emit_signal("iq_received", stanza) @@ -87,7 +141,17 @@ func _process_connections() -> void: elif stanza.node_name == "presence": connection.emit_signal("presence_received", stanza) - connection._xml_parser = Xml.Parser.new() + var to_remove_tails := PoolIntArray() + for tail_idx in range(connection._pending_tail_iqs.size()): + var result = connection._pending_tail_iqs[tail_idx].resume() + if not result is GDScriptFunctionState or not result.is_valid(): + # assert(result == null, "Ignored result of iq subroutine: " + "%s" % result) + to_remove_tails.push_back(tail_idx) + else: + connection._pending_tail_iqs[tail_idx] = result + + for tail_idx in range(to_remove_tails.size() - 1, 0, -1): + connection._pending_tail_iqs.remove(to_remove_tails[tail_idx]) ## Collect dropped connections. for idx in range(to_remove.size() - 1, 0, -1): @@ -161,21 +225,21 @@ func _negotiate_tls(connection: Connection) -> int: ## Check that server response corresponds to what we ask for. # todo: For conformity client must send closing stream tag on error. - if parsed_response.root.node_name != "stream:stream": return ERR_CANT_CONNECT - if parsed_response.root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT - if "to" in parsed_response.root.attributes: - if parsed_response.root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT - if parsed_response.root.attributes["version"] != "1.0": return ERR_CANT_CONNECT + if parsed_response._root.node_name != "stream:stream": return ERR_CANT_CONNECT + if parsed_response._root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT + if "to" in parsed_response._root.attributes: + if parsed_response._root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT + if parsed_response._root.attributes["version"] != "1.0": return ERR_CANT_CONNECT # todo: Assert for namespaces. ## Step 3: Server sends stream features to client (only the STARTTLS ## extension at this point, which is mandatory-to-negotiate): - if parsed_response.root.children.size() == 0: + if parsed_response._root.children.size() == 0: response = _wait_blocking_for_utf8_data(connection.stream) if response == null: return ERR_CONNECTION_ERROR if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR - var features = parsed_response.root.take_named_child_element("stream:features") + var features = parsed_response._root.take_named_child_element("stream:features") if features == null: return ERR_CANT_CONNECT var starttls = features.take_named_child_element("starttls") if starttls == null or starttls.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls": @@ -193,7 +257,7 @@ func _negotiate_tls(connection: Connection) -> int: # todo: Handle failure case. - var proceed = parsed_response.root.take_named_child_element("proceed") + var proceed = parsed_response._root.take_named_child_element("proceed") if proceed == null: return ERR_CANT_CONNECT if proceed.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls": return ERR_CANT_CONNECT @@ -232,19 +296,19 @@ func _negotiate_sasl(connection: Connection, password: String) -> int: ## Check that server response corresponds to what we ask for. # todo: For conformity client must send closing stream tag on error. - if parsed_response.root.node_name != "stream:stream": return ERR_CANT_CONNECT - if parsed_response.root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT - if "to" in parsed_response.root.attributes: - if parsed_response.root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT - if parsed_response.root.attributes["version"] != "1.0": return ERR_CANT_CONNECT + if parsed_response._root.node_name != "stream:stream": return ERR_CANT_CONNECT + if parsed_response._root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT + if "to" in parsed_response._root.attributes: + if parsed_response._root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT + if parsed_response._root.attributes["version"] != "1.0": return ERR_CANT_CONNECT # todo: Assert for namespaces. - if parsed_response.root.children.size() == 0: + if parsed_response._root.children.size() == 0: response = _wait_blocking_for_utf8_data(connection.stream) if response == null: return ERR_CONNECTION_ERROR if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR - var features = parsed_response.root.take_named_child_element("stream:features") + var features = parsed_response._root.take_named_child_element("stream:features") if features == null: return ERR_CANT_CONNECT var mechanisms = features.take_named_child_element("mechanisms") if mechanisms == null or mechanisms.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl": @@ -278,7 +342,7 @@ func _negotiate_sasl(connection: Connection, password: String) -> int: if response == null: return ERR_CONNECTION_ERROR if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR - var success = parsed_response.root.take_named_child_element("success") + var success = parsed_response._root.take_named_child_element("success") if success == null or success.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl": return ERR_CANT_CONNECT @@ -300,19 +364,19 @@ func _bind_resource(connection: Connection, resource: String = "tochie-facade") ## Check that server response corresponds to what we ask for. # todo: For conformity client must send closing stream tag on error. - if parsed_response.root.node_name != "stream:stream": return ERR_CANT_CONNECT - if parsed_response.root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT - if "to" in parsed_response.root.attributes: - if parsed_response.root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT - if parsed_response.root.attributes["version"] != "1.0": return ERR_CANT_CONNECT + if parsed_response._root.node_name != "stream:stream": return ERR_CANT_CONNECT + if parsed_response._root.attributes["from"] != connection.domain: return ERR_CANT_CONNECT + if "to" in parsed_response._root.attributes: + if parsed_response._root.attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT + if parsed_response._root.attributes["version"] != "1.0": return ERR_CANT_CONNECT # todo: Assert for namespaces. - if parsed_response.root.children.size() == 0: + if parsed_response._root.children.size() == 0: response = _wait_blocking_for_utf8_data(connection.stream) if response == null: return ERR_CONNECTION_ERROR if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR - var features = parsed_response.root.take_named_child_element("stream:features") + var features = parsed_response._root.take_named_child_element("stream:features") if features == null: return ERR_CANT_CONNECT var bind = features.take_named_child_element("bind") if bind == null or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind": @@ -336,7 +400,7 @@ func _bind_resource(connection: Connection, resource: String = "tochie-facade") ## Step 16: Server accepts submitted resourcepart and informs client of ## successful resource binding: - var iq = parsed_response.root.take_named_child_element("iq") + var iq = parsed_response._root.take_named_child_element("iq") if iq.attributes["id"] != String(iq_id): push_error("iq id mismatch, expected: " + String(iq_id) + " but got: " + iq.attributes["id"]) return ERR_CANT_CONNECT diff --git a/scenes/Xml.gd b/scenes/Xml.gd index 945e493..712caea 100644 --- a/scenes/Xml.gd +++ b/scenes/Xml.gd @@ -39,35 +39,49 @@ class XmlText extends XmlNode: # todo: Save namespaces. # todo: Ability to parse from partially arrived data, with saving for future resume. class Parser extends Reference: - var root: XmlElement = null + var _root: XmlElement = null var _element_stack: Array # of XmlElement + var _pending: PoolByteArray + + func take_root() -> XmlElement: + var result := self._root + assert(result != null) + self._root = null + return result ## Returns true if root element is closed, otherwise more tags are expected to come later. func parse_a_bit(data): # -> bool or Error: var error: int var parser := XMLParser.new() if data is String: data = data.to_utf8() - error = parser.open_buffer(data) + if data == null: data = PoolByteArray() + var total = self._pending + data + if total.size() == 0: + return false + + error = parser.open_buffer(total) if error != OK: push_error("Error opening a buffer for XML parsing") return error + self._pending = PoolByteArray() + while parser.read() == OK: if parser.get_node_type() == XMLParser.NODE_ELEMENT: var element := XmlElement.new() - - if self.root != null: - self._element_stack[self._element_stack.size() - 1].children.push_back(element) - else: self.root = element - - if not parser.is_empty(): - self._element_stack.push_back(element) - element.node_name = parser.get_node_name() var attribute_count := parser.get_attribute_count() for idx in range(attribute_count): element.attributes[parser.get_attribute_name(idx)] = parser.get_attribute_value(idx) + if self._root != null: + self._element_stack[self._element_stack.size() - 1].children.push_back(element) + else: self._root = element + + # todo: Handle _root empty element. + if not parser.is_empty(): + self._element_stack.push_back(element) + elif parser.get_node_type() == XMLParser.NODE_ELEMENT_END: if self._element_stack.size() == 0: push_error("Closing element closes nothing" % [parser.get_node_name()]) @@ -78,6 +92,12 @@ class Parser extends Reference: push_error("Element <%s> closes sooner than <%s>" % [parser.get_node_name(), popped.node_name]) return ERR_PARSE_ERROR + if self._element_stack.size() == 0: + # todo: Handle partial data. + if parser.read() == OK: + self._pending = total.subarray(parser.get_node_offset(), -1) + return true + elif parser.get_node_type() == XMLParser.NODE_TEXT: if self._element_stack.size() == 0: push_error("Text node should be a child of an element")