From 14bd930b92a583a91ddc44eeaca0bdea7edd87c7 Mon Sep 17 00:00:00 2001
From: Pascal Bleser
Date: Thu, 27 Nov 2025 18:29:39 +0100
Subject: [PATCH] groupware: JMAP WS push notifications support
---
pkg/jmap/jmap_api_ws.go | 4 +-
pkg/jmap/jmap_client.go | 4 +
pkg/jmap/jmap_error.go | 1 +
pkg/jmap/jmap_http.go | 185 ++++++++++++++----
pkg/jmap/jmap_integration_email_test.go | 56 +++---
pkg/jmap/jmap_integration_test.go | 25 ++-
pkg/jmap/jmap_integration_ws_test.go | 118 +++++++++++
pkg/jmap/jmap_model.go | 6 +-
.../pkg/groupware/groupware_framework.go | 8 +-
9 files changed, 338 insertions(+), 69 deletions(-)
create mode 100644 pkg/jmap/jmap_integration_ws_test.go
diff --git a/pkg/jmap/jmap_api_ws.go b/pkg/jmap/jmap_api_ws.go
index 62cef36ce3..71b8ce8c8f 100644
--- a/pkg/jmap/jmap_api_ws.go
+++ b/pkg/jmap/jmap_api_ws.go
@@ -5,9 +5,9 @@ import (
)
func (j *Client) EnablePush(pushState string, session *Session, _ *log.Logger) Error {
- return nil // TODO
+ panic("not implemented") // TODO implement push
}
func (j *Client) DisablePush(_ *Session, _ *log.Logger) Error {
- return nil // TODO
+ panic("not implemented") // TODO implement push
}
diff --git a/pkg/jmap/jmap_client.go b/pkg/jmap/jmap_client.go
index 118b33481f..f1133eaa2f 100644
--- a/pkg/jmap/jmap_client.go
+++ b/pkg/jmap/jmap_client.go
@@ -38,6 +38,10 @@ func NewClient(session SessionClient, api ApiClient, blob BlobClient, ws WsClien
}
}
+func (j *Client) EnableNotifications(pushState string, sessionProvider func() (*Session, error)) (WsClient, error) {
+ return j.ws.EnableNotifications(pushState, sessionProvider, j)
+}
+
func (j *Client) AddSessionEventListener(listener SessionEventListener) {
j.sessionEventListeners.add(listener)
}
diff --git a/pkg/jmap/jmap_error.go b/pkg/jmap/jmap_error.go
index d700a5f25a..d5d9b5c6d1 100644
--- a/pkg/jmap/jmap_error.go
+++ b/pkg/jmap/jmap_error.go
@@ -36,6 +36,7 @@ const (
JmapErrorWssFailedToSendWebSocketPushDisable
JmapErrorWssFailedToClose
JmapErrorWssFailedToRetrieveSession
+ JmapErrorSocketPushUnsupported
JmapErrorMissingCreatedObject
)
diff --git a/pkg/jmap/jmap_http.go b/pkg/jmap/jmap_http.go
index 630725bcb6..50dc459c6d 100644
--- a/pkg/jmap/jmap_http.go
+++ b/pkg/jmap/jmap_http.go
@@ -40,6 +40,13 @@ const (
logHttpStatus = "status"
logHttpStatusCode = "status-code"
logHttpUrl = "url"
+ logProto = "proto"
+ logProtoJmap = "jmap"
+ logProtoJmapWs = "jmapws"
+ logType = "type"
+ logTypeRequest = "request"
+ logTypeResponse = "response"
+ logTypePush = "push"
)
/*
@@ -55,6 +62,8 @@ type HttpJmapApiClientEventListener interface {
OnFailedRequestWithStatus(endpoint string, status int)
OnResponseBodyReadingError(endpoint string, err error)
OnResponseBodyUnmarshallingError(endpoint string, err error)
+ OnSuccessfulWsRequest(endpoint string, status int)
+ OnFailedWsHandshakeRequestWithStatus(endpoint string, status int)
}
type nullHttpJmapApiClientEventListener struct {
@@ -70,6 +79,10 @@ func (l nullHttpJmapApiClientEventListener) OnResponseBodyReadingError(endpoint
}
func (l nullHttpJmapApiClientEventListener) OnResponseBodyUnmarshallingError(endpoint string, err error) {
}
+func (l nullHttpJmapApiClientEventListener) OnSuccessfulWsRequest(endpoint string, status int) {
+}
+func (l nullHttpJmapApiClientEventListener) OnFailedWsHandshakeRequestWithStatus(endpoint string, status int) {
+}
var _ HttpJmapApiClientEventListener = nullHttpJmapApiClientEventListener{}
@@ -206,7 +219,7 @@ func (h *HttpJmapClient) Command(ctx context.Context, logger *log.Logger, sessio
if logger.Trace().Enabled() {
requestBytes, err := httputil.DumpRequestOut(req, true)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
}
}
h.auth(session.Username, logger, req)
@@ -221,7 +234,9 @@ func (h *HttpJmapClient) Command(ctx context.Context, logger *log.Logger, sessio
if logger.Trace().Enabled() {
responseBytes, err := httputil.DumpResponse(res, true)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
+ Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
+ Msg(string(responseBytes))
}
}
@@ -267,7 +282,7 @@ func (h *HttpJmapClient) UploadBinary(ctx context.Context, logger *log.Logger, s
if logger.Trace().Enabled() {
requestBytes, err := httputil.DumpRequestOut(req, false)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
}
}
@@ -282,7 +297,9 @@ func (h *HttpJmapClient) UploadBinary(ctx context.Context, logger *log.Logger, s
if logger.Trace().Enabled() {
responseBytes, err := httputil.DumpResponse(res, true)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
+ Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
+ Msg(string(responseBytes))
}
}
@@ -337,7 +354,7 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
if logger.Trace().Enabled() {
requestBytes, err := httputil.DumpRequestOut(req, true)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
}
}
h.auth(session.Username, logger, req)
@@ -351,7 +368,9 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
if logger.Trace().Enabled() {
responseBytes, err := httputil.DumpResponse(res, false)
if err == nil {
- logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
+ Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
+ Msg(string(responseBytes))
}
}
language := Language(res.Header.Get("Content-Language"))
@@ -384,9 +403,17 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
}, language, nil
}
+type WebSocketPushEnableType string
+type WebSocketPushDisableType string
+
+const (
+ WebSocketPushTypeEnable = WebSocketPushEnableType("WebSocketPushEnable")
+ WebSocketPushTypeDisable = WebSocketPushDisableType("WebSocketPushDisable")
+)
+
type WebSocketPushEnable struct {
// This MUST be the string "WebSocketPushEnable".
- Type string `json:"@type"`
+ Type WebSocketPushEnableType `json:"@type"`
// A list of data type names (e.g., "Mailbox" or "Email") that the client is interested in.
//
@@ -404,30 +431,21 @@ type WebSocketPushEnable struct {
type WebSocketPushDisable struct {
// This MUST be the string "WebSocketPushDisable".
- Type string `json:"@type"`
+ Type WebSocketPushDisableType `json:"@type"`
}
type HttpWsClientFactory struct {
dialer *websocket.Dialer
masterUser string
masterPassword string
+ logger *log.Logger
+ eventListener HttpJmapApiClientEventListener
}
var _ WsClientFactory = &HttpWsClientFactory{}
-func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterPassword string, logger *log.Logger) (*HttpWsClientFactory, error) {
- /*
- d := websocket.Dialer{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO configurable
- HandshakeTimeout: 5 * time.Second,
- // RFC 8887: Section 4.2:
- // Otherwise, the client MUST make an authenticated HTTP request [RFC7235] on the encrypted connection
- // and MUST include the value "jmap" in the list of protocols for the "Sec-WebSocket-Protocol" header
- // field.
- Subprotocols: []string{"jmap"},
- }
- */
-
+func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterPassword string, logger *log.Logger,
+ eventListener HttpJmapApiClientEventListener) (*HttpWsClientFactory, error) {
// RFC 8887: Section 4.2:
// Otherwise, the client MUST make an authenticated HTTP request [RFC7235] on the encrypted connection
// and MUST include the value "jmap" in the list of protocols for the "Sec-WebSocket-Protocol" header
@@ -438,6 +456,8 @@ func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterP
dialer: dialer,
masterUser: masterUser,
masterPassword: masterPassword,
+ logger: logger,
+ eventListener: eventListener,
}, nil
}
@@ -447,32 +467,59 @@ func (w *HttpWsClientFactory) auth(username string, h http.Header) error {
return nil
}
-func (w *HttpWsClientFactory) connect(sessionProvider func() (*Session, error)) (*websocket.Conn, string, Error) {
+func (w *HttpWsClientFactory) connect(sessionProvider func() (*Session, error)) (*websocket.Conn, string, string, Error) {
+ logger := w.logger
+
session, err := sessionProvider()
if err != nil {
- return nil, "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: err}
+ return nil, "", "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: err}
}
if session == nil {
- return nil, "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: nil}
+ return nil, "", "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: nil}
}
+
+ if !session.SupportsWebsocketPush {
+ return nil, "", "", SimpleError{code: JmapErrorSocketPushUnsupported, err: nil}
+ }
+
username := session.Username
u := session.WebsocketUrl
+ endpoint := session.WebsocketEndpoint
+
+ ctx := context.Background() // TODO WS connection context with a timeout?
h := http.Header{}
w.auth(username, h)
- c, resp, err := w.dialer.Dial(u.String(), h)
+ c, res, err := w.dialer.DialContext(ctx, u.String(), h)
if err != nil {
- return nil, "", SimpleError{code: JmapErrorFailedToEstablishWssConnection, err: err}
+ return nil, "", endpoint, SimpleError{code: JmapErrorFailedToEstablishWssConnection, err: err}
+ }
+
+ if w.logger.Trace().Enabled() {
+ responseBytes, err := httputil.DumpResponse(res, true)
+ if err == nil {
+ logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypeResponse).
+ Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
+ Msg(string(responseBytes))
+ }
+ }
+
+ if res.StatusCode != 101 {
+ w.eventListener.OnFailedRequestWithStatus(endpoint, res.StatusCode)
+ logger.Error().Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).Msg("HTTP response status code is not 101")
+ return nil, "", endpoint, SimpleError{code: JmapErrorServerResponse, err: fmt.Errorf("JMAP WS API response status is %v", res.Status)}
+ } else {
+ w.eventListener.OnSuccessfulWsRequest(endpoint, res.StatusCode)
}
// RFC 8887: Section 4.2:
// The reply from the server MUST also contain a corresponding "Sec-WebSocket-Protocol" header
// field with a value of "jmap" in order for a JMAP subprotocol connection to be established.
- if !slices.Contains(resp.Header.Values("Sec-WebSocket-Protocol"), "jmap") {
- return nil, "", SimpleError{code: JmapErrorWssConnectionResponseMissingJmapSubprotocol}
+ if !slices.Contains(res.Header.Values("Sec-WebSocket-Protocol"), "jmap") {
+ return nil, "", endpoint, SimpleError{code: JmapErrorWssConnectionResponseMissingJmapSubprotocol}
}
- return c, username, nil
+ return c, username, endpoint, nil
}
type HttpWsClient struct {
@@ -480,30 +527,96 @@ type HttpWsClient struct {
username string
sessionProvider func() (*Session, error)
c *websocket.Conn
+ logger *log.Logger
+ endpoint string
+ listener WsPushListener
WsClient
}
+func (w *HttpWsClient) readPump() {
+ defer func() {
+ w.c.Close()
+ }()
+ //w.c.SetReadLimit(maxMessageSize)
+ //c.conn.SetReadDeadline(time.Now().Add(pongWait))
+ //c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
+
+ for {
+ if _, message, err := w.c.ReadMessage(); err != nil {
+ if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+ w.logger.Error().Err(err).Msg("unexpected close")
+ }
+ break
+ } else {
+ if w.logger.Trace().Enabled() {
+ w.logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message))
+ }
+
+ var peek struct {
+ Type string `json:"@type"`
+ }
+ if err := json.Unmarshal(message, &peek); err != nil {
+ w.logger.Error().Err(err).Msg("failed to deserialized pushed WS message")
+ continue
+ }
+ switch peek.Type {
+ case string(TypeOfStateChange):
+ var stateChange StateChange
+ if err := json.Unmarshal(message, &stateChange); err != nil {
+ w.logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange)
+ continue
+ } else {
+ if w.listener != nil {
+ w.listener.OnNotification(stateChange)
+ } else {
+ w.logger.Warn().Msgf("no listener to be notified of %v", stateChange)
+ }
+ }
+ default:
+ w.logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type)
+ continue
+ }
+ }
+ }
+}
+
func (w *HttpWsClientFactory) EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) {
- c, username, jerr := w.connect(sessionProvider)
+ c, username, endpoint, jerr := w.connect(sessionProvider)
if jerr != nil {
return nil, jerr
}
- err := c.WriteJSON(WebSocketPushEnable{
- Type: "WebSocketPushEnable",
+ msg := WebSocketPushEnable{
+ Type: WebSocketPushTypeEnable,
DataTypes: nil, // = all datatypes
PushState: pushState, // will be omitted if empty string
- })
+ }
+
+ data, err := json.Marshal(msg)
if err != nil {
return nil, SimpleError{code: JmapErrorWssFailedToSendWebSocketPushEnable, err: err}
}
- return &HttpWsClient{
+ if w.logger.Trace().Enabled() {
+ w.logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypeRequest).Msg(string(data))
+ }
+ if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
+ return nil, SimpleError{code: JmapErrorWssFailedToSendWebSocketPushEnable, err: err}
+ }
+
+ wsc := &HttpWsClient{
client: w,
username: username,
sessionProvider: sessionProvider,
c: c,
- }, nil
+ logger: w.logger,
+ endpoint: endpoint,
+ listener: listener,
+ }
+
+ go wsc.readPump()
+
+ return wsc, nil
}
func (w *HttpWsClientFactory) Close() error {
@@ -516,7 +629,7 @@ func (c *HttpWsClient) DisableNotifications() Error {
}
err := c.c.WriteJSON(WebSocketPushDisable{
- Type: "WebSocketPushDisable",
+ Type: WebSocketPushTypeDisable,
})
if err != nil {
return SimpleError{code: JmapErrorWssFailedToSendWebSocketPushDisable, err: err}
diff --git a/pkg/jmap/jmap_integration_email_test.go b/pkg/jmap/jmap_integration_email_test.go
index 6912d75ba0..65db7206e9 100644
--- a/pkg/jmap/jmap_integration_email_test.go
+++ b/pkg/jmap/jmap_integration_email_test.go
@@ -25,6 +25,34 @@ import (
"github.com/stretchr/testify/require"
)
+func (s *StalwartTest) findInbox(t *testing.T, accountId string) (string, string) {
+ require := require.New(t)
+ respByAccountId, sessionState, _, _, err := s.client.GetAllMailboxes([]string{accountId}, s.session, s.ctx, s.logger, "")
+ require.NoError(err)
+ require.Equal(s.session.State, sessionState)
+ require.Len(respByAccountId, 1)
+ require.Contains(respByAccountId, accountId)
+ resp := respByAccountId[accountId]
+
+ mailboxesNameByRole := map[string]string{}
+ mailboxesUnreadByRole := map[string]int{}
+ for _, m := range resp {
+ if m.Role != "" {
+ mailboxesNameByRole[m.Role] = m.Name
+ mailboxesUnreadByRole[m.Role] = m.UnreadEmails
+ }
+ }
+ require.Contains(mailboxesNameByRole, "inbox")
+ require.Contains(mailboxesUnreadByRole, "inbox")
+ require.Zero(mailboxesUnreadByRole["inbox"])
+
+ inboxId := mailboxId("inbox", resp)
+ require.NotEmpty(inboxId)
+ inboxFolder := mailboxesNameByRole["inbox"]
+ require.NotEmpty(inboxFolder)
+ return inboxId, inboxFolder
+}
+
func TestEmails(t *testing.T) {
if skip(t) {
return
@@ -40,33 +68,7 @@ func TestEmails(t *testing.T) {
accountId := s.session.PrimaryAccounts.Mail
- var inboxFolder string
- var inboxId string
- {
- respByAccountId, sessionState, _, _, err := s.client.GetAllMailboxes([]string{accountId}, s.session, s.ctx, s.logger, "")
- require.NoError(err)
- require.Equal(s.session.State, sessionState)
- require.Len(respByAccountId, 1)
- require.Contains(respByAccountId, accountId)
- resp := respByAccountId[accountId]
-
- mailboxesNameByRole := map[string]string{}
- mailboxesUnreadByRole := map[string]int{}
- for _, m := range resp {
- if m.Role != "" {
- mailboxesNameByRole[m.Role] = m.Name
- mailboxesUnreadByRole[m.Role] = m.UnreadEmails
- }
- }
- require.Contains(mailboxesNameByRole, "inbox")
- require.Contains(mailboxesUnreadByRole, "inbox")
- require.Zero(mailboxesUnreadByRole["inbox"])
-
- inboxId = mailboxId("inbox", resp)
- require.NotEmpty(inboxId)
- inboxFolder = mailboxesNameByRole["inbox"]
- require.NotEmpty(inboxFolder)
- }
+ inboxId, inboxFolder := s.findInbox(t, accountId)
var threads int = 0
var mails []filledMail = nil
diff --git a/pkg/jmap/jmap_integration_test.go b/pkg/jmap/jmap_integration_test.go
index 198a327bf1..80edf70cae 100644
--- a/pkg/jmap/jmap_integration_test.go
+++ b/pkg/jmap/jmap_integration_test.go
@@ -45,6 +45,9 @@ import (
const (
EnableTypes = false
+
+ // Wireshark = "/usr/bin/wireshark"
+ Wireshark = ""
)
var (
@@ -282,16 +285,34 @@ func newStalwartTest(t *testing.T) (*StalwartTest, error) {
Path: "/",
}
+ if Wireshark != "" {
+ fmt.Printf("\x1b[45;37;1m Starting Wireshark on port %v \x1b[0m\n", jmapPort)
+ attr := os.ProcAttr{
+ Dir: ".",
+ Env: os.Environ(),
+ Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
+ }
+ cmd := []string{Wireshark, "-pkSl", "-i", "lo", "-f", fmt.Sprintf("port %d", jmapPort.Int()), "-Y", "http||websocket"}
+ process, err := os.StartProcess(Wireshark, cmd, &attr)
+ require.NoError(t, err)
+ err = process.Release()
+ require.NoError(t, err)
+
+ time.Sleep(10 * time.Second)
+ }
+
sessionUrl := jmapBaseUrl.JoinPath(".well-known", "jmap")
+ eventListener := nullHttpJmapApiClientEventListener{}
+
api := NewHttpJmapClient(
&jh,
masterUsername,
masterPassword,
- nullHttpJmapApiClientEventListener{},
+ eventListener,
)
- wscf, err := NewHttpWsClientFactory(wsd, masterUsername, masterPassword, logger)
+ wscf, err := NewHttpWsClientFactory(wsd, masterUsername, masterPassword, logger, eventListener)
if err != nil {
return nil, err
}
diff --git a/pkg/jmap/jmap_integration_ws_test.go b/pkg/jmap/jmap_integration_ws_test.go
new file mode 100644
index 0000000000..a6b8880b0a
--- /dev/null
+++ b/pkg/jmap/jmap_integration_ws_test.go
@@ -0,0 +1,118 @@
+package jmap
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/opencloud-eu/opencloud/pkg/log"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type testWsPushListener struct {
+ logger *log.Logger
+ mailAccountId string
+ calls atomic.Uint32
+ m sync.Mutex
+ emailStates []string
+ threadStates []string
+ mailboxStates []string
+}
+
+func (l *testWsPushListener) OnNotification(pushState StateChange) {
+ l.calls.Add(1)
+ // pushState is currently not supported by Stalwart, let's use the Email state instead
+ l.logger.Debug().Msgf("received %T: %v", pushState, pushState)
+ if changed, ok := pushState.Changed[l.mailAccountId]; ok {
+ l.m.Lock()
+ if st, ok := changed[EmailType]; ok {
+ l.emailStates = append(l.emailStates, st)
+ }
+ if st, ok := changed[ThreadType]; ok {
+ l.threadStates = append(l.threadStates, st)
+ }
+ if st, ok := changed[MailboxType]; ok {
+ l.mailboxStates = append(l.mailboxStates, st)
+ }
+ l.m.Unlock()
+ }
+}
+
+var _ WsPushListener = &testWsPushListener{}
+
+func TestWs(t *testing.T) {
+ if skip(t) {
+ return
+ }
+
+ require := require.New(t)
+
+ s, err := newStalwartTest(t)
+ require.NoError(err)
+ defer s.Close()
+
+ mailAccountId := s.session.PrimaryAccounts.Mail
+ inboxFolder := ""
+ {
+ _, inboxFolder = s.findInbox(t, mailAccountId)
+ }
+
+ l := &testWsPushListener{logger: s.logger, mailAccountId: mailAccountId}
+ s.client.AddWsPushListener(l)
+ require.Equal(uint32(0), l.calls.Load())
+
+ wsc, err := s.client.EnableNotifications("", func() (*Session, error) { return s.session, nil })
+ require.NoError(err)
+ defer wsc.Close()
+
+ require.Equal(uint32(0), l.calls.Load())
+ {
+ l.m.Lock()
+ require.Len(l.emailStates, 0)
+ require.Len(l.mailboxStates, 0)
+ require.Len(l.threadStates, 0)
+ l.m.Unlock()
+ }
+
+ {
+ _, n, err := s.fillEmailsWithImap(inboxFolder, 1)
+ require.NoError(err)
+ require.Equal(1, n)
+ }
+
+ require.Eventually(func() bool {
+ return l.calls.Load() == uint32(1)
+ }, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after first email state change")
+ {
+ l.m.Lock()
+ require.Len(l.emailStates, 1)
+ require.Len(l.mailboxStates, 1)
+ require.Len(l.threadStates, 1)
+ l.m.Unlock()
+ }
+
+ {
+ _, n, err := s.fillEmailsWithImap(inboxFolder, 1)
+ require.NoError(err)
+ require.Equal(1, n)
+ }
+
+ require.Eventually(func() bool {
+ return l.calls.Load() == uint32(2)
+ }, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after second email state change")
+ {
+ l.m.Lock()
+ require.Len(l.emailStates, 2)
+ require.Len(l.mailboxStates, 2)
+ require.Len(l.threadStates, 2)
+ assert.NotEqual(t, l.emailStates[0], l.emailStates[1])
+ assert.NotEqual(t, l.mailboxStates[0], l.mailboxStates[1])
+ assert.NotEqual(t, l.threadStates[0], l.threadStates[1])
+ l.m.Unlock()
+ }
+
+ err = wsc.DisableNotifications()
+ require.NoError(err)
+}
diff --git a/pkg/jmap/jmap_model.go b/pkg/jmap/jmap_model.go
index dd8b8a14c2..889e08efda 100644
--- a/pkg/jmap/jmap_model.go
+++ b/pkg/jmap/jmap_model.go
@@ -3634,9 +3634,13 @@ type SearchSnippetGetResponse struct {
NotFound []string `json:"notFound,omitempty"`
}
+type StateChangeType string
+
+const TypeOfStateChange = StateChangeType("StateChange")
+
type StateChange struct {
// This MUST be the string "StateChange".
- Type string `json:"@type"`
+ Type StateChangeType `json:"@type"`
// A map of an "account id" to an object encoding the state of data types that have
// changed for that account since the last StateChange object was pushed, for each
diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go
index 3826bbdbe6..5197b5fce2 100644
--- a/services/groupware/pkg/groupware/groupware_framework.go
+++ b/services/groupware/pkg/groupware/groupware_framework.go
@@ -156,6 +156,12 @@ func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyReadingError(en
func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyUnmarshallingError(endpoint string, err error) {
r.m.ResponseBodyUnmarshallingErrorPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
}
+func (r groupwareHttpJmapApiClientMetricsRecorder) OnSuccessfulWsRequest(endpoint string, status int) {
+ // TODO metrics for WSS
+}
+func (r groupwareHttpJmapApiClientMetricsRecorder) OnFailedWsHandshakeRequestWithStatus(endpoint string, status int) {
+ // TODO metrics for WSS
+}
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
@@ -229,7 +235,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
wsDialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
- wsf, err := jmap.NewHttpWsClientFactory(wsDialer, masterUsername, masterPassword, logger)
+ wsf, err := jmap.NewHttpWsClientFactory(wsDialer, masterUsername, masterPassword, logger, jmapMetricsAdapter)
if err != nil {
logger.Error().Err(err).Msg("failed to create websocket client")
return nil, GroupwareInitializationError{Message: "failed to create websocket client", Err: err}