tochie-facade/scenes/Connections.gd

406 lines
17 KiB
GDScript3
Raw Normal View History

2023-08-26 15:16:36 +00:00
# Node that creates XMPP client connections (`establish_new_connection`) and
# polls the registered ones for incoming stanzas on every physics frame
# (`_ready` hooks `_process_connections` to the tree's "physics_frame" signal).
extends Node
class_name Connections
# todo: Settle on whether connection should send Sums.Result or receiver should check stanza errors by itself.
2023-08-26 15:16:36 +00:00
class Connection extends Reference:
	## State of one client connection: the transport stream, addressing data
	## and the bookkeeping used to hand incoming stanzas to their receivers.
	var stream: StreamPeer
	var identity: String
	var domain: String
	var bare_jid: String
	var jid: String

	var _id_counter: int = 0
	var _pending_iqs: Dictionary # of id to GDScriptFunctionState
	var _presence_sinks: Dictionary # of Jid to [[WeakRef, String]]
	var _xml_parser := Xml.Parser.new()

	# todo: Route signals to particular receivers, based on 'from' or 'to'
	# warning-ignore:unused_signal
	signal presence_received(presence)
	# warning-ignore:unused_signal
	signal message_received(message)
	# warning-ignore:unused_signal
	signal iq_received(iq)

	# todo: Separate incremental hash to its own class.
	func generate_id() -> int:
		## Returns a fresh stanza id: the hash of an incremented counter.
		self._id_counter += 1
		return hash(self._id_counter)

	func promise_iq(to: String, type: String, payload: String, capture: GDScriptFunctionState) -> Sums.Promise:
		## Sends an <iq/> of `type` ("get" or "set") to `to` and registers
		## `capture` under the generated id so it can be resumed when the
		## answer arrives. `payload` is inserted verbatim and must already be
		## well-formed XML.
		## Returns a promise built from `capture`, or an error promise holding
		## ERR_CONNECTION_ERROR when the stanza could not be written.
		assert(type in ["get", "set"])
		var id := self.generate_id()
		# Attribute values are wrapped in single quotes, so quotes must be
		# escaped as well (xml_escape(true)); otherwise a JID containing "'"
		# would terminate the attribute early.
		if self.stream.put_data(
				"""<iq from='{from}' id='{id}' to='{to}' type='{type}'>{payload}</iq>""".format({
					"from": self.jid.xml_escape(true),
					"id": String(id).xml_escape(true),
					"to": to.xml_escape(true),
					"type": type,
					"payload": payload
				}).to_utf8()) != OK:
			return Sums.Promise.make_error(ERR_CONNECTION_ERROR)
		assert(not id in self._pending_iqs)
		self._pending_iqs[id] = capture
		return Sums.Promise.from(capture)

	func push_presence(to: String, type, payload: String) -> Sums.Result:
		## Sends a presence stanza formed by Stanza.form_presence.
		## `type` is either null or a String.
		## Returns a null-valued result on success, or an error result holding
		## ERR_CONNECTION_ERROR when the stanza could not be written.
		assert(type == null or type is String)
		var message = Stanza.form_presence(
				String(self.generate_id()),
				self.jid, to, type, payload)
		if self.stream.put_data(message.to_utf8()) != OK:
			return Sums.Result.make_error(ERR_CONNECTION_ERROR)
		return Sums.Result.make_value(null)

	func presence_sink(p_base_jid: String, p_sink: Object, p_signal: String) -> void:
		## Registers `p_signal` of `p_sink` as a receiver of presence stanzas
		## associated with `p_base_jid`. Only a weak reference to the sink is
		## stored. A new list is assigned rather than appending in place.
		self._presence_sinks[p_base_jid] = \
				self._presence_sinks.get(p_base_jid, []) + [[weakref(p_sink), p_signal]]
## Registry of connections walked by `_process_connections` to read incoming
## data and resume pending iqs. Entries are weak references, so the registry
## itself does not keep a Connection alive.
var _connections: Array # of WeakRef to Connection
func _ready():
	## Hooks `_process_connections` to the scene tree's "physics_frame" signal
	## so registered connections are polled once per physics frame.
	# todo: Some better interval?
	var error: int = get_tree().connect("physics_frame", self, "_process_connections")
	if error != OK:
		# Asserting a non-empty string always passes, so the failure has to be
		# reported explicitly and the assertion made on a false condition.
		push_error("Cannot connect 'physics_frame' to _process_connections, error code: %d" % error)
		assert(false)
# todo: Collapse Result inside Promise.
func _process_connections() -> void:
	## Polls every registered connection for buffered data, dispatches each
	## stanza the parser yields and finally drops registry entries whose
	## Connection no longer exists.
	##  - iq of type "result"/"error": resumes the function state registered
	##    under the stanza id and forgets it.
	##  - iq of type "set"/"get": emits `iq_received` on the connection.
	##  - message: emits `message_received` on the connection.
	##  - presence: emits `presence_received` and forwards the stanza to every
	##    sink whose registered jid is a prefix of the sender.
	var to_remove := PoolIntArray()
	for idx in range(_connections.size()):
		var connection := _connections[idx].get_ref() as Connection
		if connection == null:
			to_remove.push_back(idx)
			continue
		# Timeout of 0: returns null right away when nothing is buffered.
		var response = _wait_blocking_for_utf8_data(connection.stream, 0)
		if response == null:
			continue
		# todo: Move Result error into closure on connection failure?
		while connection._xml_parser.parse_a_bit(response):
			# The received chunk is handed over once; later rounds pass null.
			response = null
			var stanza := connection._xml_parser.take_root()
			print(stanza.as_string())
			if stanza.name == "iq":
				if "to" in stanza.attributes and stanza.attributes["to"] != connection.jid:
					# todo: Server errors should not be raised in client.
					push_error("Stanza is not addressed to assigned jid")
				if stanza.attributes["type"] in ["result", "error"]:
					var id := int(stanza.attributes["id"]) # todo: Use strings directly instead?
					if id in connection._pending_iqs:
						connection._pending_iqs[id].resume(Sums.Result.make_value(stanza))
						var was_present := connection._pending_iqs.erase(id)
						assert(was_present)
					else:
						# An answer nobody waits for must not abort the frame.
						push_error("Received iq response with unknown id: " + String(id))
				elif stanza.attributes["type"] in ["set", "get"]:
					# todo: Emit in any way?
					connection.emit_signal("iq_received", stanza)
			elif stanza.name == "message":
				connection.emit_signal("message_received", stanza)
			elif stanza.name == "presence":
				connection.emit_signal("presence_received", stanza)
				# Without a 'from' attribute there is nothing to match sinks on.
				if "from" in stanza.attributes:
					var sender: String = stanza.attributes["from"]
					for base_jid in connection._presence_sinks.keys():
						if not sender.begins_with(base_jid):
							continue
						for to_emit in connection._presence_sinks[base_jid]:
							# Sinks are stored as WeakRef; emit on the referent
							# and skip receivers that were freed meanwhile.
							var sink = to_emit[0].get_ref()
							if sink != null:
								sink.emit_signal(to_emit[1], stanza)
	## Collect dropped connections, highest index first so the remaining
	## indices stay valid. The range stops at -1 so that index 0 is included.
	for idx in range(to_remove.size() - 1, -1, -1):
		_connections.remove(to_remove[idx])
func establish_new_connection(domain: String, identity: String, password: String):
	## Opens a TCP stream to `domain` on port 5222, runs the negotiation
	## sequence and registers the resulting Connection for polling.
	## Returns the Connection, or null when connecting or negotiating fails.
	var tcp := StreamPeerTCP.new()
	var connect_error: int = tcp.connect_to_host(domain, 5222)
	if connect_error != OK:
		push_error("Cannot establish connection to " + domain)
		return null
	# Spin until the socket leaves the connecting state.
	while tcp.get_status() == StreamPeerTCP.STATUS_CONNECTING:
		pass
	if tcp.get_status() == StreamPeerTCP.STATUS_ERROR:
		push_error("Cannot establish connection to " + domain)
		return null

	var connection := Connection.new()
	connection.stream = tcp
	connection.identity = identity
	connection.domain = domain
	connection.bare_jid = "%s@%s" % [identity, domain]

	if _negotiate_connection(connection, password) == OK:
		# Only a weak reference is kept; the caller owns the connection.
		_connections.push_back(weakref(connection))
		return connection
	return null
# todo: Make it async
func _negotiate_connection(connection: Connection, password: String) -> int:
	## Runs TLS negotiation, SASL authentication and resource binding in that
	## order, stopping at the first step that does not return OK.
	## Returns OK, or the error code of the failing step.
	## See [9. Detailed Examples] of https://datatracker.ietf.org/doc/rfc6120/
	var status: int = _negotiate_tls(connection)
	if status == OK:
		status = _negotiate_sasl(connection, password)
	if status == OK:
		status = _bind_resource(connection)
	return status
func _form_initial_message(domain: String, bare_jid: String) -> String:
	## Builds the XML declaration followed by the opening <stream:stream> tag
	## addressed from `bare_jid` to `domain`.
	## Both values land inside single-quoted attributes, so they are escaped
	## with quote escaping enabled (xml_escape(true)); plain xml_escape() leaves
	## "'" untouched and would let a value terminate its attribute.
	# todo: XML builder interface, writing by hand is extremely error prone.
	return """<?xml version='1.0'?><stream:stream
from='{bare_jid}' to='{domain}' version='1.0'
xml:lang='en' xmlns='jabber:client'
xmlns:stream='http://etherx.jabber.org/streams'>""".format({
		"bare_jid": bare_jid.xml_escape(true), "domain": domain.xml_escape(true)})
func _negotiate_tls(connection: Connection) -> int:
	## Opens the initial stream, requests STARTTLS and, once the server allows
	## it, replaces `connection.stream` with a TLS stream wrapping the old one.
	## Returns OK on success; ERR_CONNECTION_ERROR when data cannot be written
	## or read in time, or the parser reports a finished root; ERR_CANT_CONNECT
	## when the server's answer is not the expected one; otherwise the error
	## returned by the TLS connect call.
	var response
	var parsed_response = Xml.Parser.new()
	## Step 1: Client initiates stream to server:
	if connection.stream.put_data(_form_initial_message(
			connection.domain, connection.bare_jid).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	## Step 2: Server responds by sending a response stream header to client:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	# If <stream:stream> is closed, - connection is closed too.
	## Check that server response corresponds to what we ask for.
	# todo: For conformity client must send closing stream tag on error.
	if parsed_response._root.name != "stream:stream": return ERR_CANT_CONNECT
	# A missing attribute is treated like a mismatching one; indexing it
	# directly would abort with a runtime error instead of an error code.
	var header_attributes = parsed_response._root.attributes
	if not ("from" in header_attributes) or header_attributes["from"] != connection.domain:
		return ERR_CANT_CONNECT
	if "to" in header_attributes:
		if header_attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT
	if not ("version" in header_attributes) or header_attributes["version"] != "1.0":
		return ERR_CANT_CONNECT
	# todo: Assert for namespaces.
	## Step 3: Server sends stream features to client (only the STARTTLS
	## extension at this point, which is mandatory-to-negotiate):
	if parsed_response._root.children.size() == 0:
		# Features did not arrive together with the header; wait for them.
		response = _wait_blocking_for_utf8_data(connection.stream)
		if response == null: return ERR_CONNECTION_ERROR
		if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null: return ERR_CANT_CONNECT
	var starttls = features.take_named_child_element("starttls")
	if starttls == null or not ("xmlns" in starttls.attributes) or starttls.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		push_error("Connection declined, TLS is not a listed feature")
		return ERR_CANT_CONNECT
	## Step 4: Client sends STARTTLS command to server:
	if connection.stream.put_data("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>".to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	## Step 5: Server informs client that it is allowed to proceed:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	# todo: Handle failure case.
	var proceed = parsed_response._root.take_named_child_element("proceed")
	if proceed == null: return ERR_CANT_CONNECT
	if not ("xmlns" in proceed.attributes) or proceed.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-tls":
		return ERR_CANT_CONNECT
	## Step 6: Client and server attempt to complete TLS negotiation over
	## the existing TCP connection (see [TLS] for details).
	var tls_stream := StreamPeerSSL.new()
	# Second argument enables certificate validation, third is the host name
	# the certificate is checked for.
	var error: int = tls_stream.connect_to_stream(connection.stream, true, connection.domain)
	if error != OK:
		push_error("Cannot establish client->server TLS connection")
		return error
	connection.stream = tls_stream
	## Step 7 (alt): If TLS negotiation is unsuccessful, server closes TCP
	## connection (thus, the stream negotiation process ends unsuccessfully
	## and the parties do not move on to the next step):
	return OK
func _negotiate_sasl(connection: Connection, password: String) -> int:
	## Restarts the stream over the current `connection.stream` and
	## authenticates `connection.identity` with `password` using SASL PLAIN.
	## Returns OK on success; ERR_CONNECTION_ERROR when data cannot be written
	## or read in time, or the parser reports a finished root; ERR_CANT_CONNECT
	## when the server's answer is not the expected one (including a server
	## that does not offer PLAIN or does not answer with <success/>).
	var response
	var parsed_response = Xml.Parser.new()
	## Step 7: If TLS negotiation is successful, client initiates a new
	## stream to server over the TLS-protected TCP connection:
	if connection.stream.put_data(_form_initial_message(
			connection.domain, connection.bare_jid).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	## Step 8: Server responds by sending a stream header to client along
	## with any available stream features:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	## Check that server response corresponds to what we ask for.
	# todo: For conformity client must send closing stream tag on error.
	if parsed_response._root.name != "stream:stream": return ERR_CANT_CONNECT
	# A missing attribute is treated like a mismatching one; indexing it
	# directly would abort with a runtime error instead of an error code.
	var header_attributes = parsed_response._root.attributes
	if not ("from" in header_attributes) or header_attributes["from"] != connection.domain:
		return ERR_CANT_CONNECT
	if "to" in header_attributes:
		if header_attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT
	if not ("version" in header_attributes) or header_attributes["version"] != "1.0":
		return ERR_CANT_CONNECT
	# todo: Assert for namespaces.
	if parsed_response._root.children.size() == 0:
		# Features did not arrive together with the header; wait for them.
		response = _wait_blocking_for_utf8_data(connection.stream)
		if response == null: return ERR_CONNECTION_ERROR
		if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null: return ERR_CANT_CONNECT
	var mechanisms = features.take_named_child_element("mechanisms")
	if mechanisms == null or not ("xmlns" in mechanisms.attributes) or mechanisms.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		push_error("Connection declined, mechanisms is not a listed feature")
		return ERR_CANT_CONNECT
	## We only support PLAIN mechanism as it's sufficient over TLS.
	var plain_found := false
	for mechanism in mechanisms.children:
		if mechanism.is_element() and mechanism.name == "mechanism":
			# An empty <mechanism/> has no child to read the name from.
			if mechanism.children.size() > 0 and mechanism.children[0].data == "PLAIN":
				plain_found = true
				break
	if not plain_found:
		push_error("Connection declined, mechanisms don't include PLAIN method")
		return ERR_CANT_CONNECT
	## Step 9: Client selects an authentication mechanism, including initial response data:
	## https://datatracker.ietf.org/doc/html/rfc4616
	if connection.stream.put_data((
			"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>" %
			[_plain_auth_message(connection.identity, password)]).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	## (Skipped steps related to another auth mechanism)
	## Step 12: Server informs client of success, including additional data
	## with success:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	var success = parsed_response._root.take_named_child_element("success")
	if success == null or not ("xmlns" in success.attributes) or success.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-sasl":
		return ERR_CANT_CONNECT
	return OK
func _bind_resource(connection: Connection, resource: String = "tochie-facade") -> int:
	## Restarts the stream after authentication, asks the server to bind
	## `resource` and stores the jid from the server's answer in
	## `connection.jid`.
	## Returns OK on success; ERR_CONNECTION_ERROR when data cannot be written
	## or read in time, or the parser reports a finished root; ERR_CANT_CONNECT
	## when the server's answer is not the expected one.
	var response
	var parsed_response = Xml.Parser.new()
	## Step 13: Client initiates a new stream to server:
	if connection.stream.put_data(_form_initial_message(
			connection.domain, connection.bare_jid).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	## Step 14: Server responds by sending a stream header to client along
	## with any available stream features:
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	## Check that server response corresponds to what we ask for.
	# todo: For conformity client must send closing stream tag on error.
	if parsed_response._root.name != "stream:stream": return ERR_CANT_CONNECT
	# A missing attribute is treated like a mismatching one; indexing it
	# directly would abort with a runtime error instead of an error code.
	var header_attributes = parsed_response._root.attributes
	if not ("from" in header_attributes) or header_attributes["from"] != connection.domain:
		return ERR_CANT_CONNECT
	if "to" in header_attributes:
		if header_attributes["to"] != connection.bare_jid: return ERR_CANT_CONNECT
	if not ("version" in header_attributes) or header_attributes["version"] != "1.0":
		return ERR_CANT_CONNECT
	# todo: Assert for namespaces.
	if parsed_response._root.children.size() == 0:
		# Features did not arrive together with the header; wait for them.
		response = _wait_blocking_for_utf8_data(connection.stream)
		if response == null: return ERR_CONNECTION_ERROR
		if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	var features = parsed_response._root.take_named_child_element("stream:features")
	if features == null: return ERR_CANT_CONNECT
	var bind = features.take_named_child_element("bind")
	if bind == null or not ("xmlns" in bind.attributes) or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		push_error("Connection declined, bind is not a listed feature")
		return ERR_CANT_CONNECT
	## Step 15: Client binds a resource:
	var iq_id := connection.generate_id()
	if connection.stream.put_data("""<iq id='{id}' type='set'>
<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'>
<resource>{resource}</resource>
</bind></iq>""".format({
				"id": iq_id,
				"resource": resource.xml_escape()
			}).to_utf8()) != OK:
		return ERR_CONNECTION_ERROR
	response = _wait_blocking_for_utf8_data(connection.stream)
	if response == null: return ERR_CONNECTION_ERROR
	if parsed_response.parse_a_bit(response): return ERR_CONNECTION_ERROR
	## Step 16: Server accepts submitted resourcepart and informs client of
	## successful resource binding:
	var iq = parsed_response._root.take_named_child_element("iq")
	# The answer may be absent or lack an id; both are a failed bind.
	if iq == null or not ("id" in iq.attributes):
		push_error("iq answer to resource binding is missing")
		return ERR_CANT_CONNECT
	if iq.attributes["id"] != String(iq_id):
		push_error("iq id mismatch, expected: " + String(iq_id) + " but got: " + iq.attributes["id"])
		return ERR_CANT_CONNECT
	bind = iq.take_named_child_element("bind")
	if bind == null or not ("xmlns" in bind.attributes) or bind.attributes["xmlns"] != "urn:ietf:params:xml:ns:xmpp-bind":
		return ERR_CANT_CONNECT
	var jid = bind.take_named_child_element("jid")
	# An empty <jid/> carries no child to read the address from.
	if jid == null or jid.children.size() == 0: return ERR_CANT_CONNECT
	connection.jid = jid.children[0].data
	return OK
func _plain_auth_message(identity: String, password: String) -> String:
	## Encodes the credentials as base64 of the byte sequence
	## NUL, `identity` (UTF-8), NUL, `password` (UTF-8).
	var payload := PoolByteArray()
	payload.append(0)
	payload.append_array(identity.to_utf8())
	payload.append(0)
	payload.append_array(password.to_utf8())
	return Marshalls.raw_to_base64(payload)
func _wait_blocking_for_utf8_data(stream: StreamPeer, timeout_ms: int = 1000): # -> String or null:
	## Busy-waits until `stream` has bytes available, then returns all of them
	## decoded as one UTF-8 string. Returns null when nothing arrives within
	## `timeout_ms` milliseconds; with a timeout of 0 the stream is checked
	## without waiting.
	# NOTE(review): a chunk that ends in the middle of a multi-byte UTF-8
	# sequence is presumably decoded incorrectly - confirm and buffer raw
	# bytes if that turns out to matter.
	# Elapsed time is measured with the monotonic tick counter; the system
	# clock can be adjusted while waiting, which would skew the timeout.
	var enter_time := OS.get_ticks_msec()
	if stream is StreamPeerSSL: stream.poll()
	var available_bytes := stream.get_available_bytes()
	while available_bytes == 0:
		if OS.get_ticks_msec() - enter_time >= timeout_ms:
			return null
		# TLS streams only report new data after being polled.
		if stream is StreamPeerSSL: stream.poll()
		available_bytes = stream.get_available_bytes()
	return stream.get_utf8_string(available_bytes)