Skip to content

Commit

Permalink
Add support for streaming async events via HTTP serverside events.
Browse files Browse the repository at this point in the history
- `GET /api/events?type=error` opens a long-lived HTTP server side
  event connection that streams error messages.
- async (typically SMTP) errors are now streamed to the frontend and
  disaplyed as an error toast on the admin UI.
  • Loading branch information
knadh committed May 27, 2023
1 parent d359ad2 commit 0b2da4c
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cmd/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func handleReloadApp(c echo.Context) error {
app := c.Get("app").(*App)
go func() {
<-time.After(time.Millisecond * 500)
app.sigChan <- syscall.SIGHUP
app.chReload <- syscall.SIGHUP
}()
return c.JSON(http.StatusOK, okResp{true})
}
54 changes: 54 additions & 0 deletions cmd/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"encoding/json"
"fmt"
"log"
"time"

"github.com/labstack/echo/v4"
)

// handleEventStream serves an endpoint that never closes and pushes a
// live event stream (text/event-stream) such as a error messages.
func handleEventStream(c echo.Context) error {
var (
app = c.Get("app").(*App)
)

h := c.Response().Header()
h.Set(echo.HeaderContentType, "text/event-stream")
h.Set(echo.HeaderCacheControl, "no-store")
h.Set(echo.HeaderConnection, "keep-alive")

// Subscribe to the event stream with a random ID.
id := fmt.Sprintf("api:%v", time.Now().UnixNano())
sub, err := app.events.Subscribe(id)
if err != nil {
log.Fatalf("error subscribing to events: %v", err)
}

ctx := c.Request().Context()
for {
select {
case e := <-sub:
b, err := json.Marshal(e)
if err != nil {
app.log.Printf("error marshalling event: %v", err)
continue
}

fmt.Printf("data: %s\n\n", b)

c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b)))
c.Response().Flush()

case <-ctx.Done():
// On HTTP connection close, unsubscribe.
app.events.Unsubscribe(id)
return nil
}
}

return nil
}
2 changes: 2 additions & 0 deletions cmd/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func initHTTPHandlers(e *echo.Echo, app *App) {

g.POST("/api/tx", handleSendTxMessage)

g.GET("/api/events", handleEventStream)

if app.constants.BounceWebhooksEnabled {
// Private authenticated bounce endpoint.
g.POST("/webhooks/bounce", handleBounceWebhook)
Expand Down
16 changes: 10 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/knadh/listmonk/internal/buflog"
"github.com/knadh/listmonk/internal/captcha"
"github.com/knadh/listmonk/internal/core"
"github.com/knadh/listmonk/internal/events"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/media"
Expand Down Expand Up @@ -48,12 +49,13 @@ type App struct {
bounce *bounce.Manager
paginator *paginator.Paginator
captcha *captcha.Captcha
events *events.Events
notifTpls *notifTpls
log *log.Logger
bufLog *buflog.BufLog

// Channel for passing reload signals.
sigChan chan os.Signal
chReload chan os.Signal

// Global variable that stores the state indicating that a restart is required
// after a settings update.
Expand All @@ -66,8 +68,9 @@ type App struct {

var (
// Buffered log writer for storing N lines of log entries for the UI.
bufLog = buflog.New(5000)
lo = log.New(io.MultiWriter(os.Stdout, bufLog), "",
evStream = events.New()
bufLog = buflog.New(5000)
lo = log.New(io.MultiWriter(os.Stdout, bufLog, evStream.ErrWriter()), "",
log.Ldate|log.Ltime|log.Lshortfile)

ko = koanf.New(".")
Expand Down Expand Up @@ -170,6 +173,7 @@ func main() {
log: lo,
bufLog: bufLog,
captcha: initCaptcha(),
events: evStream,

paginator: paginator.New(paginator.Opt{
DefaultPerPage: 20,
Expand Down Expand Up @@ -240,11 +244,11 @@ func main() {
// Wait for the reload signal with a callback to gracefully shut down resources.
// The `wait` channel is passed to awaitReload to wait for the callback to finish
// within N seconds, or do a force reload.
app.sigChan = make(chan os.Signal)
signal.Notify(app.sigChan, syscall.SIGHUP)
app.chReload = make(chan os.Signal)
signal.Notify(app.chReload, syscall.SIGHUP)

closerWait := make(chan bool)
<-awaitReload(app.sigChan, closerWait, func() {
<-awaitReload(app.chReload, closerWait, func() {
// Stop the HTTP server.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func handleUpdateSettings(c echo.Context) error {
// No running campaigns. Reload the app.
go func() {
<-time.After(time.Millisecond * 500)
app.sigChan <- syscall.SIGHUP
app.chReload <- syscall.SIGHUP
}()

return c.JSON(http.StatusOK, okResp{true})
Expand Down
20 changes: 20 additions & 0 deletions frontend/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ export default Vue.extend({
};
http.send();
},
listenEvents() {
const reMatchLog = /(.+?)\.go:\d+:(.+?)$/im;
const evtSource = new EventSource(uris.errorEvents, { withCredentials: true });
let numEv = 0;
evtSource.onmessage = (e) => {
if (numEv > 50) {
return;
}
numEv += 1;
const d = JSON.parse(e.data);
if (d && d.type === 'error') {
const msg = reMatchLog.exec(d.message.trim());
this.$utils.toast(msg[2], 'is-danger', null, true);
}
};
},
},
computed: {
Expand All @@ -155,6 +173,8 @@ export default Vue.extend({
window.addEventListener('resize', () => {
this.windowWidth = window.innerWidth;
});
this.listenEvents();
},
});
</script>
Expand Down
1 change: 1 addition & 0 deletions frontend/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const uris = Object.freeze({
previewTemplate: '/api/templates/:id/preview',
previewRawTemplate: '/api/templates/preview',
exportSubscribers: '/api/subscribers/export',
errorEvents: '/api/events?type=error',
base: `${baseURL}/static`,
root: rootURL,
static: `${baseURL}/static`,
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ export default class Utils {
});
};

toast = (msg, typ, duration) => {
toast = (msg, typ, duration, queue) => {
Toast.open({
message: this.escapeHTML(msg),
type: !typ ? 'is-success' : typ,
queue: false,
queue,
duration: duration || 3000,
position: 'is-top',
pauseOnHover: true,
Expand Down
100 changes: 100 additions & 0 deletions internal/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Package events implements a simple event broadcasting mechanism
// for usage in broadcasting error messages, postbacks etc. various
// channels.
package events

import (
"bytes"
"fmt"
"io"
"sync"
)

const (
TypeError = "error"
)

// Event represents a single event in the system.
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Message string `json:"message"`
Data interface{} `json:"data"`
Channels []string `json:"-"`
}

type Events struct {
subs map[string]chan Event
sync.RWMutex
}

// New returns a new instance of Events.
func New() *Events {
return &Events{
subs: make(map[string]chan Event),
}
}

// Subscribe returns a channel to which the given event `types` are streamed.
// id is the unique identifier for the caller. A caller can only register
// for subscription once.
func (ev *Events) Subscribe(id string) (chan Event, error) {
ev.Lock()
defer ev.Unlock()

if ch, ok := ev.subs[id]; ok {
return ch, nil
}

ch := make(chan Event, 100)
ev.subs[id] = ch

return ch, nil
}

// Unsubscribe unsubscribes a subscriber (obviously).
func (ev *Events) Unsubscribe(id string) {
ev.Lock()
defer ev.Unlock()
delete(ev.subs, id)
}

// Publish publishes an event to all subscribers.
func (ev *Events) Publish(e Event) error {
ev.Lock()
defer ev.Unlock()

for _, ch := range ev.subs {
select {
case ch <- e:
default:
return fmt.Errorf("event queue full for type: %s", e.Type)
}
}

return nil
}

// This implements an io.Writer specifically for receiving error messages
// mirrored (io.MultiWriter) from error log writing.
type wri struct {
ev *Events
}

func (w *wri) Write(b []byte) (n int, err error) {
// Only broadcast error messages.
if !bytes.Contains(b, []byte("error")) {
return 0, nil
}

w.ev.Publish(Event{
Type: TypeError,
Message: string(b),
})

return len(b), nil
}

func (ev *Events) ErrWriter() io.Writer {
return &wri{ev: ev}
}
4 changes: 2 additions & 2 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ func (m *Manager) worker() {
out.Headers = h

if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
m.logger.Printf("error sending message in campaign %s: subscriber %s: %v",
msg.Campaign.Name, msg.Subscriber.UUID, err)
m.logger.Printf("error sending message in campaign %s: subscriber %d: %v",
msg.Campaign.Name, msg.Subscriber.ID, err)

select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
Expand Down

0 comments on commit 0b2da4c

Please sign in to comment.