60 lines
1.3 KiB
Lua
60 lines
1.3 KiB
Lua
---Server-sent events stream written through the `ngx` API of the current request.
---@class SSE
---@field active boolean `false` once the stream has been closed or the client aborted; stop your loop as soon as it is no longer `true`.
---@field private _queue table events accepted by `enqueue` that `dispatch` has not written yet
local sse = {}
|
|
|
|
-- One metatable shared by every stream, so `new` does not allocate a fresh
-- one per instance.
local sse_mt = { __index = sse }

---Construct a new SSE object.
---Sets the event-stream response headers and a 200 status on the current
---request, then registers a client-abort callback that clears `active`.
---If that callback cannot be registered a warning is logged and the stream is
---still returned; in that case `active` only changes through `close`.
---@return SSE
function sse:new()
    ngx.header.content_type = "text/event-stream"
    ngx.header.cache_control = "no-cache"
    ngx.header.connection = "keep-alive"
    ngx.status = ngx.HTTP_OK
    ngx.flush(true)

    local obj = {
        active = true,
        _queue = {},
    }

    -- ngx.on_abort returns nil plus an error string when it cannot register
    -- the callback (for example when `lua_check_client_abort` is off).
    -- Surface that instead of silently losing disconnect detection.
    local ok, err = ngx.on_abort(function()
        obj.active = false
    end)
    if not ok then
        ngx.log(ngx.WARN, "sse: could not register client-abort handler: ", err)
    end

    return setmetatable(obj, sse_mt)
end
|
|
|
|
---Queue one event; it is written to the client by the next `dispatch` call.
---When `event` is supplied it is stored with the payload and sent as the
---event name.
---@param data string
---@param event? string
---@return boolean status `false` when the stream is inactive and nothing was queued, otherwise `true`
function sse:enqueue(data, event)
    if self.active then
        local pending = self._queue
        pending[#pending + 1] = { data = data, event = event }
        return true
    end
    return false
end
|
|
|
|
---Send every event queued since the last dispatch, then flush the output.
---Call this on every iteration of the loop.
---
---Each payload is written as one `data:` line per line of text, so a payload
---containing line breaks stays inside a single event instead of ending it
---early or being read as other fields. Line breaks in an event name are
---replaced by a space. Payloads that are not strings are converted with
---`tostring`.
function sse:dispatch()
    -- Take the pending events and install an empty queue in one step; this
    -- avoids shifting the whole array for every message removed from the front.
    local pending = self._queue
    self._queue = {}

    local out = {}
    for i = 1, #pending do
        local msg = pending[i]
        if msg.event then
            local name = tostring(msg.event):gsub("[\r\n]+", " ")
            out[#out + 1] = "event: " .. name .. "\n"
        end

        -- Normalise CRLF / CR to LF, then emit one field line per text line.
        local payload = tostring(msg.data):gsub("\r\n?", "\n")
        for line in (payload .. "\n"):gmatch("(.-)\n") do
            out[#out + 1] = "data: " .. line .. "\n"
        end

        -- A blank line terminates the event.
        out[#out + 1] = "\n"
    end

    if #out > 0 then
        -- ngx.print accepts an array of strings and writes them in order.
        ngx.print(out)
    end
    ngx.flush(true)
end
|
|
|
|
---Mark the stream as closed.
---Only clears the `active` flag: `enqueue` refuses new events afterwards and
---loops that poll `active` should stop. Nothing is written or flushed here.
function sse:close()
    self.active = false
end
|
|
|
|
-- Module value: the table holding the constructor and the stream methods.
return sse
|