From c844e7932e91c0b6f4006ef6ed777b427b122b1c Mon Sep 17 00:00:00 2001 From: veclav talica Date: Sat, 26 Aug 2023 22:59:43 +0500 Subject: [PATCH] async communication stub, service discorvery via it --- scenes/App.gd | 15 ++++++-- scenes/Connections.gd | 83 +++++++++++++++++++++++++++++++++++++++++++ scenes/Xml.gd | 11 ++++-- 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/scenes/App.gd b/scenes/App.gd index 9c65ba4..7029249 100644 --- a/scenes/App.gd +++ b/scenes/App.gd @@ -1,6 +1,17 @@ extends Node +var _connection + +func _service_discovery(): + var iq := yield() as Xml.XmlElement + print(iq.as_string()) + func _ready(): - var connection = $Connections.establish_new_connection("poto.cafe", "veclavtalica", "-") - if connection == null: + _connection = $Connections.establish_new_connection("poto.cafe", "veclavtalica", "-") + if _connection == null: + push_error("Connection failed") + + if _connection.push_iq(_connection.domain, "get", + "", + _service_discovery()) != OK: push_error("Connection failed") diff --git a/scenes/Connections.gd b/scenes/Connections.gd index 93b3326..f635253 100644 --- a/scenes/Connections.gd +++ b/scenes/Connections.gd @@ -7,11 +7,92 @@ class Connection extends Reference: var bare_jid: String var jid: String var _id_counter: int = 0 + var _pending_iqs: Dictionary # of id to GDScriptFunctionState + var _xml_parser := Xml.Parser.new() + +# warning-ignore:unused_signal + signal presence_received(presence) +# warning-ignore:unused_signal + signal message_received(message) +# warning-ignore:unused_signal + signal iq_received(iq) func generate_id() -> int: self._id_counter += 1 return hash(self._id_counter) + func push_iq(to: String, action: String, payload: String, capture: GDScriptFunctionState) -> int: # Error + assert(action in ["get", "set"]) + + var id := self.generate_id() + if self.stream.put_data( + """{payload}""".format({ + "from": self.jid.xml_escape(), + "id": id, + "to": to.xml_escape(), + "action": action, + "payload": payload + }).to_utf8()) != OK: + return ERR_CONNECTION_ERROR + + assert(not id in self._pending_iqs) + self._pending_iqs[id] = capture + + return OK + +## Registry of connections used for poking of pending iqs. +var _connections: Array # of WeakRef to Connection + +func _ready(): + # todo: Some better interval? + if get_tree().connect("physics_frame", self, "_process_connections") != OK: + assert("Bruh") + +func _process_connections() -> void: + var to_remove := PoolIntArray() + + for idx in range(_connections.size()): + var connection := _connections[idx].get_ref() as Connection + if connection == null: + to_remove.push_back(idx) + continue + + var response = _wait_blocking_for_utf8_data(connection.stream, 0) + if response == null: + continue + + if connection._xml_parser.parse_a_bit(response): + var stanza := connection._xml_parser.root + if stanza.node_name == "iq": + if "to" in stanza.attributes and stanza.attributes["to"] != connection.jid: + push_error("Stanza is not addressed to assigned jid") + + if stanza.attributes["type"] in ["result", "error"]: + var id := int(stanza.attributes["id"]) # todo: Use strings directly instead? + var result = connection._pending_iqs[id].resume(stanza) + if result is GDScriptFunctionState: + connection._pending_iqs[id] = result + elif result != null: + assert("Ignored result of iq subroutine: " + result) + else: + var was_present := connection._pending_iqs.erase(id) + assert(was_present) + + elif stanza.attributes["type"] in ["set", "get"]: + connection.emit_signal("iq_received", stanza) + + elif stanza.node_name == "message": + connection.emit_signal("message_received", stanza) + + elif stanza.node_name == "presence": + connection.emit_signal("presence_received", stanza) + + connection._xml_parser = Xml.Parser.new() + + ## Collect dropped connections. + for idx in range(to_remove.size() - 1, 0, -1): + _connections.remove(to_remove[idx]) + func establish_new_connection(domain: String, identity: String, password: String) -> Connection: var stream := StreamPeerTCP.new() if stream.connect_to_host(domain, 5222) != OK: @@ -34,6 +115,8 @@ func establish_new_connection(domain: String, identity: String, password: String if _negotiate_connection(result, password) != OK: return null + _connections.push_back(weakref(result)) + return result # todo: Make it async diff --git a/scenes/Xml.gd b/scenes/Xml.gd index f53a445..945e493 100644 --- a/scenes/Xml.gd +++ b/scenes/Xml.gd @@ -19,7 +19,7 @@ class XmlElement extends XmlNode: return child return null - func as_string(level = 0) -> String: + func as_string(level: int = 0) -> String: var result = '\t'.repeat(level) + "XmlElement \"%s\"" % self.node_name if self.attributes.size() > 0: result += ", Attributes: " + String(self.attributes) @@ -32,11 +32,12 @@ class XmlText extends XmlNode: func is_text() -> bool: return true - func as_string(level = 0) -> String: + func as_string(_level: int = 0) -> String: return "XmlText \"%s\"" % self.data ## Wrapper over XMLParser for TCP stream oriented XML data. # todo: Save namespaces. +# todo: Ability to parse from partially arrived data, with saving for future resume. class Parser extends Reference: var root: XmlElement = null var _element_stack: Array # of XmlElement @@ -86,7 +87,11 @@ class Parser extends Reference: text.data = parser.get_node_data() self._element_stack[self._element_stack.size() - 1].children.push_back(text) + elif parser.get_node_type() == XMLParser.NODE_UNKNOWN: + ## Needed for things like version specification. + pass + else: - push_warning("Node type unimplemented and ignored") + push_error("Node type unimplemented") return self._element_stack.size() == 0