mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-08 05:09:46 -06:00
groupware: JMAP WS push notifications support
This commit is contained in:
@@ -5,9 +5,9 @@ import (
|
||||
)
|
||||
|
||||
func (j *Client) EnablePush(pushState string, session *Session, _ *log.Logger) Error {
|
||||
return nil // TODO
|
||||
panic("not implemented") // TODO implement push
|
||||
}
|
||||
|
||||
func (j *Client) DisablePush(_ *Session, _ *log.Logger) Error {
|
||||
return nil // TODO
|
||||
panic("not implemented") // TODO implement push
|
||||
}
|
||||
|
||||
@@ -38,6 +38,10 @@ func NewClient(session SessionClient, api ApiClient, blob BlobClient, ws WsClien
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Client) EnableNotifications(pushState string, sessionProvider func() (*Session, error)) (WsClient, error) {
|
||||
return j.ws.EnableNotifications(pushState, sessionProvider, j)
|
||||
}
|
||||
|
||||
func (j *Client) AddSessionEventListener(listener SessionEventListener) {
|
||||
j.sessionEventListeners.add(listener)
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ const (
|
||||
JmapErrorWssFailedToSendWebSocketPushDisable
|
||||
JmapErrorWssFailedToClose
|
||||
JmapErrorWssFailedToRetrieveSession
|
||||
JmapErrorSocketPushUnsupported
|
||||
JmapErrorMissingCreatedObject
|
||||
)
|
||||
|
||||
|
||||
@@ -40,6 +40,13 @@ const (
|
||||
logHttpStatus = "status"
|
||||
logHttpStatusCode = "status-code"
|
||||
logHttpUrl = "url"
|
||||
logProto = "proto"
|
||||
logProtoJmap = "jmap"
|
||||
logProtoJmapWs = "jmapws"
|
||||
logType = "type"
|
||||
logTypeRequest = "request"
|
||||
logTypeResponse = "response"
|
||||
logTypePush = "push"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -55,6 +62,8 @@ type HttpJmapApiClientEventListener interface {
|
||||
OnFailedRequestWithStatus(endpoint string, status int)
|
||||
OnResponseBodyReadingError(endpoint string, err error)
|
||||
OnResponseBodyUnmarshallingError(endpoint string, err error)
|
||||
OnSuccessfulWsRequest(endpoint string, status int)
|
||||
OnFailedWsHandshakeRequestWithStatus(endpoint string, status int)
|
||||
}
|
||||
|
||||
type nullHttpJmapApiClientEventListener struct {
|
||||
@@ -70,6 +79,10 @@ func (l nullHttpJmapApiClientEventListener) OnResponseBodyReadingError(endpoint
|
||||
}
|
||||
func (l nullHttpJmapApiClientEventListener) OnResponseBodyUnmarshallingError(endpoint string, err error) {
|
||||
}
|
||||
func (l nullHttpJmapApiClientEventListener) OnSuccessfulWsRequest(endpoint string, status int) {
|
||||
}
|
||||
func (l nullHttpJmapApiClientEventListener) OnFailedWsHandshakeRequestWithStatus(endpoint string, status int) {
|
||||
}
|
||||
|
||||
var _ HttpJmapApiClientEventListener = nullHttpJmapApiClientEventListener{}
|
||||
|
||||
@@ -206,7 +219,7 @@ func (h *HttpJmapClient) Command(ctx context.Context, logger *log.Logger, sessio
|
||||
if logger.Trace().Enabled() {
|
||||
requestBytes, err := httputil.DumpRequestOut(req, true)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
|
||||
}
|
||||
}
|
||||
h.auth(session.Username, logger, req)
|
||||
@@ -221,7 +234,9 @@ func (h *HttpJmapClient) Command(ctx context.Context, logger *log.Logger, sessio
|
||||
if logger.Trace().Enabled() {
|
||||
responseBytes, err := httputil.DumpResponse(res, true)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
|
||||
Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
|
||||
Msg(string(responseBytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,7 +282,7 @@ func (h *HttpJmapClient) UploadBinary(ctx context.Context, logger *log.Logger, s
|
||||
if logger.Trace().Enabled() {
|
||||
requestBytes, err := httputil.DumpRequestOut(req, false)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,7 +297,9 @@ func (h *HttpJmapClient) UploadBinary(ctx context.Context, logger *log.Logger, s
|
||||
if logger.Trace().Enabled() {
|
||||
responseBytes, err := httputil.DumpResponse(res, true)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
|
||||
Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
|
||||
Msg(string(responseBytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,7 +354,7 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
|
||||
if logger.Trace().Enabled() {
|
||||
requestBytes, err := httputil.DumpRequestOut(req, true)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "request").Msg(string(requestBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeRequest).Msg(string(requestBytes))
|
||||
}
|
||||
}
|
||||
h.auth(session.Username, logger, req)
|
||||
@@ -351,7 +368,9 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
|
||||
if logger.Trace().Enabled() {
|
||||
responseBytes, err := httputil.DumpResponse(res, false)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str("proto", "jmap").Str("type", "response").Msg(string(responseBytes))
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmap).Str(logType, logTypeResponse).
|
||||
Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
|
||||
Msg(string(responseBytes))
|
||||
}
|
||||
}
|
||||
language := Language(res.Header.Get("Content-Language"))
|
||||
@@ -384,9 +403,17 @@ func (h *HttpJmapClient) DownloadBinary(ctx context.Context, logger *log.Logger,
|
||||
}, language, nil
|
||||
}
|
||||
|
||||
type WebSocketPushEnableType string
|
||||
type WebSocketPushDisableType string
|
||||
|
||||
const (
|
||||
WebSocketPushTypeEnable = WebSocketPushEnableType("WebSocketPushEnable")
|
||||
WebSocketPushTypeDisable = WebSocketPushDisableType("WebSocketPushDisable")
|
||||
)
|
||||
|
||||
type WebSocketPushEnable struct {
|
||||
// This MUST be the string "WebSocketPushEnable".
|
||||
Type string `json:"@type"`
|
||||
Type WebSocketPushEnableType `json:"@type"`
|
||||
|
||||
// A list of data type names (e.g., "Mailbox" or "Email") that the client is interested in.
|
||||
//
|
||||
@@ -404,30 +431,21 @@ type WebSocketPushEnable struct {
|
||||
|
||||
type WebSocketPushDisable struct {
|
||||
// This MUST be the string "WebSocketPushDisable".
|
||||
Type string `json:"@type"`
|
||||
Type WebSocketPushDisableType `json:"@type"`
|
||||
}
|
||||
|
||||
type HttpWsClientFactory struct {
|
||||
dialer *websocket.Dialer
|
||||
masterUser string
|
||||
masterPassword string
|
||||
logger *log.Logger
|
||||
eventListener HttpJmapApiClientEventListener
|
||||
}
|
||||
|
||||
var _ WsClientFactory = &HttpWsClientFactory{}
|
||||
|
||||
func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterPassword string, logger *log.Logger) (*HttpWsClientFactory, error) {
|
||||
/*
|
||||
d := websocket.Dialer{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO configurable
|
||||
HandshakeTimeout: 5 * time.Second,
|
||||
// RFC 8887: Section 4.2:
|
||||
// Otherwise, the client MUST make an authenticated HTTP request [RFC7235] on the encrypted connection
|
||||
// and MUST include the value "jmap" in the list of protocols for the "Sec-WebSocket-Protocol" header
|
||||
// field.
|
||||
Subprotocols: []string{"jmap"},
|
||||
}
|
||||
*/
|
||||
|
||||
func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterPassword string, logger *log.Logger,
|
||||
eventListener HttpJmapApiClientEventListener) (*HttpWsClientFactory, error) {
|
||||
// RFC 8887: Section 4.2:
|
||||
// Otherwise, the client MUST make an authenticated HTTP request [RFC7235] on the encrypted connection
|
||||
// and MUST include the value "jmap" in the list of protocols for the "Sec-WebSocket-Protocol" header
|
||||
@@ -438,6 +456,8 @@ func NewHttpWsClientFactory(dialer *websocket.Dialer, masterUser string, masterP
|
||||
dialer: dialer,
|
||||
masterUser: masterUser,
|
||||
masterPassword: masterPassword,
|
||||
logger: logger,
|
||||
eventListener: eventListener,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -447,32 +467,59 @@ func (w *HttpWsClientFactory) auth(username string, h http.Header) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *HttpWsClientFactory) connect(sessionProvider func() (*Session, error)) (*websocket.Conn, string, Error) {
|
||||
func (w *HttpWsClientFactory) connect(sessionProvider func() (*Session, error)) (*websocket.Conn, string, string, Error) {
|
||||
logger := w.logger
|
||||
|
||||
session, err := sessionProvider()
|
||||
if err != nil {
|
||||
return nil, "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: err}
|
||||
return nil, "", "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: err}
|
||||
}
|
||||
if session == nil {
|
||||
return nil, "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: nil}
|
||||
return nil, "", "", SimpleError{code: JmapErrorWssFailedToRetrieveSession, err: nil}
|
||||
}
|
||||
|
||||
if !session.SupportsWebsocketPush {
|
||||
return nil, "", "", SimpleError{code: JmapErrorSocketPushUnsupported, err: nil}
|
||||
}
|
||||
|
||||
username := session.Username
|
||||
u := session.WebsocketUrl
|
||||
endpoint := session.WebsocketEndpoint
|
||||
|
||||
ctx := context.Background() // TODO WS connection context with a timeout?
|
||||
|
||||
h := http.Header{}
|
||||
w.auth(username, h)
|
||||
c, resp, err := w.dialer.Dial(u.String(), h)
|
||||
c, res, err := w.dialer.DialContext(ctx, u.String(), h)
|
||||
if err != nil {
|
||||
return nil, "", SimpleError{code: JmapErrorFailedToEstablishWssConnection, err: err}
|
||||
return nil, "", endpoint, SimpleError{code: JmapErrorFailedToEstablishWssConnection, err: err}
|
||||
}
|
||||
|
||||
if w.logger.Trace().Enabled() {
|
||||
responseBytes, err := httputil.DumpResponse(res, true)
|
||||
if err == nil {
|
||||
logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypeResponse).
|
||||
Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).
|
||||
Msg(string(responseBytes))
|
||||
}
|
||||
}
|
||||
|
||||
if res.StatusCode != 101 {
|
||||
w.eventListener.OnFailedRequestWithStatus(endpoint, res.StatusCode)
|
||||
logger.Error().Str(logHttpStatus, log.SafeString(res.Status)).Int(logHttpStatusCode, res.StatusCode).Msg("HTTP response status code is not 101")
|
||||
return nil, "", endpoint, SimpleError{code: JmapErrorServerResponse, err: fmt.Errorf("JMAP WS API response status is %v", res.Status)}
|
||||
} else {
|
||||
w.eventListener.OnSuccessfulWsRequest(endpoint, res.StatusCode)
|
||||
}
|
||||
|
||||
// RFC 8887: Section 4.2:
|
||||
// The reply from the server MUST also contain a corresponding "Sec-WebSocket-Protocol" header
|
||||
// field with a value of "jmap" in order for a JMAP subprotocol connection to be established.
|
||||
if !slices.Contains(resp.Header.Values("Sec-WebSocket-Protocol"), "jmap") {
|
||||
return nil, "", SimpleError{code: JmapErrorWssConnectionResponseMissingJmapSubprotocol}
|
||||
if !slices.Contains(res.Header.Values("Sec-WebSocket-Protocol"), "jmap") {
|
||||
return nil, "", endpoint, SimpleError{code: JmapErrorWssConnectionResponseMissingJmapSubprotocol}
|
||||
}
|
||||
|
||||
return c, username, nil
|
||||
return c, username, endpoint, nil
|
||||
}
|
||||
|
||||
type HttpWsClient struct {
|
||||
@@ -480,30 +527,96 @@ type HttpWsClient struct {
|
||||
username string
|
||||
sessionProvider func() (*Session, error)
|
||||
c *websocket.Conn
|
||||
logger *log.Logger
|
||||
endpoint string
|
||||
listener WsPushListener
|
||||
WsClient
|
||||
}
|
||||
|
||||
func (w *HttpWsClient) readPump() {
|
||||
defer func() {
|
||||
w.c.Close()
|
||||
}()
|
||||
//w.c.SetReadLimit(maxMessageSize)
|
||||
//c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
//c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
||||
|
||||
for {
|
||||
if _, message, err := w.c.ReadMessage(); err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
w.logger.Error().Err(err).Msg("unexpected close")
|
||||
}
|
||||
break
|
||||
} else {
|
||||
if w.logger.Trace().Enabled() {
|
||||
w.logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message))
|
||||
}
|
||||
|
||||
var peek struct {
|
||||
Type string `json:"@type"`
|
||||
}
|
||||
if err := json.Unmarshal(message, &peek); err != nil {
|
||||
w.logger.Error().Err(err).Msg("failed to deserialized pushed WS message")
|
||||
continue
|
||||
}
|
||||
switch peek.Type {
|
||||
case string(TypeOfStateChange):
|
||||
var stateChange StateChange
|
||||
if err := json.Unmarshal(message, &stateChange); err != nil {
|
||||
w.logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange)
|
||||
continue
|
||||
} else {
|
||||
if w.listener != nil {
|
||||
w.listener.OnNotification(stateChange)
|
||||
} else {
|
||||
w.logger.Warn().Msgf("no listener to be notified of %v", stateChange)
|
||||
}
|
||||
}
|
||||
default:
|
||||
w.logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *HttpWsClientFactory) EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) {
|
||||
c, username, jerr := w.connect(sessionProvider)
|
||||
c, username, endpoint, jerr := w.connect(sessionProvider)
|
||||
if jerr != nil {
|
||||
return nil, jerr
|
||||
}
|
||||
|
||||
err := c.WriteJSON(WebSocketPushEnable{
|
||||
Type: "WebSocketPushEnable",
|
||||
msg := WebSocketPushEnable{
|
||||
Type: WebSocketPushTypeEnable,
|
||||
DataTypes: nil, // = all datatypes
|
||||
PushState: pushState, // will be omitted if empty string
|
||||
})
|
||||
}
|
||||
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return nil, SimpleError{code: JmapErrorWssFailedToSendWebSocketPushEnable, err: err}
|
||||
}
|
||||
|
||||
return &HttpWsClient{
|
||||
if w.logger.Trace().Enabled() {
|
||||
w.logger.Trace().Str(logEndpoint, endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypeRequest).Msg(string(data))
|
||||
}
|
||||
if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
return nil, SimpleError{code: JmapErrorWssFailedToSendWebSocketPushEnable, err: err}
|
||||
}
|
||||
|
||||
wsc := &HttpWsClient{
|
||||
client: w,
|
||||
username: username,
|
||||
sessionProvider: sessionProvider,
|
||||
c: c,
|
||||
}, nil
|
||||
logger: w.logger,
|
||||
endpoint: endpoint,
|
||||
listener: listener,
|
||||
}
|
||||
|
||||
go wsc.readPump()
|
||||
|
||||
return wsc, nil
|
||||
}
|
||||
|
||||
func (w *HttpWsClientFactory) Close() error {
|
||||
@@ -516,7 +629,7 @@ func (c *HttpWsClient) DisableNotifications() Error {
|
||||
}
|
||||
|
||||
err := c.c.WriteJSON(WebSocketPushDisable{
|
||||
Type: "WebSocketPushDisable",
|
||||
Type: WebSocketPushTypeDisable,
|
||||
})
|
||||
if err != nil {
|
||||
return SimpleError{code: JmapErrorWssFailedToSendWebSocketPushDisable, err: err}
|
||||
|
||||
@@ -25,6 +25,34 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func (s *StalwartTest) findInbox(t *testing.T, accountId string) (string, string) {
|
||||
require := require.New(t)
|
||||
respByAccountId, sessionState, _, _, err := s.client.GetAllMailboxes([]string{accountId}, s.session, s.ctx, s.logger, "")
|
||||
require.NoError(err)
|
||||
require.Equal(s.session.State, sessionState)
|
||||
require.Len(respByAccountId, 1)
|
||||
require.Contains(respByAccountId, accountId)
|
||||
resp := respByAccountId[accountId]
|
||||
|
||||
mailboxesNameByRole := map[string]string{}
|
||||
mailboxesUnreadByRole := map[string]int{}
|
||||
for _, m := range resp {
|
||||
if m.Role != "" {
|
||||
mailboxesNameByRole[m.Role] = m.Name
|
||||
mailboxesUnreadByRole[m.Role] = m.UnreadEmails
|
||||
}
|
||||
}
|
||||
require.Contains(mailboxesNameByRole, "inbox")
|
||||
require.Contains(mailboxesUnreadByRole, "inbox")
|
||||
require.Zero(mailboxesUnreadByRole["inbox"])
|
||||
|
||||
inboxId := mailboxId("inbox", resp)
|
||||
require.NotEmpty(inboxId)
|
||||
inboxFolder := mailboxesNameByRole["inbox"]
|
||||
require.NotEmpty(inboxFolder)
|
||||
return inboxId, inboxFolder
|
||||
}
|
||||
|
||||
func TestEmails(t *testing.T) {
|
||||
if skip(t) {
|
||||
return
|
||||
@@ -40,33 +68,7 @@ func TestEmails(t *testing.T) {
|
||||
|
||||
accountId := s.session.PrimaryAccounts.Mail
|
||||
|
||||
var inboxFolder string
|
||||
var inboxId string
|
||||
{
|
||||
respByAccountId, sessionState, _, _, err := s.client.GetAllMailboxes([]string{accountId}, s.session, s.ctx, s.logger, "")
|
||||
require.NoError(err)
|
||||
require.Equal(s.session.State, sessionState)
|
||||
require.Len(respByAccountId, 1)
|
||||
require.Contains(respByAccountId, accountId)
|
||||
resp := respByAccountId[accountId]
|
||||
|
||||
mailboxesNameByRole := map[string]string{}
|
||||
mailboxesUnreadByRole := map[string]int{}
|
||||
for _, m := range resp {
|
||||
if m.Role != "" {
|
||||
mailboxesNameByRole[m.Role] = m.Name
|
||||
mailboxesUnreadByRole[m.Role] = m.UnreadEmails
|
||||
}
|
||||
}
|
||||
require.Contains(mailboxesNameByRole, "inbox")
|
||||
require.Contains(mailboxesUnreadByRole, "inbox")
|
||||
require.Zero(mailboxesUnreadByRole["inbox"])
|
||||
|
||||
inboxId = mailboxId("inbox", resp)
|
||||
require.NotEmpty(inboxId)
|
||||
inboxFolder = mailboxesNameByRole["inbox"]
|
||||
require.NotEmpty(inboxFolder)
|
||||
}
|
||||
inboxId, inboxFolder := s.findInbox(t, accountId)
|
||||
|
||||
var threads int = 0
|
||||
var mails []filledMail = nil
|
||||
|
||||
@@ -45,6 +45,9 @@ import (
|
||||
|
||||
const (
|
||||
EnableTypes = false
|
||||
|
||||
// Wireshark = "/usr/bin/wireshark"
|
||||
Wireshark = ""
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -282,16 +285,34 @@ func newStalwartTest(t *testing.T) (*StalwartTest, error) {
|
||||
Path: "/",
|
||||
}
|
||||
|
||||
if Wireshark != "" {
|
||||
fmt.Printf("\x1b[45;37;1m Starting Wireshark on port %v \x1b[0m\n", jmapPort)
|
||||
attr := os.ProcAttr{
|
||||
Dir: ".",
|
||||
Env: os.Environ(),
|
||||
Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
|
||||
}
|
||||
cmd := []string{Wireshark, "-pkSl", "-i", "lo", "-f", fmt.Sprintf("port %d", jmapPort.Int()), "-Y", "http||websocket"}
|
||||
process, err := os.StartProcess(Wireshark, cmd, &attr)
|
||||
require.NoError(t, err)
|
||||
err = process.Release()
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
sessionUrl := jmapBaseUrl.JoinPath(".well-known", "jmap")
|
||||
|
||||
eventListener := nullHttpJmapApiClientEventListener{}
|
||||
|
||||
api := NewHttpJmapClient(
|
||||
&jh,
|
||||
masterUsername,
|
||||
masterPassword,
|
||||
nullHttpJmapApiClientEventListener{},
|
||||
eventListener,
|
||||
)
|
||||
|
||||
wscf, err := NewHttpWsClientFactory(wsd, masterUsername, masterPassword, logger)
|
||||
wscf, err := NewHttpWsClientFactory(wsd, masterUsername, masterPassword, logger, eventListener)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
118
pkg/jmap/jmap_integration_ws_test.go
Normal file
118
pkg/jmap/jmap_integration_ws_test.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package jmap
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testWsPushListener struct {
|
||||
logger *log.Logger
|
||||
mailAccountId string
|
||||
calls atomic.Uint32
|
||||
m sync.Mutex
|
||||
emailStates []string
|
||||
threadStates []string
|
||||
mailboxStates []string
|
||||
}
|
||||
|
||||
func (l *testWsPushListener) OnNotification(pushState StateChange) {
|
||||
l.calls.Add(1)
|
||||
// pushState is currently not supported by Stalwart, let's use the Email state instead
|
||||
l.logger.Debug().Msgf("received %T: %v", pushState, pushState)
|
||||
if changed, ok := pushState.Changed[l.mailAccountId]; ok {
|
||||
l.m.Lock()
|
||||
if st, ok := changed[EmailType]; ok {
|
||||
l.emailStates = append(l.emailStates, st)
|
||||
}
|
||||
if st, ok := changed[ThreadType]; ok {
|
||||
l.threadStates = append(l.threadStates, st)
|
||||
}
|
||||
if st, ok := changed[MailboxType]; ok {
|
||||
l.mailboxStates = append(l.mailboxStates, st)
|
||||
}
|
||||
l.m.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
var _ WsPushListener = &testWsPushListener{}
|
||||
|
||||
func TestWs(t *testing.T) {
|
||||
if skip(t) {
|
||||
return
|
||||
}
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
s, err := newStalwartTest(t)
|
||||
require.NoError(err)
|
||||
defer s.Close()
|
||||
|
||||
mailAccountId := s.session.PrimaryAccounts.Mail
|
||||
inboxFolder := ""
|
||||
{
|
||||
_, inboxFolder = s.findInbox(t, mailAccountId)
|
||||
}
|
||||
|
||||
l := &testWsPushListener{logger: s.logger, mailAccountId: mailAccountId}
|
||||
s.client.AddWsPushListener(l)
|
||||
require.Equal(uint32(0), l.calls.Load())
|
||||
|
||||
wsc, err := s.client.EnableNotifications("", func() (*Session, error) { return s.session, nil })
|
||||
require.NoError(err)
|
||||
defer wsc.Close()
|
||||
|
||||
require.Equal(uint32(0), l.calls.Load())
|
||||
{
|
||||
l.m.Lock()
|
||||
require.Len(l.emailStates, 0)
|
||||
require.Len(l.mailboxStates, 0)
|
||||
require.Len(l.threadStates, 0)
|
||||
l.m.Unlock()
|
||||
}
|
||||
|
||||
{
|
||||
_, n, err := s.fillEmailsWithImap(inboxFolder, 1)
|
||||
require.NoError(err)
|
||||
require.Equal(1, n)
|
||||
}
|
||||
|
||||
require.Eventually(func() bool {
|
||||
return l.calls.Load() == uint32(1)
|
||||
}, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after first email state change")
|
||||
{
|
||||
l.m.Lock()
|
||||
require.Len(l.emailStates, 1)
|
||||
require.Len(l.mailboxStates, 1)
|
||||
require.Len(l.threadStates, 1)
|
||||
l.m.Unlock()
|
||||
}
|
||||
|
||||
{
|
||||
_, n, err := s.fillEmailsWithImap(inboxFolder, 1)
|
||||
require.NoError(err)
|
||||
require.Equal(1, n)
|
||||
}
|
||||
|
||||
require.Eventually(func() bool {
|
||||
return l.calls.Load() == uint32(2)
|
||||
}, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after second email state change")
|
||||
{
|
||||
l.m.Lock()
|
||||
require.Len(l.emailStates, 2)
|
||||
require.Len(l.mailboxStates, 2)
|
||||
require.Len(l.threadStates, 2)
|
||||
assert.NotEqual(t, l.emailStates[0], l.emailStates[1])
|
||||
assert.NotEqual(t, l.mailboxStates[0], l.mailboxStates[1])
|
||||
assert.NotEqual(t, l.threadStates[0], l.threadStates[1])
|
||||
l.m.Unlock()
|
||||
}
|
||||
|
||||
err = wsc.DisableNotifications()
|
||||
require.NoError(err)
|
||||
}
|
||||
@@ -3634,9 +3634,13 @@ type SearchSnippetGetResponse struct {
|
||||
NotFound []string `json:"notFound,omitempty"`
|
||||
}
|
||||
|
||||
type StateChangeType string
|
||||
|
||||
const TypeOfStateChange = StateChangeType("StateChange")
|
||||
|
||||
type StateChange struct {
|
||||
// This MUST be the string "StateChange".
|
||||
Type string `json:"@type"`
|
||||
Type StateChangeType `json:"@type"`
|
||||
|
||||
// A map of an "account id" to an object encoding the state of data types that have
|
||||
// changed for that account since the last StateChange object was pushed, for each
|
||||
|
||||
@@ -156,6 +156,12 @@ func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyReadingError(en
|
||||
func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyUnmarshallingError(endpoint string, err error) {
|
||||
r.m.ResponseBodyUnmarshallingErrorPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
|
||||
}
|
||||
func (r groupwareHttpJmapApiClientMetricsRecorder) OnSuccessfulWsRequest(endpoint string, status int) {
|
||||
// TODO metrics for WSS
|
||||
}
|
||||
func (r groupwareHttpJmapApiClientMetricsRecorder) OnFailedWsHandshakeRequestWithStatus(endpoint string, status int) {
|
||||
// TODO metrics for WSS
|
||||
}
|
||||
|
||||
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) {
|
||||
baseUrl, err := url.Parse(config.Mail.BaseUrl)
|
||||
@@ -229,7 +235,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
|
||||
wsDialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
}
|
||||
|
||||
wsf, err := jmap.NewHttpWsClientFactory(wsDialer, masterUsername, masterPassword, logger)
|
||||
wsf, err := jmap.NewHttpWsClientFactory(wsDialer, masterUsername, masterPassword, logger, jmapMetricsAdapter)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("failed to create websocket client")
|
||||
return nil, GroupwareInitializationError{Message: "failed to create websocket client", Err: err}
|
||||
|
||||
Reference in New Issue
Block a user