Schema load inside DB transaction

This commit is contained in:
Gabriel Herbert
2025-02-27 17:58:53 +01:00
parent e68c850348
commit 39b2a25868
63 changed files with 578 additions and 662 deletions

34
cache/cache_access.go vendored
View File

@@ -9,6 +9,7 @@ import (
"sync"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
var (
@@ -41,13 +42,26 @@ func LoadAccessIfUnknown(loginId int64) error {
if exists {
return nil
}
return load(loginId)
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := load_tx(ctx, tx, loginId); err != nil {
return err
}
return tx.Commit(ctx)
}
// renew permissions for all cached logins
func RenewAccessAll() error {
// renew permissions for all known logins
func RenewAccessAll_tx(ctx context.Context, tx pgx.Tx) error {
for loginId, _ := range loginIdMapAccess {
if err := RenewAccessById(loginId); err != nil {
if err := RenewAccessById_tx(ctx, tx, loginId); err != nil {
return err
}
}
@@ -55,20 +69,20 @@ func RenewAccessAll() error {
}
// renew permissions for one known login
func RenewAccessById(loginId int64) error {
func RenewAccessById_tx(ctx context.Context, tx pgx.Tx, loginId int64) error {
access_mx.RLock()
_, exists := loginIdMapAccess[loginId]
access_mx.RUnlock()
if !exists {
return nil
}
return load(loginId)
return load_tx(ctx, tx, loginId)
}
// load access permissions for login ID into cache
func load(loginId int64) error {
func load_tx(ctx context.Context, tx pgx.Tx, loginId int64) error {
roleIds, err := loadRoleIds(loginId)
roleIds, err := loadRoleIds_tx(ctx, tx, loginId)
if err != nil {
return err
}
@@ -146,10 +160,10 @@ func load(loginId int64) error {
return nil
}
func loadRoleIds(loginId int64) ([]uuid.UUID, error) {
func loadRoleIds_tx(ctx context.Context, tx pgx.Tx, loginId int64) ([]uuid.UUID, error) {
roleIds := make([]uuid.UUID, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
-- get nested children of assigned roles
WITH RECURSIVE child_ids AS (
SELECT role_id_child

View File

@@ -3,10 +3,10 @@ package cache
import (
"context"
"r3/config/captionMap"
"r3/db"
"r3/types"
"sync"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)
@@ -25,16 +25,7 @@ func GetCaptionMapCustom() types.CaptionMapsAll {
return captionMapCustom
}
func LoadCaptionMapCustom() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
func LoadCaptionMapCustom_tx(ctx context.Context, tx pgx.Tx) error {
cus, err := captionMap.Get_tx(ctx, tx, pgtype.UUID{}, "instance")
if err != nil {
return err
@@ -44,5 +35,5 @@ func LoadCaptionMapCustom() error {
captionMapCustom = cus
caption_mx.Unlock()
return tx.Commit(ctx)
return nil
}

7
cache/cache_dict.go vendored
View File

@@ -2,9 +2,10 @@ package cache
import (
"context"
"r3/db"
"slices"
"sync"
"github.com/jackc/pgx/v5"
)
var (
@@ -24,11 +25,11 @@ func GetSearchDictionaryIsValid(entry string) bool {
return slices.Contains(dict, entry)
}
func LoadSearchDictionaries() error {
func LoadSearchDictionaries_tx(ctx context.Context, tx pgx.Tx) error {
dict_mx.Lock()
defer dict_mx.Unlock()
err := db.Pool.QueryRow(context.Background(), `
err := tx.QueryRow(ctx, `
SELECT ARRAY_AGG(cfgname::TEXT)
FROM pg_catalog.pg_ts_config
`).Scan(&dict)

6
cache/cache_ics.go vendored
View File

@@ -1,11 +1,13 @@
package cache
import (
"context"
"r3/schema/field"
"r3/types"
"sync"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
var (
@@ -13,7 +15,7 @@ var (
fieldIdMapIcs = make(map[uuid.UUID]types.FieldCalendar)
)
func GetCalendarField(fieldId uuid.UUID) (types.FieldCalendar, error) {
func GetCalendarField_tx(ctx context.Context, tx pgx.Tx, fieldId uuid.UUID) (types.FieldCalendar, error) {
ics_mx.Lock()
defer ics_mx.Unlock()
@@ -22,7 +24,7 @@ func GetCalendarField(fieldId uuid.UUID) (types.FieldCalendar, error) {
return f, nil
}
f, err := field.GetCalendar(fieldId)
f, err := field.GetCalendar_tx(ctx, tx, fieldId)
if err != nil {
return f, err
}

17
cache/cache_mail.go vendored
View File

@@ -3,7 +3,6 @@ package cache
import (
"context"
"fmt"
"r3/db"
"r3/types"
"sync"
@@ -52,22 +51,6 @@ func GetMailAccountsExist() bool {
return len(mailAccountIdMap) != 0
}
func LoadMailAccountMap() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := LoadMailAccountMap_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func LoadMailAccountMap_tx(ctx context.Context, tx pgx.Tx) error {
rows, err := tx.Query(ctx, `

View File

@@ -3,7 +3,6 @@ package cache
import (
"context"
"fmt"
"r3/db"
"r3/types"
"sync"
@@ -33,22 +32,6 @@ func GetOauthClient(id int32) (types.OauthClient, error) {
return c, nil
}
func LoadOauthClientMap() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := LoadOauthClientMap_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func LoadOauthClientMap_tx(ctx context.Context, tx pgx.Tx) error {
rows, err := tx.Query(ctx, `

View File

@@ -2,10 +2,10 @@ package cache
import (
"context"
"r3/db"
"sync"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
var (
@@ -29,13 +29,13 @@ func GetPresetRecordId(presetId uuid.UUID) int64 {
return v
}
func renewPresetRecordIds() error {
func renewPresetRecordIds_tx(ctx context.Context, tx pgx.Tx) error {
preset_mx.Lock()
defer preset_mx.Unlock()
presetIdMapRecordId = make(map[uuid.UUID]int64)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT preset_id, record_id_wofk
FROM instance.preset_record
`)

16
cache/cache_pwa.go vendored
View File

@@ -49,22 +49,6 @@ func GetPwaDomainMap() map[string]uuid.UUID {
return pwaDomainMap
}
func LoadPwaDomainMap() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := LoadPwaDomainMap_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func LoadPwaDomainMap_tx(ctx context.Context, tx pgx.Tx) error {
pwa_mx.Lock()
defer pwa_mx.Unlock()

60
cache/cache_schema.go vendored
View File

@@ -4,6 +4,7 @@
package cache
import (
"context"
"encoding/json"
"fmt"
"r3/config/module_meta"
@@ -32,6 +33,7 @@ import (
"sync"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/maps"
)
@@ -69,8 +71,8 @@ func GetModuleCacheJson(moduleId uuid.UUID) (json.RawMessage, error) {
}
return json, nil
}
func LoadModuleIdMapMeta() error {
moduleIdMapMetaNew, err := module_meta.GetIdMap()
func LoadModuleIdMapMeta_tx(ctx context.Context, tx pgx.Tx) error {
moduleIdMapMetaNew, err := module_meta.GetIdMap_tx(ctx, tx)
if err != nil {
return err
}
@@ -91,21 +93,21 @@ func LoadModuleIdMapMeta() error {
}
// load all modules into the schema cache
func LoadSchema() error {
return UpdateSchema(maps.Keys(moduleIdMapMeta), true)
func LoadSchema_tx(ctx context.Context, tx pgx.Tx) error {
return UpdateSchema_tx(ctx, tx, maps.Keys(moduleIdMapMeta), true)
}
// update module schema cache
func UpdateSchema(moduleIds []uuid.UUID, initialLoad bool) error {
func UpdateSchema_tx(ctx context.Context, tx pgx.Tx, moduleIds []uuid.UUID, initialLoad bool) error {
var err error
if err := updateSchemaCache(moduleIds); err != nil {
if err := updateSchemaCache_tx(ctx, tx, moduleIds); err != nil {
return err
}
// renew caches, affected by potentially changed modules (preset records, login access)
renewIcsFields()
if err := renewPresetRecordIds(); err != nil {
if err := renewPresetRecordIds_tx(ctx, tx); err != nil {
return err
}
@@ -125,7 +127,7 @@ func UpdateSchema(moduleIds []uuid.UUID, initialLoad bool) error {
// update change date for updated modules
now := tools.GetTimeUnix()
if err := module_meta.SetDateChange(moduleIds, now); err != nil {
if err := module_meta.SetDateChange_tx(ctx, tx, moduleIds, now); err != nil {
return err
}
@@ -134,7 +136,7 @@ func UpdateSchema(moduleIds []uuid.UUID, initialLoad bool) error {
for _, id := range moduleIds {
meta, exists := moduleIdMapMeta[id]
if !exists {
meta, err = module_meta.Get(id)
meta, err = module_meta.Get_tx(ctx, tx, id)
if err != nil {
return err
}
@@ -146,13 +148,13 @@ func UpdateSchema(moduleIds []uuid.UUID, initialLoad bool) error {
return nil
}
func updateSchemaCache(moduleIds []uuid.UUID) error {
func updateSchemaCache_tx(ctx context.Context, tx pgx.Tx, moduleIds []uuid.UUID) error {
Schema_mx.Lock()
defer Schema_mx.Unlock()
log.Info("cache", fmt.Sprintf("starting schema processing for %d module(s)", len(moduleIds)))
mods, err := module.Get(moduleIds)
mods, err := module.Get_tx(ctx, tx, moduleIds)
if err != nil {
return err
}
@@ -177,7 +179,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get articles
log.Info("cache", "load articles")
mod.Articles, err = article.Get(mod.Id)
mod.Articles, err = article.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -185,7 +187,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get relations
log.Info("cache", "load relations")
rels, err := relation.Get(mod.Id)
rels, err := relation.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -193,7 +195,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
for _, rel := range rels {
// get attributes
atrs, err := attribute.Get(rel.Id)
atrs, err := attribute.Get_tx(ctx, tx, rel.Id)
if err != nil {
return err
}
@@ -205,13 +207,13 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
}
// get indexes
rel.Indexes, err = pgIndex.Get(rel.Id)
rel.Indexes, err = pgIndex.Get_tx(ctx, tx, rel.Id)
if err != nil {
return err
}
// get presets
rel.Presets, err = preset.Get(rel.Id)
rel.Presets, err = preset.Get_tx(ctx, tx, rel.Id)
if err != nil {
return err
}
@@ -224,7 +226,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get forms
log.Info("cache", "load forms")
mod.Forms, err = form.Get(mod.Id, []uuid.UUID{})
mod.Forms, err = form.Get_tx(ctx, tx, mod.Id, []uuid.UUID{})
if err != nil {
return err
}
@@ -232,7 +234,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get menu tabs
log.Info("cache", "load menu tabs")
mod.MenuTabs, err = menuTab.Get(mod.Id)
mod.MenuTabs, err = menuTab.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -240,7 +242,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get icons
log.Info("cache", "load icons")
mod.Icons, err = icon.Get(mod.Id)
mod.Icons, err = icon.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -248,7 +250,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get roles
log.Info("cache", "load roles")
mod.Roles, err = role.Get(mod.Id)
mod.Roles, err = role.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -261,13 +263,13 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get login forms
log.Info("cache", "load login forms")
mod.LoginForms, err = loginForm.Get(mod.Id)
mod.LoginForms, err = loginForm.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
// get triggers
mod.PgTriggers, err = pgTrigger.Get(mod.Id)
mod.PgTriggers, err = pgTrigger.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -275,7 +277,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// store & backfill PG functions
log.Info("cache", "load PG functions")
mod.PgFunctions, err = pgFunction.Get(mod.Id)
mod.PgFunctions, err = pgFunction.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -286,7 +288,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get JS functions
log.Info("cache", "load JS functions")
mod.JsFunctions, err = jsFunction.Get(mod.Id)
mod.JsFunctions, err = jsFunction.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -294,7 +296,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get collections
log.Info("cache", "load collections")
mod.Collections, err = collection.Get(mod.Id)
mod.Collections, err = collection.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -302,7 +304,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get APIs
log.Info("cache", "load APIs")
mod.Apis, err = api.Get(mod.Id, uuid.Nil)
mod.Apis, err = api.Get_tx(ctx, tx, mod.Id, uuid.Nil)
if err != nil {
return err
}
@@ -314,7 +316,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get client events
log.Info("cache", "load client events")
mod.ClientEvents, err = clientEvent.Get(mod.Id)
mod.ClientEvents, err = clientEvent.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -325,7 +327,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get variables
log.Info("cache", "load variables")
mod.Variables, err = variable.Get(mod.Id)
mod.Variables, err = variable.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}
@@ -333,7 +335,7 @@ func updateSchemaCache(moduleIds []uuid.UUID) error {
// get widgets
log.Info("cache", "load widgets")
mod.Widgets, err = widget.Get(mod.Id)
mod.Widgets, err = widget.Get_tx(ctx, tx, mod.Id)
if err != nil {
return err
}

View File

@@ -22,7 +22,7 @@ var (
// register cluster node with shared database
// read existing node ID from configuration file if exists
func StartNode() error {
func StartNode_tx(ctx context.Context, tx pgx.Tx) error {
// create node ID for itself if it does not exist yet
if config.File.Cluster.NodeId == "" {
@@ -47,7 +47,7 @@ func StartNode() error {
// check whether node is already registered
var nodeName string
err = db.Pool.QueryRow(context.Background(), `
err = tx.QueryRow(ctx, `
SELECT name
FROM instance_cluster.node
WHERE id = $1
@@ -60,14 +60,14 @@ func StartNode() error {
if !exists {
// generate new node name
if err := db.Pool.QueryRow(context.Background(), `
if err := tx.QueryRow(ctx, `
SELECT CONCAT('node',(COUNT(*)+1)::TEXT)
FROM instance_cluster.node
`).Scan(&nodeName); err != nil {
return err
}
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
INSERT INTO instance_cluster.node (id,name,hostname,date_started,
date_check_in,stat_memory,cluster_master,running)
VALUES ($1,$2,$3,$4,0,-1,false,true)
@@ -76,7 +76,7 @@ func StartNode() error {
}
} else {
// node is starting up - set start time, disable master role and delete missed events
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
UPDATE instance_cluster.node
SET date_started = $1, cluster_master = false, running = true
WHERE id = $2
@@ -84,7 +84,7 @@ func StartNode() error {
return err
}
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
DELETE FROM instance_cluster.node_event
WHERE node_id = $1
`, nodeId); err != nil {
@@ -98,9 +98,9 @@ func StartNode() error {
log.SetNodeId(nodeId)
return nil
}
func StopNode() error {
func StopNode(ctx context.Context) error {
// on shutdown: Give up master role and disable running state
_, err := db.Pool.Exec(context.Background(), `
_, err := db.Pool.Exec(ctx, `
UPDATE instance_cluster.node
SET cluster_master = false, running = false
WHERE id = $1
@@ -151,7 +151,7 @@ func SetNode_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID, name string) error
// helper
// creates node events to some nodes (by node IDs) or all but the current node (if no node IDs are given)
func CreateEventForNodes(nodeIds []uuid.UUID, content string, payload interface{}, target types.ClusterEventTarget) error {
func CreateEventForNodes_tx(ctx context.Context, tx pgx.Tx, nodeIds []uuid.UUID, content string, payload interface{}, target types.ClusterEventTarget) error {
payloadJson, err := json.Marshal(payload)
if err != nil {
return err
@@ -176,7 +176,7 @@ func CreateEventForNodes(nodeIds []uuid.UUID, content string, payload interface{
if len(nodeIds) == 0 {
// if no node IDs are defined, apply to all other nodes
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
INSERT INTO instance_cluster.node_event (
node_id, content, payload, target_address,
target_device, target_login_id
@@ -189,7 +189,7 @@ func CreateEventForNodes(nodeIds []uuid.UUID, content string, payload interface{
return err
}
} else {
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
INSERT INTO instance_cluster.node_event (
node_id, content, payload, target_address,
target_device, target_login_id
@@ -204,6 +204,6 @@ func CreateEventForNodes(nodeIds []uuid.UUID, content string, payload interface{
}
return nil
}
func createEventsForOtherNodes(content string, payload interface{}, target types.ClusterEventTarget) error {
return CreateEventForNodes([]uuid.UUID{}, content, payload, target)
func createEventsForOtherNodes_tx(ctx context.Context, tx pgx.Tx, content string, payload interface{}, target types.ClusterEventTarget) error {
return CreateEventForNodes_tx(ctx, tx, []uuid.UUID{}, content, payload, target)
}

View File

@@ -54,11 +54,11 @@ func CheckInNode() error {
}
// events relevant to all cluster nodes
func ClientEventsChanged(updateNodes bool, address string, loginId int64) error {
func ClientEventsChanged_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, address string, loginId int64) error {
target := types.ClusterEventTarget{Address: address, Device: types.WebsocketClientDeviceFatClient, LoginId: loginId}
if updateNodes {
if err := createEventsForOtherNodes("clientEventsChanged", nil, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "clientEventsChanged", nil, target); err != nil {
return err
}
}
@@ -120,16 +120,16 @@ func CollectionsUpdated(updates []types.ClusterEventCollectionUpdated) {
}
}
}
func ConfigChanged(updateNodes bool, loadConfigFromDb bool, productionModeChange bool) error {
func ConfigChanged_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, loadConfigFromDb bool, productionModeChange bool) error {
if updateNodes {
if err := createEventsForOtherNodes("configChanged", productionModeChange, types.ClusterEventTarget{}); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "configChanged", productionModeChange, types.ClusterEventTarget{}); err != nil {
return err
}
}
// load all config settings from the database
if loadConfigFromDb {
config.LoadFromDb()
config.LoadFromDb_tx(ctx, tx)
}
// inform clients about changed config
@@ -144,7 +144,7 @@ func ConfigChanged(updateNodes bool, loadConfigFromDb bool, productionModeChange
config.SetLogLevels()
return nil
}
func FilesCopied(updateNodes bool, address string, loginId int64,
func FilesCopied_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, address string, loginId int64,
attributeId uuid.UUID, fileIds []uuid.UUID, recordId int64) error {
target := types.ClusterEventTarget{Address: address, Device: types.WebsocketClientDeviceBrowser, LoginId: loginId}
@@ -155,7 +155,7 @@ func FilesCopied(updateNodes bool, address string, loginId int64,
}
if updateNodes {
if err := createEventsForOtherNodes("filesCopied", payload, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "filesCopied", payload, target); err != nil {
return err
}
}
@@ -166,7 +166,7 @@ func FilesCopied(updateNodes bool, address string, loginId int64,
}
return nil
}
func FileRequested(updateNodes bool, address string, loginId int64, attributeId uuid.UUID,
func FileRequested_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, address string, loginId int64, attributeId uuid.UUID,
fileId uuid.UUID, fileHash string, fileName string, chooseApp bool) error {
target := types.ClusterEventTarget{Address: address, Device: types.WebsocketClientDeviceFatClient, LoginId: loginId}
@@ -179,7 +179,7 @@ func FileRequested(updateNodes bool, address string, loginId int64, attributeId
}
if updateNodes {
if err := createEventsForOtherNodes("fileRequested", payload, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "fileRequested", payload, target); err != nil {
return err
}
}
@@ -190,7 +190,7 @@ func FileRequested(updateNodes bool, address string, loginId int64, attributeId
}
return nil
}
func JsFunctionCalled(updateNodes bool, address string, loginId int64, jsFunctionId uuid.UUID, arguments []interface{}) error {
func JsFunctionCalled_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, address string, loginId int64, jsFunctionId uuid.UUID, arguments []interface{}) error {
target := types.ClusterEventTarget{Address: address, Device: types.WebsocketClientDeviceBrowser, LoginId: loginId}
payload := types.ClusterEventJsFunctionCalled{
@@ -199,7 +199,7 @@ func JsFunctionCalled(updateNodes bool, address string, loginId int64, jsFunctio
}
if updateNodes {
if err := createEventsForOtherNodes("jsFunctionCalled", payload, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "jsFunctionCalled", payload, target); err != nil {
return err
}
}
@@ -210,10 +210,10 @@ func JsFunctionCalled(updateNodes bool, address string, loginId int64, jsFunctio
}
return nil
}
func KeystrokesRequested(updateNodes bool, address string, loginId int64, keystrokes string) error {
func KeystrokesRequested_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, address string, loginId int64, keystrokes string) error {
target := types.ClusterEventTarget{Address: address, Device: types.WebsocketClientDeviceFatClient, LoginId: loginId}
if updateNodes {
if err := createEventsForOtherNodes("keystrokesRequested", keystrokes, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "keystrokesRequested", keystrokes, target); err != nil {
return err
}
}
@@ -224,26 +224,26 @@ func KeystrokesRequested(updateNodes bool, address string, loginId int64, keystr
}
return nil
}
func LoginDisabled(updateNodes bool, loginId int64) error {
func LoginDisabled_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, loginId int64) error {
target := types.ClusterEventTarget{LoginId: loginId}
if updateNodes {
if err := createEventsForOtherNodes("loginDisabled", nil, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "loginDisabled", nil, target); err != nil {
return err
}
}
WebsocketClientEvents <- types.ClusterEvent{Content: "kick", Target: target}
return nil
}
func LoginReauthorized(updateNodes bool, loginId int64) error {
func LoginReauthorized_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, loginId int64) error {
target := types.ClusterEventTarget{LoginId: loginId}
if updateNodes {
if err := createEventsForOtherNodes("loginReauthorized", nil, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "loginReauthorized", nil, target); err != nil {
return err
}
}
// renew access cache
if err := cache.RenewAccessById(loginId); err != nil {
if err := cache.RenewAccessById_tx(ctx, tx, loginId); err != nil {
return err
}
@@ -251,15 +251,15 @@ func LoginReauthorized(updateNodes bool, loginId int64) error {
WebsocketClientEvents <- types.ClusterEvent{Content: "renew", Target: target}
return nil
}
func LoginReauthorizedAll(updateNodes bool) error {
func LoginReauthorizedAll_tx(ctx context.Context, tx pgx.Tx, updateNodes bool) error {
if updateNodes {
if err := createEventsForOtherNodes("loginReauthorizedAll", nil, types.ClusterEventTarget{}); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "loginReauthorizedAll", nil, types.ClusterEventTarget{}); err != nil {
return err
}
}
// renew access cache for all logins
if err := cache.RenewAccessAll(); err != nil {
if err := cache.RenewAccessAll_tx(ctx, tx); err != nil {
return err
}
@@ -275,11 +275,11 @@ func MasterAssigned(state bool) error {
SchedulerRestart <- true
return nil
}
func SchemaChanged(updateNodes bool, moduleIds []uuid.UUID) error {
func SchemaChanged_tx(ctx context.Context, tx pgx.Tx, updateNodes bool, moduleIds []uuid.UUID) error {
target := types.ClusterEventTarget{Device: types.WebsocketClientDeviceBrowser}
if updateNodes {
if err := createEventsForOtherNodes("schemaChanged", moduleIds, target); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "schemaChanged", moduleIds, target); err != nil {
return err
}
}
@@ -294,10 +294,10 @@ func SchemaChanged(updateNodes bool, moduleIds []uuid.UUID) error {
if len(moduleIds) != 0 {
// modules were changed, update schema & access cache
if err := cache.UpdateSchema(moduleIds, false); err != nil {
if err := cache.UpdateSchema_tx(ctx, tx, moduleIds, false); err != nil {
return err
}
if err := cache.RenewAccessAll(); err != nil {
if err := cache.RenewAccessAll_tx(ctx, tx); err != nil {
return err
}
@@ -305,10 +305,10 @@ func SchemaChanged(updateNodes bool, moduleIds []uuid.UUID) error {
WebsocketClientEvents <- types.ClusterEvent{Content: "renew"}
} else {
// no module IDs are given if modules were deleted, module options were changed, or custom captions were updated
if err := cache.LoadModuleIdMapMeta(); err != nil {
if err := cache.LoadModuleIdMapMeta_tx(ctx, tx); err != nil {
return err
}
if err := cache.LoadCaptionMapCustom(); err != nil {
if err := cache.LoadCaptionMapCustom_tx(ctx, tx); err != nil {
return err
}
}
@@ -317,9 +317,9 @@ func SchemaChanged(updateNodes bool, moduleIds []uuid.UUID) error {
SchedulerRestart <- true
return nil
}
func TasksChanged(updateNodes bool) error {
func TasksChanged_tx(ctx context.Context, tx pgx.Tx, updateNodes bool) error {
if updateNodes {
if err := createEventsForOtherNodes("tasksChanged", nil, types.ClusterEventTarget{}); err != nil {
if err := createEventsForOtherNodes_tx(ctx, tx, "tasksChanged", nil, types.ClusterEventTarget{}); err != nil {
return err
}
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"math/rand"
"os"
"r3/db"
"r3/log"
"r3/tools"
"r3/types"
@@ -15,6 +14,7 @@ import (
"github.com/gbrlsnchs/jwt/v3"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
var (
@@ -156,7 +156,7 @@ func SetLogLevels() {
log.SetLogLevel("transfer", int(GetUint64("logTransfer")))
log.SetLogLevel("websocket", int(GetUint64("logWebsocket")))
}
func SetInstanceIdIfEmpty() error {
func SetInstanceIdIfEmpty_tx(ctx context.Context, tx pgx.Tx) error {
if GetString("instanceId") != "" {
return nil
}
@@ -165,18 +165,7 @@ func SetInstanceIdIfEmpty() error {
if err != nil {
return err
}
ctx := context.Background()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := SetString_tx(ctx, tx, "instanceId", id.String()); err != nil {
return err
}
return tx.Commit(ctx)
return SetString_tx(ctx, tx, "instanceId", id.String())
}
// config file
@@ -221,23 +210,13 @@ func WriteFile() error {
}
// token
func ProcessTokenSecret() error {
func ProcessTokenSecret_tx(ctx context.Context, tx pgx.Tx) error {
secret := GetString("tokenSecret")
if secret == "" {
min, max := 32, 48
secret = tools.RandStringRunes(rand.Intn(max-min+1) + min)
ctx := context.Background()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
if err := SetString_tx(ctx, tx, "tokenSecret", secret); err != nil {
tx.Rollback(ctx)
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
}

View File

@@ -130,6 +130,18 @@ func LoadFromDb() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := LoadFromDb_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func LoadFromDb_tx(ctx context.Context, tx pgx.Tx) error {
access_mx.Lock()
defer access_mx.Unlock()
@@ -144,7 +156,7 @@ func LoadFromDb() error {
storeUint64Slice[name] = make([]uint64, 0)
}
rows, err := db.Pool.Query(ctx, "SELECT name, value FROM instance.config")
rows, err := tx.Query(ctx, "SELECT name, value FROM instance.config")
if err != nil {
return err
}

View File

@@ -19,12 +19,12 @@ func Create_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID, hidden bool,
return err
}
func Get(moduleId uuid.UUID) (types.ModuleMeta, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) (types.ModuleMeta, error) {
var m = types.ModuleMeta{
Id: moduleId,
}
err := db.Pool.QueryRow(context.Background(), `
err := tx.QueryRow(ctx, `
SELECT hidden, owner, position, date_change, languages_custom
FROM instance.module_meta
WHERE module_id = $1
@@ -44,10 +44,10 @@ func GetDateChange(moduleId uuid.UUID) (uint64, error) {
`, moduleId).Scan(&dateChange)
return dateChange, err
}
func GetIdMap() (map[uuid.UUID]types.ModuleMeta, error) {
func GetIdMap_tx(ctx context.Context, tx pgx.Tx) (map[uuid.UUID]types.ModuleMeta, error) {
moduleIdMap := make(map[uuid.UUID]types.ModuleMeta)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT module_id, hidden, owner, position, date_change, languages_custom
FROM instance.module_meta
`)
@@ -58,9 +58,7 @@ func GetIdMap() (map[uuid.UUID]types.ModuleMeta, error) {
for rows.Next() {
var m types.ModuleMeta
if err := rows.Scan(&m.Id, &m.Hidden, &m.Owner, &m.Position,
&m.DateChange, &m.LanguagesCustom); err != nil {
if err := rows.Scan(&m.Id, &m.Hidden, &m.Owner, &m.Position, &m.DateChange, &m.LanguagesCustom); err != nil {
return moduleIdMap, err
}
if m.LanguagesCustom == nil {
@@ -89,8 +87,8 @@ func GetOwner(moduleId uuid.UUID) (bool, error) {
return isOwner, err
}
func SetDateChange(moduleIds []uuid.UUID, date int64) error {
_, err := db.Pool.Exec(context.Background(), `
func SetDateChange_tx(ctx context.Context, tx pgx.Tx, moduleIds []uuid.UUID, date int64) error {
_, err := tx.Exec(ctx, `
UPDATE instance.module_meta
SET date_change = $2
WHERE module_id = ANY($1)

View File

@@ -23,7 +23,9 @@ var (
CtxDefTimeoutDbTask = 300 * time.Second // heavy DB operations (init/upgrade/relation retention cleanup)
CtxDefTimeoutLogWrite = 30 * time.Second // writing to database log
CtxDefTimeoutPgFunc = 240 * time.Second // executing plsql functions, to be replaced by config option
CtxDefTimeoutShutdown = 10 * time.Second // shutting down system
CtxDefTimeoutSysTask = 30 * time.Second // executing system tasks
CtxDefTimeoutSysStart = 300 * time.Second // executing system startup tasks
CtxDefTimeoutTransfer = 600 * time.Second // executing module transfers, to be replaced by config option
)

View File

@@ -3,6 +3,7 @@ package initialize
import (
"context"
"fmt"
"r3/bruteforce"
"r3/config"
"r3/db"
"r3/db/upgrade"
@@ -68,6 +69,9 @@ func PrepareDbIfNew() error {
if err := config.LoadFromDb(); err != nil {
return err
}
bruteforce.SetConfig()
config.ActivateLicense()
config.SetLogLevels()
// before doing any more work, upgrade DB if necessary
if err := upgrade.RunIfRequired(); err != nil {

View File

@@ -73,8 +73,21 @@ func Handler(w http.ResponseWriter, r *http.Request) {
return
}
// start DB transaction
tx, err := db.Pool.Begin(ctx)
if err != nil {
handler.AbortRequest(w, handlerContext, err, handler.ErrGeneral)
return
}
defer tx.Rollback(ctx)
if err := db.SetSessionConfig_tx(ctx, tx, loginId); err != nil {
handler.AbortRequest(w, handlerContext, err, handler.ErrGeneral)
return
}
// get calendar field details from cache
f, err := cache.GetCalendarField(fieldId)
f, err := cache.GetCalendarField_tx(ctx, tx, fieldId)
if err != nil {
handler.AbortRequest(w, handlerContext, err, handler.ErrGeneral)
return
@@ -198,18 +211,6 @@ func Handler(w http.ResponseWriter, r *http.Request) {
}
// get data
tx, err := db.Pool.Begin(ctx)
if err != nil {
handler.AbortRequest(w, handlerContext, err, handler.ErrGeneral)
return
}
defer tx.Rollback(ctx)
if err := db.SetSessionConfig_tx(ctx, tx, loginId); err != nil {
handler.AbortRequest(w, handlerContext, err, handler.ErrGeneral)
return
}
var query string
results, _, err := data.Get_tx(ctx, tx, dataGet, loginId, &query)
if err != nil {

View File

@@ -94,13 +94,11 @@ func Handler(w http.ResponseWriter, r *http.Request) {
handler.AbortRequest(w, logContext, err, handler.ErrGeneral)
return
}
if err := tx.Commit(ctx); err != nil {
if err := cluster.ConfigChanged_tx(ctx, tx, true, false, false); err != nil {
handler.AbortRequest(w, logContext, err, handler.ErrGeneral)
return
}
// apply new config
if err := cluster.ConfigChanged(true, false, false); err != nil {
if err := tx.Commit(ctx); err != nil {
handler.AbortRequest(w, logContext, err, handler.ErrGeneral)
return
}

View File

@@ -3,7 +3,6 @@ package ldap
import (
"context"
"r3/cache"
"r3/db"
"r3/login"
"r3/types"
"strings"
@@ -150,22 +149,6 @@ func Set_tx(ctx context.Context, tx pgx.Tx, l types.Ldap) error {
}
return nil
}
func UpdateCache() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := UpdateCache_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func UpdateCache_tx(ctx context.Context, tx pgx.Tx) error {
ldaps, err := Get_tx(ctx, tx)
if err != nil {

View File

@@ -143,16 +143,11 @@ func SetLdapLogin(ldap types.Ldap, ldapKey string, name string,
return err
}
// commit before renewing access cache (to apply new permissions)
if err := tx.Commit(ctx); err != nil {
return err
}
// roles needed to be changed for active login, reauthorize
if active && rolesChanged {
log.Info("ldap", fmt.Sprintf("user account '%s' received new roles, renewing access permissions", name))
if err := cluster.LoginReauthorized(true, loginId); err != nil {
if err := cluster.LoginReauthorized_tx(ctx, tx, true, loginId); err != nil {
log.Warning("ldap", fmt.Sprintf("could not renew access permissions for '%s'", name), err)
}
}
@@ -161,9 +156,9 @@ func SetLdapLogin(ldap types.Ldap, ldapKey string, name string,
if !active && activeEx {
log.Info("ldap", fmt.Sprintf("user account '%s' is locked, kicking active sessions", name))
if err := cluster.LoginDisabled(true, loginId); err != nil {
if err := cluster.LoginDisabled_tx(ctx, tx, true, loginId); err != nil {
log.Warning("ldap", fmt.Sprintf("could not kick active sessions for '%s'", name), err)
}
}
return nil
return tx.Commit(ctx)
}

View File

@@ -175,24 +175,26 @@ func LogsGet_tx(ctx context.Context, tx pgx.Tx, byString pgtype.Text, limit int,
}, nil
}
func LogsRemoveForNode() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
func LogsRemoveForNode(ctx context.Context) error {
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if _, err := tx.Exec(ctx, `
DELETE FROM instance.login_session
WHERE node_id = $1
`, cache.GetNodeId()); err != nil {
if err := LogsRemoveForNode_tx(ctx, tx); err != nil {
return err
}
return tx.Commit(ctx)
}
func LogsRemoveForNode_tx(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx, `
DELETE FROM instance.login_session
WHERE node_id = $1
`, cache.GetNodeId())
return err
}
// retrieves concurrent session count for limited or not-limited logins
// also retrieves if the given loginId already had a session

156
r3.go
View File

@@ -13,6 +13,7 @@ import (
"os"
"os/signal"
"path/filepath"
"r3/bruteforce"
"r3/cache"
"r3/cluster"
"r3/config"
@@ -332,19 +333,22 @@ func (prg *program) execute(svc service.Service) {
// check for first database start
if err := initialize.PrepareDbIfNew(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initiate database on first start, %v", err))
prg.executeAborted(svc, fmt.Errorf("failed to initialize database on first start, %v", err))
return
}
// apply configuration from database
if err := cluster.ConfigChanged(false, true, false); err != nil {
if err := config.LoadFromDb(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to apply configuration from database, %v", err))
return
}
bruteforce.SetConfig()
config.ActivateLicense()
config.SetLogLevels()
// store host details in cache (before cluster node startup)
if err := cache.SetHostnameFromOs(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to load host details, %v", err))
// run automatic database upgrade if required
if err := upgrade.RunIfRequired(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed automatic upgrade of database, %v", err))
return
}
@@ -365,67 +369,9 @@ func (prg *program) execute(svc service.Service) {
return
}
// run automatic database upgrade if required
if err := upgrade.RunIfRequired(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed automatic upgrade of database, %v", err))
return
}
// setup cluster node with shared database
if err := cluster.StartNode(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to setup cluster node, %v", err))
return
}
// remove login sessions logs for this cluster node (in case they were not removed on shutdown)
if err := login_session.LogsRemoveForNode(); err != nil {
prg.logger.Error(err)
}
// initialize caches
// module meta data must be loaded before module schema (informs about what modules to load)
if err := cache.LoadModuleIdMapMeta(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize module meta cache, %v", err))
return
}
if err := cache.LoadCaptionMapCustom(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize custom caption map cache, %v", err))
return
}
if err := cache.LoadSchema(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize schema cache, %v", err))
return
}
if err := cache.LoadMailAccountMap(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize mail account cache, %v", err))
return
}
if err := cache.LoadOauthClientMap(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize oauth client cache, %v", err))
return
}
if err := cache.LoadPwaDomainMap(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize PWA domain cache, %v", err))
return
}
if err := cache.LoadSearchDictionaries(); err != nil {
// failure is not mission critical (in case of no access to DB system tables)
log.Error("server", "failed to read/update text search dictionaries", err)
}
if err := ldap.UpdateCache(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initialize LDAP cache, %v", err))
return
}
// process token secret for future client authentication from database
if err := config.ProcessTokenSecret(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to process token secret, %v", err))
return
}
// set unique instance ID if empty
if err := config.SetInstanceIdIfEmpty(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to set instance ID, %v", err))
// prepare cluster node & load caches
if err := initClusterAndCaches(); err != nil {
prg.executeAborted(svc, fmt.Errorf("failed to initalize caches during startup, %v", err))
return
}
@@ -437,7 +383,7 @@ func (prg *program) execute(svc service.Service) {
// start scheduler (must start after module cache)
go scheduler.Start()
// prepare web server
// start web server
go websocket.StartBackgroundTasks()
mux := http.NewServeMux()
@@ -544,6 +490,71 @@ func (prg *program) execute(svc service.Service) {
}
}
func initClusterAndCaches() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysStart)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// store host details in cache (before cluster node startup)
if err := cache.SetHostnameFromOs(); err != nil {
return fmt.Errorf("failed to load host details, %v", err)
}
// setup cluster node with shared database
if err := cluster.StartNode_tx(ctx, tx); err != nil {
return err
}
// remove login sessions logs for this cluster node (in case they were not removed on shutdown)
if err := login_session.LogsRemoveForNode_tx(ctx, tx); err != nil {
return err
}
// initialize caches
// module meta data must be loaded before module schema (informs about what modules to load)
if err := cache.LoadModuleIdMapMeta_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize module meta cache, %v", err)
}
if err := cache.LoadCaptionMapCustom_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize custom caption map cache, %v", err)
}
if err := cache.LoadSchema_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize schema cache, %v", err)
}
if err := cache.LoadMailAccountMap_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize mail account cache, %v", err)
}
if err := cache.LoadOauthClientMap_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize oauth client cache, %v", err)
}
if err := cache.LoadPwaDomainMap_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize PWA domain cache, %v", err)
}
if err := cache.LoadSearchDictionaries_tx(ctx, tx); err != nil {
// failure is not mission critical (in case of no access to DB system tables)
log.Error("server", "failed to read/update text search dictionaries", err)
}
if err := ldap.UpdateCache_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to initialize LDAP cache, %v", err)
}
// process token secret for future client authentication from database
if err := config.ProcessTokenSecret_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to process token secret, %v", err)
}
// set unique instance ID if empty
if err := config.SetInstanceIdIfEmpty_tx(ctx, tx); err != nil {
return fmt.Errorf("failed to set instance ID, %v", err)
}
return tx.Commit(ctx)
}
// properly shuts down application, if execution is aborted prematurely
func (prg *program) executeAborted(svc service.Service, err error) {
if err != nil {
@@ -578,8 +589,11 @@ func (prg *program) Stop(svc service.Service) error {
}
prg.stopping.Store(true)
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutShutdown)
defer ctxCanc()
// remove login session logs for this cluster node
if err := login_session.LogsRemoveForNode(); err != nil {
if err := login_session.LogsRemoveForNode(ctx); err != nil {
prg.logger.Error(err)
}
@@ -588,10 +602,6 @@ func (prg *program) Stop(svc service.Service) error {
// stop web server if running
if prg.webServer != nil {
ctx, ctxCanc := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCanc()
if err := prg.webServer.Shutdown(ctx); err != nil {
prg.logger.Error(err)
}
@@ -600,7 +610,7 @@ func (prg *program) Stop(svc service.Service) error {
// close database connection and deregister cluster node if DB is open
if db.Pool != nil {
if err := cluster.StopNode(); err != nil {
if err := cluster.StopNode(ctx); err != nil {
prg.logger.Error(err)
}
db.Close()

View File

@@ -137,13 +137,13 @@ func Exec_tx(ctx context.Context, tx pgx.Tx, address string, loginId int64, isAd
case "event":
switch action {
case "clientEventsChanged":
return eventClientEventsChanged(loginId, address)
return eventClientEventsChanged_tx(ctx, tx, loginId, address)
case "filesCopied":
return eventFilesCopied(reqJson, loginId, address)
return eventFilesCopied_tx(ctx, tx, reqJson, loginId, address)
case "fileRequested":
return eventFileRequested(ctx, reqJson, loginId, address)
return eventFileRequested_tx(ctx, tx, reqJson, loginId, address)
case "keystrokesRequested":
return eventKeystrokesRequested(reqJson, loginId, address)
return eventKeystrokesRequested_tx(ctx, tx, reqJson, loginId, address)
}
case "feedback":
switch action {
@@ -323,7 +323,7 @@ func Exec_tx(ctx context.Context, tx pgx.Tx, address string, loginId int64, isAd
case "setNode":
return ClusterNodeSet_tx(ctx, tx, reqJson)
case "shutdownNode":
return ClusterNodeShutdown(reqJson)
return ClusterNodeShutdown_tx(ctx, tx, reqJson)
}
case "dataSql":
switch action {
@@ -410,11 +410,11 @@ func Exec_tx(ctx context.Context, tx pgx.Tx, address string, loginId int64, isAd
case "getRecords":
return LoginGetRecords_tx(ctx, tx, reqJson)
case "kick":
return LoginKick(reqJson)
return LoginKick(ctx, tx, reqJson)
case "reauth":
return LoginReauth(reqJson)
return LoginReauth_tx(ctx, tx, reqJson)
case "reauthAll":
return LoginReauthAll()
return LoginReauthAll_tx(ctx, tx)
case "resetTotp":
return LoginResetTotp_tx(ctx, tx, reqJson)
case "set":
@@ -585,12 +585,12 @@ func Exec_tx(ctx context.Context, tx pgx.Tx, address string, loginId int64, isAd
case "check":
return SchemaCheck_tx(ctx, tx, reqJson)
case "reload":
return SchemaReload(reqJson)
return SchemaReload_tx(ctx, tx, reqJson)
}
case "task":
switch action {
case "informChanged":
return nil, cluster.TasksChanged(true)
return nil, cluster.TasksChanged_tx(ctx, tx, true)
case "run":
return TaskRun_tx(ctx, tx, reqJson)
case "set":

View File

@@ -94,7 +94,7 @@ func clientEventExecFatClient_tx(ctx context.Context, tx pgx.Tx, reqJson json.Ra
// execute valid actions
if ce.Action == "callJsFunction" && ce.JsFunctionId.Valid {
return nil, cluster.JsFunctionCalled(true, address, loginId, ce.JsFunctionId.Bytes, req.Arguments)
return nil, cluster.JsFunctionCalled_tx(ctx, tx, true, address, loginId, ce.JsFunctionId.Bytes, req.Arguments)
}
if ce.Action == "callPgFunction" && ce.PgFunctionId.Valid {

View File

@@ -37,7 +37,7 @@ func ClusterNodeSet_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage)
return nil, cluster.SetNode_tx(ctx, tx, req.Id, req.Name)
}
func ClusterNodeShutdown(reqJson json.RawMessage) (interface{}, error) {
func ClusterNodeShutdown_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (interface{}, error) {
var req struct {
Id uuid.UUID `json:"id"`
@@ -45,6 +45,6 @@ func ClusterNodeShutdown(reqJson json.RawMessage) (interface{}, error) {
if err := json.Unmarshal(reqJson, &req); err != nil {
return nil, err
}
return nil, cluster.CreateEventForNodes([]uuid.UUID{req.Id},
return nil, cluster.CreateEventForNodes_tx(ctx, tx, []uuid.UUID{req.Id},
"shutdownTriggered", "{}", types.ClusterEventTarget{})
}

View File

@@ -94,5 +94,5 @@ func ConfigSet_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (inte
}
}
}
return nil, cluster.ConfigChanged(true, false, productionModeChange)
return nil, cluster.ConfigChanged_tx(ctx, tx, true, false, productionModeChange)
}

View File

@@ -5,16 +5,16 @@ import (
"encoding/json"
"fmt"
"r3/cluster"
"r3/db"
"r3/schema"
"strings"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)
// requests for browser clients
func eventFilesCopied(reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
func eventFilesCopied_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
// request file(s) to be copied (synchronized across all browser clients)
var req struct {
AttributeId uuid.UUID `json:"attributeId"`
@@ -24,14 +24,14 @@ func eventFilesCopied(reqJson json.RawMessage, loginId int64, address string) (i
if err := json.Unmarshal(reqJson, &req); err != nil {
return nil, err
}
return nil, cluster.FilesCopied(true, address, loginId, req.AttributeId, req.FileIds, req.RecordId)
return nil, cluster.FilesCopied_tx(ctx, tx, true, address, loginId, req.AttributeId, req.FileIds, req.RecordId)
}
// requests for fat clients
func eventClientEventsChanged(loginId int64, address string) (interface{}, error) {
return nil, cluster.ClientEventsChanged(true, address, loginId)
func eventClientEventsChanged_tx(ctx context.Context, tx pgx.Tx, loginId int64, address string) (interface{}, error) {
return nil, cluster.ClientEventsChanged_tx(ctx, tx, true, address, loginId)
}
func eventFileRequested(ctx context.Context, reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
func eventFileRequested_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
var req struct {
AttributeId uuid.UUID `json:"attributeId"`
FileId uuid.UUID `json:"fileId"`
@@ -47,7 +47,7 @@ func eventFileRequested(ctx context.Context, reqJson json.RawMessage, loginId in
// files before 3.1 do not have a hash value, empty hash is then compared against new file version hash
var hash pgtype.Text
var name string
if err := db.Pool.QueryRow(ctx, fmt.Sprintf(`
if err := tx.QueryRow(ctx, fmt.Sprintf(`
SELECT v.hash, r.name
FROM instance.file_version AS v
JOIN instance_file."%s" AS r
@@ -75,14 +75,14 @@ func eventFileRequested(ctx context.Context, reqJson json.RawMessage, loginId in
"\\", "",
"&", "").Replace(name)
return nil, cluster.FileRequested(true, address, loginId,
return nil, cluster.FileRequested_tx(ctx, tx, true, address, loginId,
req.AttributeId, req.FileId, hash.String, name, req.ChooseApp)
}
func eventKeystrokesRequested(reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
func eventKeystrokesRequested_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage, loginId int64, address string) (interface{}, error) {
var keystrokes string
if err := json.Unmarshal(reqJson, &keystrokes); err != nil {
return nil, err
}
return nil, cluster.KeystrokesRequested(true, address, loginId, keystrokes)
return nil, cluster.KeystrokesRequested_tx(ctx, tx, true, address, loginId, keystrokes)
}

View File

@@ -12,7 +12,7 @@ func LicenseDel_tx(ctx context.Context, tx pgx.Tx) (interface{}, error) {
if err := config.SetString_tx(ctx, tx, "licenseFile", ""); err != nil {
return nil, err
}
if err := cluster.ConfigChanged(true, false, false); err != nil {
if err := cluster.ConfigChanged_tx(ctx, tx, true, false, false); err != nil {
return nil, err
}
return nil, nil

View File

@@ -188,7 +188,7 @@ func LoginSetMembers_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage)
}
return nil, login.SetRoleLoginIds_tx(ctx, tx, req.RoleId, req.LoginIds)
}
func LoginKick(reqJson json.RawMessage) (interface{}, error) {
func LoginKick(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (interface{}, error) {
var req struct {
Id int64 `json:"id"`
@@ -197,9 +197,9 @@ func LoginKick(reqJson json.RawMessage) (interface{}, error) {
if err := json.Unmarshal(reqJson, &req); err != nil {
return nil, err
}
return nil, cluster.LoginDisabled(true, req.Id)
return nil, cluster.LoginDisabled_tx(ctx, tx, true, req.Id)
}
func LoginReauth(reqJson json.RawMessage) (interface{}, error) {
func LoginReauth_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (interface{}, error) {
var req struct {
Id int64 `json:"id"`
@@ -208,10 +208,10 @@ func LoginReauth(reqJson json.RawMessage) (interface{}, error) {
if err := json.Unmarshal(reqJson, &req); err != nil {
return nil, err
}
return nil, cluster.LoginReauthorized(true, req.Id)
return nil, cluster.LoginReauthorized_tx(ctx, tx, true, req.Id)
}
func LoginReauthAll() (interface{}, error) {
return nil, cluster.LoginReauthorizedAll(true)
func LoginReauthAll_tx(ctx context.Context, tx pgx.Tx) (interface{}, error) {
return nil, cluster.LoginReauthorizedAll_tx(ctx, tx, true)
}
func LoginResetTotp_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (interface{}, error) {
var req struct {

View File

@@ -23,7 +23,7 @@ func SchemaCheck_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (in
return nil, schema.ValidateDependency_tx(ctx, tx, req.ModuleId)
}
func SchemaReload(reqJson json.RawMessage) (interface{}, error) {
func SchemaReload_tx(ctx context.Context, tx pgx.Tx, reqJson json.RawMessage) (interface{}, error) {
var req struct {
ModuleId pgtype.UUID `json:"moduleId"`
@@ -37,5 +37,5 @@ func SchemaReload(reqJson json.RawMessage) (interface{}, error) {
if req.ModuleId.Valid {
modIds = append(modIds, req.ModuleId.Bytes)
}
return nil, cluster.SchemaChanged(true, modIds)
return nil, cluster.SchemaChanged_tx(ctx, tx, true, modIds)
}

View File

@@ -12,12 +12,21 @@ import (
"syscall"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
// collect cluster events from shared database for node to react to
func clusterProcessEvents() error {
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
rows, err := db.Pool.Query(context.Background(), `
tx, err := db.Pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
rows, err := tx.Query(ctx, `
SELECT content, payload,
COALESCE(target_address, ''),
COALESCE(target_device, 0),
@@ -43,11 +52,11 @@ func clusterProcessEvents() error {
// no events, nothing to do
if len(events) == 0 {
return nil
return tx.Commit(ctx)
}
// delete collected events
if _, err := db.Pool.Exec(context.Background(), `
if _, err := tx.Exec(ctx, `
DELETE FROM instance_cluster.node_event
WHERE node_id = $1
`, cache.GetNodeId()); err != nil {
@@ -58,92 +67,99 @@ func clusterProcessEvents() error {
collectionUpdates := make([]types.ClusterEventCollectionUpdated, 0)
for _, e := range events {
log.Info("cluster", fmt.Sprintf("node is reacting to event '%s'", e.Content))
var jsonPayload []byte
switch v := e.Payload.(type) {
case string:
jsonPayload = []byte(v)
}
switch e.Content {
case "clientEventsChanged":
err = cluster.ClientEventsChanged(false, e.Target.Address, e.Target.LoginId)
case "collectionUpdated":
var p types.ClusterEventCollectionUpdated
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
collectionUpdates = append(collectionUpdates, p)
err = nil
case "configChanged":
var switchToMaintenance bool
if err := json.Unmarshal(jsonPayload, &switchToMaintenance); err != nil {
return err
}
err = cluster.ConfigChanged(false, true, switchToMaintenance)
case "filesCopied":
var p types.ClusterEventFilesCopied
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.FilesCopied(false, e.Target.Address,
e.Target.LoginId, p.AttributeId, p.FileIds, p.RecordId)
case "fileRequested":
var p types.ClusterEventFileRequested
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.FileRequested(false, e.Target.Address, e.Target.LoginId,
p.AttributeId, p.FileId, p.FileHash, p.FileName, p.ChooseApp)
case "jsFunctionCalled":
var p types.ClusterEventJsFunctionCalled
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.JsFunctionCalled(false, e.Target.Address,
e.Target.LoginId, p.JsFunctionId, p.Arguments)
case "keystrokesRequested":
var keystrokes string
if err := json.Unmarshal(jsonPayload, &keystrokes); err != nil {
return err
}
err = cluster.KeystrokesRequested(false, e.Target.Address, e.Target.LoginId, keystrokes)
case "loginDisabled":
err = cluster.LoginDisabled(false, e.Target.LoginId)
case "loginReauthorized":
err = cluster.LoginReauthorized(false, e.Target.LoginId)
case "loginReauthorizedAll":
err = cluster.LoginReauthorizedAll(false)
case "masterAssigned":
var p types.ClusterEventMasterAssigned
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.MasterAssigned(p.State)
case "schemaChanged":
var moduleIds []uuid.UUID
if err := json.Unmarshal(jsonPayload, &moduleIds); err != nil {
return err
}
err = cluster.SchemaChanged(false, moduleIds)
case "tasksChanged":
err = cluster.TasksChanged(false)
case "taskTriggered":
var p types.ClusterEventTaskTriggered
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
runTaskDirectly(p.TaskName, p.PgFunctionId, p.PgFunctionScheduleId)
case "shutdownTriggered":
OsExit <- syscall.SIGTERM
}
if err != nil {
if err := clusterProcessEvent(ctx, tx, e, &collectionUpdates); err != nil {
return err
}
}
// apply collection updates
cluster.CollectionsUpdated(collectionUpdates)
return nil
return tx.Commit(ctx)
}
func clusterProcessEvent(ctx context.Context, tx pgx.Tx, e types.ClusterEvent, collectionUpdates *[]types.ClusterEventCollectionUpdated) error {
log.Info("cluster", fmt.Sprintf("node is reacting to event '%s'", e.Content))
var err error
var jsonPayload []byte
switch v := e.Payload.(type) {
case string:
jsonPayload = []byte(v)
}
switch e.Content {
case "clientEventsChanged":
err = cluster.ClientEventsChanged_tx(ctx, tx, false, e.Target.Address, e.Target.LoginId)
case "collectionUpdated":
var p types.ClusterEventCollectionUpdated
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
*collectionUpdates = append(*collectionUpdates, p)
err = nil
case "configChanged":
var switchToMaintenance bool
if err := json.Unmarshal(jsonPayload, &switchToMaintenance); err != nil {
return err
}
err = cluster.ConfigChanged_tx(ctx, tx, false, true, switchToMaintenance)
case "filesCopied":
var p types.ClusterEventFilesCopied
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.FilesCopied_tx(ctx, tx, false, e.Target.Address,
e.Target.LoginId, p.AttributeId, p.FileIds, p.RecordId)
case "fileRequested":
var p types.ClusterEventFileRequested
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.FileRequested_tx(ctx, tx, false, e.Target.Address, e.Target.LoginId,
p.AttributeId, p.FileId, p.FileHash, p.FileName, p.ChooseApp)
case "jsFunctionCalled":
var p types.ClusterEventJsFunctionCalled
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.JsFunctionCalled_tx(ctx, tx, false, e.Target.Address,
e.Target.LoginId, p.JsFunctionId, p.Arguments)
case "keystrokesRequested":
var keystrokes string
if err := json.Unmarshal(jsonPayload, &keystrokes); err != nil {
return err
}
err = cluster.KeystrokesRequested_tx(ctx, tx, false, e.Target.Address, e.Target.LoginId, keystrokes)
case "loginDisabled":
err = cluster.LoginDisabled_tx(ctx, tx, false, e.Target.LoginId)
case "loginReauthorized":
err = cluster.LoginReauthorized_tx(ctx, tx, false, e.Target.LoginId)
case "loginReauthorizedAll":
err = cluster.LoginReauthorizedAll_tx(ctx, tx, false)
case "masterAssigned":
var p types.ClusterEventMasterAssigned
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
err = cluster.MasterAssigned(p.State)
case "schemaChanged":
var moduleIds []uuid.UUID
if err := json.Unmarshal(jsonPayload, &moduleIds); err != nil {
return err
}
err = cluster.SchemaChanged_tx(ctx, tx, false, moduleIds)
case "tasksChanged":
err = cluster.TasksChanged_tx(ctx, tx, false)
case "taskTriggered":
var p types.ClusterEventTaskTriggered
if err := json.Unmarshal(jsonPayload, &p); err != nil {
return err
}
runTaskDirectly(p.TaskName, p.PgFunctionId, p.PgFunctionScheduleId)
case "shutdownTriggered":
OsExit <- syscall.SIGTERM
}
return err
}

View File

@@ -32,10 +32,12 @@ func systemMsgMaintenance() error {
if err := config.SetUint64_tx(ctx, tx, "productionMode", 0); err != nil {
return err
}
if err := cluster.ConfigChanged_tx(ctx, tx, true, false, true); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
cluster.ConfigChanged(true, false, true)
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/db/check"
"r3/schema"
"r3/schema/column"
@@ -18,7 +17,7 @@ import (
func Copy_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
apis, err := Get(uuid.Nil, id)
apis, err := Get_tx(ctx, tx, uuid.Nil, id)
if err != nil {
return err
}
@@ -62,7 +61,7 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID, id uuid.UUID) ([]types.Api, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID, id uuid.UUID) ([]types.Api, error) {
apis := make([]types.Api, 0)
sqlWheres := []string{}
@@ -76,7 +75,7 @@ func Get(moduleId uuid.UUID, id uuid.UUID) ([]types.Api, error) {
sqlValues = append(sqlValues, id)
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, module_id, name, comment, has_delete, has_get,
has_post, limit_def, limit_max, verbose_def, version
FROM app.api
@@ -102,11 +101,11 @@ func Get(moduleId uuid.UUID, id uuid.UUID) ([]types.Api, error) {
// collect query and columns
for i, a := range apis {
a.Query, err = query.Get("api", a.Id, 0, 0, 0)
a.Query, err = query.Get_tx(ctx, tx, "api", a.Id, 0, 0, 0)
if err != nil {
return apis, err
}
a.Columns, err = column.Get("api", a.Id)
a.Columns, err = column.Get_tx(ctx, tx, "api", a.Id)
if err != nil {
return apis, err
}

View File

@@ -3,7 +3,6 @@ package article
import (
"context"
"errors"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -58,11 +57,11 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Article, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Article, error) {
articles := make([]types.Article, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, name
FROM app.article
WHERE module_id = $1
@@ -83,7 +82,7 @@ func Get(moduleId uuid.UUID) ([]types.Article, error) {
}
for i, a := range articles {
articles[i].Captions, err = caption.Get("article", a.Id, []string{"articleBody", "articleTitle"})
articles[i].Captions, err = caption.Get_tx(ctx, tx, "article", a.Id, []string{"articleBody", "articleTitle"})
if err != nil {
return articles, err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/db/check"
"r3/schema"
"r3/schema/caption"
@@ -63,13 +62,13 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(relationId uuid.UUID) ([]types.Attribute, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID) ([]types.Attribute, error) {
var onUpdateNull pgtype.Text
var onDeleteNull pgtype.Text
attributes := make([]types.Attribute, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, relationship_id, icon_id, name, content, content_use,
length, length_fract, nullable, encrypted, def, on_update, on_delete
FROM app.attribute
@@ -96,7 +95,7 @@ func Get(relationId uuid.UUID) ([]types.Attribute, error) {
}
for i, atr := range attributes {
attributes[i].Captions, err = caption.Get("attribute", atr.Id, []string{"attributeTitle"})
attributes[i].Captions, err = caption.Get_tx(ctx, tx, "attribute", atr.Id, []string{"attributeTitle"})
if err != nil {
return attributes, err
}

View File

@@ -4,21 +4,20 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/types"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
func Get(entity string, id uuid.UUID, expectedContents []string) (types.CaptionMap, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, id uuid.UUID, expectedContents []string) (types.CaptionMap, error) {
caps := make(types.CaptionMap)
for _, content := range expectedContents {
caps[content] = make(map[string]string)
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT language_code, content, value
FROM app.caption
WHERE %s_id = $1

View File

@@ -2,7 +2,6 @@ package clientEvent
import (
"context"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -17,10 +16,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.ClientEvent, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.ClientEvent, error) {
clientEvents := make([]types.ClientEvent, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, action, arguments, event, hotkey_modifier1,
hotkey_modifier2, hotkey_char, js_function_id, pg_function_id
FROM app.client_event
@@ -44,7 +43,7 @@ func Get(moduleId uuid.UUID) ([]types.ClientEvent, error) {
}
for i, e := range clientEvents {
clientEvents[i].Captions, err = caption.Get("client_event", e.Id, []string{"clientEventTitle"})
clientEvents[i].Captions, err = caption.Get_tx(ctx, tx, "client_event", e.Id, []string{"clientEventTitle"})
if err != nil {
return clientEvents, err
}

View File

@@ -2,7 +2,6 @@ package collection
import (
"context"
"r3/db"
"r3/schema"
"r3/schema/collection/consumer"
"r3/schema/column"
@@ -19,10 +18,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Collection, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Collection, error) {
collections := make([]types.Collection, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, icon_id, name
FROM app.collection
WHERE module_id = $1
@@ -45,15 +44,15 @@ func Get(moduleId uuid.UUID) ([]types.Collection, error) {
// collect query and columns
for i, c := range collections {
c.Query, err = query.Get("collection", c.Id, 0, 0, 0)
c.Query, err = query.Get_tx(ctx, tx, "collection", c.Id, 0, 0, 0)
if err != nil {
return collections, err
}
c.Columns, err = column.Get("collection", c.Id)
c.Columns, err = column.Get_tx(ctx, tx, "collection", c.Id)
if err != nil {
return collections, err
}
c.InHeader, err = consumer.Get("collection", c.Id, "headerDisplay")
c.InHeader, err = consumer.Get_tx(ctx, tx, "collection", c.Id, "headerDisplay")
if err != nil {
return collections, err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema/compatible"
"r3/schema/openForm"
"r3/types"
@@ -17,7 +16,7 @@ import (
var entitiesAllowed = []string{"collection", "field", "menu", "widget"}
func GetOne(entity string, entityId uuid.UUID, content string) (types.CollectionConsumer, error) {
func GetOne_tx(ctx context.Context, tx pgx.Tx, entity string, entityId uuid.UUID, content string) (types.CollectionConsumer, error) {
var err error
var c types.CollectionConsumer
@@ -25,7 +24,7 @@ func GetOne(entity string, entityId uuid.UUID, content string) (types.Collection
return c, errors.New("invalid collection consumer entity")
}
if err := db.Pool.QueryRow(context.Background(), fmt.Sprintf(`
if err := tx.QueryRow(ctx, fmt.Sprintf(`
SELECT id, collection_id, column_id_display, flags, on_mobile
FROM app.collection_consumer
WHERE %s_id = $1
@@ -34,20 +33,20 @@ func GetOne(entity string, entityId uuid.UUID, content string) (types.Collection
return c, err
}
c.OpenForm, err = openForm.Get("collection_consumer", c.Id, pgtype.Text{})
c.OpenForm, err = openForm.Get_tx(ctx, tx, "collection_consumer", c.Id, pgtype.Text{})
if err != nil {
return c, err
}
return c, nil
}
func Get(entity string, entityId uuid.UUID, content string) ([]types.CollectionConsumer, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, entityId uuid.UUID, content string) ([]types.CollectionConsumer, error) {
var consumers = make([]types.CollectionConsumer, 0)
if !slices.Contains(entitiesAllowed, entity) {
return consumers, errors.New("invalid collection consumer entity")
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, collection_id, column_id_display, flags, on_mobile
FROM app.collection_consumer
WHERE %s_id = $1
@@ -67,7 +66,7 @@ func Get(entity string, entityId uuid.UUID, content string) ([]types.CollectionC
}
for i, c := range consumers {
consumers[i].OpenForm, err = openForm.Get("collection_consumer", c.Id, pgtype.Text{})
consumers[i].OpenForm, err = openForm.Get_tx(ctx, tx, "collection_consumer", c.Id, pgtype.Text{})
if err != nil {
return consumers, err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/schema/compatible"
@@ -24,14 +23,14 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(entity string, entityId uuid.UUID) ([]types.Column, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, entityId uuid.UUID) ([]types.Column, error) {
columns := make([]types.Column, 0)
if !slices.Contains(allowedEntities, entity) {
return columns, errors.New("bad entity")
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, attribute_id, index, batch, basis, length, display, group_by,
aggregator, distincted, hidden, on_mobile, sub_query, styles
FROM app.column
@@ -56,7 +55,7 @@ func Get(entity string, entityId uuid.UUID) ([]types.Column, error) {
for i, c := range columns {
if c.SubQuery {
c.Query, err = query.Get("column", c.Id, 0, 0, 0)
c.Query, err = query.Get_tx(ctx, tx, "column", c.Id, 0, 0, 0)
if err != nil {
return columns, err
}
@@ -64,7 +63,7 @@ func Get(entity string, entityId uuid.UUID) ([]types.Column, error) {
c.Query.RelationId = pgtype.UUID{}
}
c.Captions, err = caption.Get("column", c.Id, []string{"columnTitle"})
c.Captions, err = caption.Get_tx(ctx, tx, "column", c.Id, []string{"columnTitle"})
if err != nil {
return columns, err
}

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/schema/collection/consumer"
@@ -29,11 +28,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(formId uuid.UUID) ([]interface{}, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, formId uuid.UUID) ([]interface{}, error) {
fields := make([]interface{}, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT f.id, f.parent_id, f.tab_id, f.icon_id, f.content, f.state,
f.flags, f.on_mobile, a.content,
@@ -401,11 +399,11 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posButtonLookup {
var field = fields[pos].(types.FieldButton)
field.OpenForm, err = openForm.Get("field", field.Id, pgtype.Text{})
field.OpenForm, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{})
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle"})
if err != nil {
return fields, err
}
@@ -416,19 +414,19 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posCalendarLookup {
var field = fields[pos].(types.FieldCalendar)
field.OpenForm, err = openForm.Get("field", field.Id, pgtype.Text{})
field.OpenForm, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{})
if err != nil {
return fields, err
}
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.Collections, err = consumer.Get("field", field.Id, "fieldFilterSelector")
field.Collections, err = consumer.Get_tx(ctx, tx, "field", field.Id, "fieldFilterSelector")
if err != nil {
return fields, err
}
@@ -439,15 +437,15 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posChartLookup {
var field = fields[pos].(types.FieldChart)
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle"})
if err != nil {
return fields, err
}
@@ -458,11 +456,11 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posDataLookup {
var field = fields[pos].(types.FieldData)
field.DefCollection, err = consumer.GetOne("field", field.Id, "fieldDataDefault")
field.DefCollection, err = consumer.GetOne_tx(ctx, tx, "field", field.Id, "fieldDataDefault")
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle", "fieldHelp"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle", "fieldHelp"})
if err != nil {
return fields, err
}
@@ -473,23 +471,23 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posDataRelLookup {
var field = fields[pos].(types.FieldDataRelationship)
field.OpenForm, err = openForm.Get("field", field.Id, pgtype.Text{})
field.OpenForm, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{})
if err != nil {
return fields, err
}
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.DefCollection, err = consumer.GetOne("field", field.Id, "fieldDataDefault")
field.DefCollection, err = consumer.GetOne_tx(ctx, tx, "field", field.Id, "fieldDataDefault")
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle", "fieldHelp"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle", "fieldHelp"})
if err != nil {
return fields, err
}
@@ -500,7 +498,7 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posHeaderLookup {
var field = fields[pos].(types.FieldHeader)
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle"})
if err != nil {
return fields, err
}
@@ -511,19 +509,19 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posKanbanLookup {
var field = fields[pos].(types.FieldKanban)
field.OpenForm, err = openForm.Get("field", field.Id, pgtype.Text{})
field.OpenForm, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{})
if err != nil {
return fields, err
}
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.Collections, err = consumer.Get("field", field.Id, "fieldFilterSelector")
field.Collections, err = consumer.Get_tx(ctx, tx, "field", field.Id, "fieldFilterSelector")
if err != nil {
return fields, err
}
@@ -534,27 +532,27 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posListLookup {
var field = fields[pos].(types.FieldList)
field.OpenForm, err = openForm.Get("field", field.Id, pgtype.Text{})
field.OpenForm, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{})
if err != nil {
return fields, err
}
field.OpenFormBulk, err = openForm.Get("field", field.Id, pgtype.Text{String: "bulk", Valid: true})
field.OpenFormBulk, err = openForm.Get_tx(ctx, tx, "field", field.Id, pgtype.Text{String: "bulk", Valid: true})
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle"})
if err != nil {
return fields, err
}
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.Collections, err = consumer.Get("field", field.Id, "fieldFilterSelector")
field.Collections, err = consumer.Get_tx(ctx, tx, "field", field.Id, "fieldFilterSelector")
if err != nil {
return fields, err
}
@@ -564,11 +562,11 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
// lookup tabs fields: get tabs
for _, pos := range posTabsLookup {
var field = fields[pos].(types.FieldTabs)
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle"})
if err != nil {
return fields, err
}
field.Tabs, err = tab.Get("field", field.Id)
field.Tabs, err = tab.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
@@ -579,15 +577,15 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
for _, pos := range posVariableLookup {
var field = fields[pos].(types.FieldVariable)
field.Query, err = query.Get("field", field.Id, 0, 0, 0)
field.Query, err = query.Get_tx(ctx, tx, "field", field.Id, 0, 0, 0)
if err != nil {
return fields, err
}
field.Columns, err = column.Get("field", field.Id)
field.Columns, err = column.Get_tx(ctx, tx, "field", field.Id)
if err != nil {
return fields, err
}
field.Captions, err = caption.Get("field", field.Id, []string{"fieldTitle", "fieldHelp"})
field.Captions, err = caption.Get_tx(ctx, tx, "field", field.Id, []string{"fieldTitle", "fieldHelp"})
if err != nil {
return fields, err
}
@@ -640,12 +638,12 @@ func Get(formId uuid.UUID) ([]interface{}, error) {
// recursively resolve all fields with their children
return getChildren(uuid.Nil, uuid.Nil), nil
}
func GetCalendar(fieldId uuid.UUID) (types.FieldCalendar, error) {
func GetCalendar_tx(ctx context.Context, tx pgx.Tx, fieldId uuid.UUID) (types.FieldCalendar, error) {
var f types.FieldCalendar
f.Id = fieldId
err := db.Pool.QueryRow(context.Background(), `
err := tx.QueryRow(ctx, `
SELECT attribute_id_date0, attribute_id_date1, index_date0, index_date1,
date_range0, date_range1, days, days_toggle
FROM app.field_calendar
@@ -659,19 +657,19 @@ func GetCalendar(fieldId uuid.UUID) (types.FieldCalendar, error) {
return f, err
}
f.OpenForm, err = openForm.Get("field", f.Id, pgtype.Text{})
f.OpenForm, err = openForm.Get_tx(ctx, tx, "field", f.Id, pgtype.Text{})
if err != nil {
return f, err
}
f.Query, err = query.Get("field", f.Id, 0, 0, 0)
f.Query, err = query.Get_tx(ctx, tx, "field", f.Id, 0, 0, 0)
if err != nil {
return f, err
}
f.Columns, err = column.Get("field", f.Id)
f.Columns, err = column.Get_tx(ctx, tx, "field", f.Id)
if err != nil {
return f, err
}
f.Collections, err = consumer.Get("field", f.Id, "fieldFilterSelector")
f.Collections, err = consumer.Get_tx(ctx, tx, "field", f.Id, "fieldFilterSelector")
if err != nil {
return f, err
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/article"
"r3/schema/caption"
@@ -22,7 +21,7 @@ import (
func Copy_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID, id uuid.UUID, newName string) error {
forms, err := Get(uuid.Nil, []uuid.UUID{id})
forms, err := Get_tx(ctx, tx, uuid.Nil, []uuid.UUID{id})
if err != nil {
return err
}
@@ -129,7 +128,7 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID, ids []uuid.UUID) ([]types.Form, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID, ids []uuid.UUID) ([]types.Form, error) {
forms := make([]types.Form, 0)
sqlWheres := []string{}
@@ -147,7 +146,7 @@ func Get(moduleId uuid.UUID, ids []uuid.UUID) ([]types.Form, error) {
sqlValues = append(sqlValues, ids)
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, preset_id_open, icon_id, field_id_focus, name, no_data_actions, ARRAY(
SELECT article_id
FROM app.article_form
@@ -178,27 +177,27 @@ func Get(moduleId uuid.UUID, ids []uuid.UUID) ([]types.Form, error) {
// collect form query, fields, functions, states and captions
for i, form := range forms {
form.Query, err = query.Get("form", form.Id, 0, 0, 0)
form.Query, err = query.Get_tx(ctx, tx, "form", form.Id, 0, 0, 0)
if err != nil {
return forms, err
}
form.Fields, err = field.Get(form.Id)
form.Fields, err = field.Get_tx(ctx, tx, form.Id)
if err != nil {
return forms, err
}
form.Actions, err = getActions(form.Id)
form.Actions, err = getActions_tx(ctx, tx, form.Id)
if err != nil {
return forms, err
}
form.Functions, err = getFunctions(form.Id)
form.Functions, err = getFunctions_tx(ctx, tx, form.Id)
if err != nil {
return forms, err
}
form.States, err = getStates(form.Id)
form.States, err = getStates_tx(ctx, tx, form.Id)
if err != nil {
return forms, err
}
form.Captions, err = caption.Get("form", form.Id, []string{"formTitle"})
form.Captions, err = caption.Get_tx(ctx, tx, "form", form.Id, []string{"formTitle"})
if err != nil {
return forms, err
}

View File

@@ -2,7 +2,6 @@ package form
import (
"context"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -11,10 +10,10 @@ import (
"github.com/jackc/pgx/v5"
)
func getActions(formId uuid.UUID) ([]types.FormAction, error) {
func getActions_tx(ctx context.Context, tx pgx.Tx, formId uuid.UUID) ([]types.FormAction, error) {
actions := make([]types.FormAction, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, js_function_id, icon_id, state, color
FROM app.form_action
WHERE form_id = $1
@@ -34,7 +33,7 @@ func getActions(formId uuid.UUID) ([]types.FormAction, error) {
}
for i, a := range actions {
actions[i].Captions, err = caption.Get("form_action", a.Id, []string{"formActionTitle"})
actions[i].Captions, err = caption.Get_tx(ctx, tx, "form_action", a.Id, []string{"formActionTitle"})
if err != nil {
return actions, err
}

View File

@@ -2,17 +2,16 @@ package form
import (
"context"
"r3/db"
"r3/types"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v5"
)
func getFunctions(formId uuid.UUID) ([]types.FormFunction, error) {
func getFunctions_tx(ctx context.Context, tx pgx.Tx, formId uuid.UUID) ([]types.FormFunction, error) {
fncs := make([]types.FormFunction, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT js_function_id, event, event_before
FROM app.form_function
WHERE form_id = $1

View File

@@ -2,7 +2,6 @@ package form
import (
"context"
"r3/db"
"r3/schema"
"r3/types"
@@ -10,11 +9,10 @@ import (
"github.com/jackc/pgx/v5"
)
func getStates(formId uuid.UUID) ([]types.FormState, error) {
func getStates_tx(ctx context.Context, tx pgx.Tx, formId uuid.UUID) ([]types.FormState, error) {
states := make([]types.FormState, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, description
FROM app.form_state
WHERE form_id = $1
@@ -36,11 +34,11 @@ func getStates(formId uuid.UUID) ([]types.FormState, error) {
}
for i, _ := range states {
states[i].Conditions, err = getStateConditions(states[i].Id)
states[i].Conditions, err = getStateConditions_tx(ctx, tx, states[i].Id)
if err != nil {
return states, nil
}
states[i].Effects, err = getStateEffects(states[i].Id)
states[i].Effects, err = getStateEffects_tx(ctx, tx, states[i].Id)
if err != nil {
return states, nil
}
@@ -48,10 +46,10 @@ func getStates(formId uuid.UUID) ([]types.FormState, error) {
return states, nil
}
func getStateConditions(formStateId uuid.UUID) ([]types.FormStateCondition, error) {
func getStateConditions_tx(ctx context.Context, tx pgx.Tx, formStateId uuid.UUID) ([]types.FormStateCondition, error) {
conditions := make([]types.FormStateCondition, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT position, connector, operator
FROM app.form_state_condition
WHERE form_state_id = $1
@@ -71,11 +69,11 @@ func getStateConditions(formStateId uuid.UUID) ([]types.FormStateCondition, erro
}
for i, c := range conditions {
c.Side0, err = getStateConditionSide(formStateId, c.Position, 0)
c.Side0, err = getStateConditionSide_tx(ctx, tx, formStateId, c.Position, 0)
if err != nil {
return conditions, err
}
c.Side1, err = getStateConditionSide(formStateId, c.Position, 1)
c.Side1, err = getStateConditionSide_tx(ctx, tx, formStateId, c.Position, 1)
if err != nil {
return conditions, err
}
@@ -84,10 +82,10 @@ func getStateConditions(formStateId uuid.UUID) ([]types.FormStateCondition, erro
return conditions, nil
}
func getStateConditionSide(formStateId uuid.UUID, position int, side int) (types.FormStateConditionSide, error) {
func getStateConditionSide_tx(ctx context.Context, tx pgx.Tx, formStateId uuid.UUID, position int, side int) (types.FormStateConditionSide, error) {
var s types.FormStateConditionSide
err := db.Pool.QueryRow(context.Background(), `
err := tx.QueryRow(ctx, `
SELECT collection_id, column_id, field_id, form_state_id_result,
preset_id, role_id, variable_id, brackets, content, value
FROM app.form_state_condition_side
@@ -100,11 +98,10 @@ func getStateConditionSide(formStateId uuid.UUID, position int, side int) (types
return s, err
}
func getStateEffects(formStateId uuid.UUID) ([]types.FormStateEffect, error) {
func getStateEffects_tx(ctx context.Context, tx pgx.Tx, formStateId uuid.UUID) ([]types.FormStateEffect, error) {
effects := make([]types.FormStateEffect, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT field_id, form_action_id, tab_id, new_data, new_state
FROM app.form_state_effect
WHERE form_state_id = $1

View File

@@ -2,7 +2,6 @@ package icon
import (
"context"
"r3/db"
"r3/schema"
"r3/types"
@@ -15,11 +14,11 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Icon, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Icon, error) {
icons := make([]types.Icon, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, name, file
FROM app.icon
WHERE module_id = $1

View File

@@ -3,7 +3,6 @@ package jsFunction
import (
"context"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -28,12 +27,12 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.JsFunction, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.JsFunction, error) {
var err error
functions := make([]types.JsFunction, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, form_id, name, code_args, code_function, code_returns, is_client_event_exec
FROM app.js_function
WHERE module_id = $1
@@ -57,7 +56,7 @@ func Get(moduleId uuid.UUID) ([]types.JsFunction, error) {
for i, f := range functions {
f.ModuleId = moduleId
f.Captions, err = caption.Get("js_function", f.Id, []string{"jsFunctionTitle", "jsFunctionDesc"})
f.Captions, err = caption.Get_tx(ctx, tx, "js_function", f.Id, []string{"jsFunctionTitle", "jsFunctionDesc"})
if err != nil {
return functions, err
}

View File

@@ -2,7 +2,6 @@ package loginForm
import (
"context"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -16,10 +15,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.LoginForm, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.LoginForm, error) {
loginForms := make([]types.LoginForm, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, attribute_id_login, attribute_id_lookup, form_id, name
FROM app.login_form
WHERE module_id = $1
@@ -43,7 +42,7 @@ func Get(moduleId uuid.UUID) ([]types.LoginForm, error) {
// get captions
for i, l := range loginForms {
loginForms[i].Captions, err = caption.Get("login_form", l.Id, []string{"loginFormTitle"})
loginForms[i].Captions, err = caption.Get_tx(ctx, tx, "login_form", l.Id, []string{"loginFormTitle"})
if err != nil {
return loginForms, err
}

View File

@@ -3,7 +3,6 @@ package menuTab
import (
"context"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/schema/collection/consumer"
@@ -22,10 +21,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.MenuTab, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.MenuTab, error) {
menuTabs := make([]types.MenuTab, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, icon_id
FROM app.menu_tab
WHERE module_id = $1
@@ -48,12 +47,12 @@ func Get(moduleId uuid.UUID) ([]types.MenuTab, error) {
// get menus and captions
for i, mt := range menuTabs {
mt.Menus, err = getMenus(mt.Id, pgtype.UUID{})
mt.Menus, err = getMenus_tx(ctx, tx, mt.Id, pgtype.UUID{})
if err != nil {
return menuTabs, err
}
mt.Captions, err = caption.Get("menu_tab", mt.Id, []string{"menuTabTitle"})
mt.Captions, err = caption.Get_tx(ctx, tx, "menu_tab", mt.Id, []string{"menuTabTitle"})
if err != nil {
return menuTabs, err
}
@@ -96,7 +95,7 @@ func Set_tx(ctx context.Context, tx pgx.Tx, position int, mt types.MenuTab) erro
}
// menus
func getMenus(menuTabId uuid.UUID, parentId pgtype.UUID) ([]types.Menu, error) {
func getMenus_tx(ctx context.Context, tx pgx.Tx, menuTabId uuid.UUID, parentId pgtype.UUID) ([]types.Menu, error) {
menus := make([]types.Menu, 0)
@@ -105,7 +104,7 @@ func getMenus(menuTabId uuid.UUID, parentId pgtype.UUID) ([]types.Menu, error) {
nullCheck = "AND parent_id = $2"
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, form_id, icon_id, show_children, color
FROM app.menu
WHERE menu_tab_id = $1
@@ -128,15 +127,15 @@ func getMenus(menuTabId uuid.UUID, parentId pgtype.UUID) ([]types.Menu, error) {
// get children & collections & captions
for i, m := range menus {
m.Menus, err = getMenus(menuTabId, pgtype.UUID{Bytes: m.Id, Valid: true})
m.Menus, err = getMenus_tx(ctx, tx, menuTabId, pgtype.UUID{Bytes: m.Id, Valid: true})
if err != nil {
return menus, err
}
m.Collections, err = consumer.Get("menu", m.Id, "menuDisplay")
m.Collections, err = consumer.Get_tx(ctx, tx, "menu", m.Id, "menuDisplay")
if err != nil {
return menus, err
}
m.Captions, err = caption.Get("menu", m.Id, []string{"menuTitle"})
m.Captions, err = caption.Get_tx(ctx, tx, "menu", m.Id, []string{"menuTitle"})
if err != nil {
return menus, err
}

View File

@@ -80,10 +80,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(ids []uuid.UUID) ([]types.Module, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, ids []uuid.UUID) ([]types.Module, error) {
modules := make([]types.Module, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, parent_id, form_id, icon_id, icon_id_pwa1, icon_id_pwa2,
js_function_id_on_login, pg_function_id_login_sync, name, name_pwa, name_pwa_short,
color1, position, language_main, release_build, release_build_app, release_date,
@@ -129,12 +129,12 @@ func Get(ids []uuid.UUID) ([]types.Module, error) {
// get start forms & captions
for i, mod := range modules {
mod.StartForms, err = getStartForms(mod.Id)
mod.StartForms, err = getStartForms_tx(ctx, tx, mod.Id)
if err != nil {
return modules, err
}
mod.Captions, err = caption.Get("module", mod.Id, []string{"moduleTitle"})
mod.Captions, err = caption.Get_tx(ctx, tx, "module", mod.Id, []string{"moduleTitle"})
if err != nil {
return modules, err
}
@@ -263,7 +263,7 @@ func SetReturnId_tx(ctx context.Context, tx pgx.Tx, mod types.Module) (uuid.UUID
}
// set dependencies to other modules
dependsOnCurrent, err := getDependsOn_tx(tx, mod.Id)
dependsOnCurrent, err := getDependsOn_tx(ctx, tx, mod.Id)
if err != nil {
return mod.Id, err
}
@@ -355,10 +355,10 @@ func SetReturnId_tx(ctx context.Context, tx pgx.Tx, mod types.Module) (uuid.UUID
return mod.Id, caption.Set_tx(ctx, tx, mod.Id, mod.Captions)
}
func getStartForms(id uuid.UUID) ([]types.ModuleStartForm, error) {
func getStartForms_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) ([]types.ModuleStartForm, error) {
startForms := make([]types.ModuleStartForm, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT role_id, form_id
FROM app.module_start_form
WHERE module_id = $1
@@ -379,10 +379,10 @@ func getStartForms(id uuid.UUID) ([]types.ModuleStartForm, error) {
return startForms, nil
}
func getDependsOn_tx(tx pgx.Tx, id uuid.UUID) ([]uuid.UUID, error) {
func getDependsOn_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) ([]uuid.UUID, error) {
moduleIdsDependsOn := make([]uuid.UUID, 0)
rows, err := tx.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT module_id_on
FROM app.module_depends
WHERE module_id = $1

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema/compatible"
"r3/types"
"slices"
@@ -16,7 +15,7 @@ import (
var entitiesAllowed = []string{"column", "collection_consumer", "field"}
func Get(entity string, id uuid.UUID, formContext pgtype.Text) (f types.OpenForm, err error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, id uuid.UUID, formContext pgtype.Text) (f types.OpenForm, err error) {
if !slices.Contains(entitiesAllowed, entity) {
return f, errors.New("invalid open form entity")
@@ -31,7 +30,7 @@ func Get(entity string, id uuid.UUID, formContext pgtype.Text) (f types.OpenForm
sqlWhere = "AND context = $2"
}
err = db.Pool.QueryRow(context.Background(), fmt.Sprintf(`
err = tx.QueryRow(ctx, fmt.Sprintf(`
SELECT form_id_open, relation_index_open, attribute_id_apply,
relation_index_apply, pop_up_type, max_height, max_width
FROM app.open_form

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/db/check"
"r3/schema"
"r3/schema/caption"
@@ -40,12 +39,12 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return nil
}
func Get(moduleId uuid.UUID) ([]types.PgFunction, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.PgFunction, error) {
var err error
functions := make([]types.PgFunction, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, name, code_args, code_function, code_returns,
is_frontend_exec, is_login_sync, is_trigger, volatility
FROM app.pg_function
@@ -70,11 +69,11 @@ func Get(moduleId uuid.UUID) ([]types.PgFunction, error) {
for i, f := range functions {
f.ModuleId = moduleId
f.Schedules, err = getSchedules(f.Id)
f.Schedules, err = getSchedules_tx(ctx, tx, f.Id)
if err != nil {
return functions, err
}
f.Captions, err = caption.Get("pg_function", f.Id, []string{"pgFunctionTitle", "pgFunctionDesc"})
f.Captions, err = caption.Get_tx(ctx, tx, "pg_function", f.Id, []string{"pgFunctionTitle", "pgFunctionDesc"})
if err != nil {
return functions, err
}
@@ -82,24 +81,6 @@ func Get(moduleId uuid.UUID) ([]types.PgFunction, error) {
}
return functions, nil
}
func getSchedules(pgFunctionId uuid.UUID) ([]types.PgFunctionSchedule, error) {
schedules := make([]types.PgFunctionSchedule, 0)
ctx, ctxCanc := context.WithTimeout(context.Background(), db.CtxDefTimeoutSysTask)
defer ctxCanc()
tx, err := db.Pool.Begin(ctx)
if err != nil {
return schedules, err
}
defer tx.Rollback(ctx)
schedules, err = getSchedules_tx(ctx, tx, pgFunctionId)
if err != nil {
return schedules, err
}
return schedules, tx.Commit(ctx)
}
func getSchedules_tx(ctx context.Context, tx pgx.Tx, pgFunctionId uuid.UUID) ([]types.PgFunctionSchedule, error) {
schedules := make([]types.PgFunctionSchedule, 0)
@@ -338,7 +319,7 @@ func RecreateAffectedBy_tx(ctx context.Context, tx pgx.Tx, entity string, entity
if err != nil {
return err
}
f.Captions, err = caption.Get("pg_function", f.Id, []string{"pgFunctionTitle", "pgFunctionDesc"})
f.Captions, err = caption.Get_tx(ctx, tx, "pg_function", f.Id, []string{"pgFunctionTitle", "pgFunctionDesc"})
if err != nil {
return err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/compatible"
"r3/types"
@@ -59,10 +58,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(relationId uuid.UUID) ([]types.PgIndex, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID) ([]types.PgIndex, error) {
pgIndexes := make([]types.PgIndex, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, attribute_id_dict, method, no_duplicates, auto_fki, primary_key
FROM app.pg_index
WHERE relation_id = $1
@@ -89,7 +88,7 @@ func Get(relationId uuid.UUID) ([]types.PgIndex, error) {
// get index attributes
for i, pgi := range pgIndexes {
pgi.Attributes, err = GetAttributes(pgi.Id)
pgi.Attributes, err = getAttributes_tx(ctx, tx, pgi.Id)
if err != nil {
return pgIndexes, err
}
@@ -98,10 +97,10 @@ func Get(relationId uuid.UUID) ([]types.PgIndex, error) {
return pgIndexes, nil
}
func GetAttributes(pgIndexId uuid.UUID) ([]types.PgIndexAttribute, error) {
func getAttributes_tx(ctx context.Context, tx pgx.Tx, pgIndexId uuid.UUID) ([]types.PgIndexAttribute, error) {
attributes := make([]types.PgIndexAttribute, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT attribute_id, order_asc
FROM app.pg_index_attribute
WHERE pg_index_id = $1

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/types"
"slices"
@@ -36,10 +35,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return nil
}
func Get(moduleId uuid.UUID) ([]types.PgTrigger, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.PgTrigger, error) {
triggers := make([]types.PgTrigger, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, relation_id, pg_function_id, on_insert, on_update, on_delete,
is_constraint, is_deferrable, is_deferred, per_row, fires,
code_condition

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/compatible"
"r3/types"
@@ -52,11 +51,11 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return nil
}
func Get(relationId uuid.UUID) ([]types.Preset, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID) ([]types.Preset, error) {
presets := make([]types.Preset, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, name, protected
FROM app.preset
WHERE relation_id = $1
@@ -78,8 +77,7 @@ func Get(relationId uuid.UUID) ([]types.Preset, error) {
// get preset values
for i, p := range presets {
presets[i].Values, err = getValues(p.Id)
presets[i].Values, err = getValues_tx(ctx, tx, p.Id)
if err != nil {
return presets, err
}
@@ -176,10 +174,10 @@ func Set_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID, id uuid.UUID,
}
// preset values
func getValues(presetId uuid.UUID) ([]types.PresetValue, error) {
func getValues_tx(ctx context.Context, tx pgx.Tx, presetId uuid.UUID) ([]types.PresetValue, error) {
values := make([]types.PresetValue, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, preset_id, preset_id_refer, attribute_id, protected, value
FROM app.preset_value
WHERE preset_id = $1

View File

@@ -16,7 +16,7 @@ import (
var allowedEntities = []string{"api", "form", "field", "collection", "column", "query_filter_query"}
func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filterSide int) (types.Query, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, id uuid.UUID, filterIndex int, filterPosition int, filterSide int) (types.Query, error) {
var q types.Query
q.Joins = make([]types.QueryJoin, 0)
@@ -39,7 +39,7 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
`, filterIndex, filterPosition, filterSide)
}
err := db.Pool.QueryRow(context.Background(), fmt.Sprintf(`
err := tx.QueryRow(ctx, fmt.Sprintf(`
SELECT id, relation_id, fixed_limit
FROM app.query
WHERE %s_id = $1
@@ -56,7 +56,7 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
}
// retrieve joins
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT relation_id, attribute_id, index_from, index, connector,
apply_create, apply_update, apply_delete
FROM app.query_join
@@ -80,13 +80,13 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
}
// retrieve filters
q.Filters, err = getFilters(q.Id, pgtype.UUID{})
q.Filters, err = getFilters_tx(ctx, tx, q.Id, pgtype.UUID{})
if err != nil {
return q, err
}
// retrieve orderings
rows, err = db.Pool.Query(context.Background(), `
rows, err = tx.Query(ctx, `
SELECT attribute_id, index, ascending
FROM app.query_order
WHERE query_id = $1
@@ -107,7 +107,7 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
}
// retrieve lookups
rows, err = db.Pool.Query(context.Background(), `
rows, err = tx.Query(ctx, `
SELECT pg_index_id, index
FROM app.query_lookup
WHERE query_id = $1
@@ -128,7 +128,7 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
}
// retrieve choices
rows, err = db.Pool.Query(context.Background(), `
rows, err = tx.Query(ctx, `
SELECT id, name
FROM app.query_choice
WHERE query_id = $1
@@ -149,12 +149,12 @@ func Get(entity string, id uuid.UUID, filterIndex int, filterPosition int, filte
}
for i, c := range q.Choices {
c.Filters, err = getFilters(q.Id, pgtype.UUID{Bytes: c.Id, Valid: true})
c.Filters, err = getFilters_tx(ctx, tx, q.Id, pgtype.UUID{Bytes: c.Id, Valid: true})
if err != nil {
return q, err
}
c.Captions, err = caption.Get("query_choice", c.Id, []string{"queryChoiceTitle"})
c.Captions, err = caption.Get_tx(ctx, tx, "query_choice", c.Id, []string{"queryChoiceTitle"})
if err != nil {
return q, err
}
@@ -377,7 +377,7 @@ func Set_tx(ctx context.Context, tx pgx.Tx, entity string, entityId uuid.UUID, f
return nil
}
func getFilters(queryId uuid.UUID, queryChoiceId pgtype.UUID) ([]types.QueryFilter, error) {
func getFilters_tx(ctx context.Context, tx pgx.Tx, queryId uuid.UUID, queryChoiceId pgtype.UUID) ([]types.QueryFilter, error) {
var filters = make([]types.QueryFilter, 0)
params := make([]interface{}, 0)
@@ -396,7 +396,7 @@ func getFilters(queryId uuid.UUID, queryChoiceId pgtype.UUID) ([]types.QueryFilt
}
filterPos := make([]typeFilterPos, 0)
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT connector, operator, index, position
FROM app.query_filter
WHERE query_id = $1
@@ -419,11 +419,11 @@ func getFilters(queryId uuid.UUID, queryChoiceId pgtype.UUID) ([]types.QueryFilt
for _, fp := range filterPos {
fp.filter.Side0, err = getFilterSide(queryId, fp.filter.Index, fp.position, 0)
fp.filter.Side0, err = getFilterSide_tx(ctx, tx, queryId, fp.filter.Index, fp.position, 0)
if err != nil {
return filters, err
}
fp.filter.Side1, err = getFilterSide(queryId, fp.filter.Index, fp.position, 1)
fp.filter.Side1, err = getFilterSide_tx(ctx, tx, queryId, fp.filter.Index, fp.position, 1)
if err != nil {
return filters, err
}
@@ -431,11 +431,11 @@ func getFilters(queryId uuid.UUID, queryChoiceId pgtype.UUID) ([]types.QueryFilt
}
return filters, nil
}
func getFilterSide(queryId uuid.UUID, filterIndex int, filterPosition int, side int) (types.QueryFilterSide, error) {
func getFilterSide_tx(ctx context.Context, tx pgx.Tx, queryId uuid.UUID, filterIndex int, filterPosition int, side int) (types.QueryFilterSide, error) {
var s types.QueryFilterSide
var err error
if err := db.Pool.QueryRow(context.Background(), `
if err := tx.QueryRow(ctx, `
SELECT attribute_id, attribute_index, attribute_nested, brackets,
collection_id, column_id, content, field_id, now_offset, preset_id,
role_id, variable_id, query_aggregator, value
@@ -453,7 +453,7 @@ func getFilterSide(queryId uuid.UUID, filterIndex int, filterPosition int, side
}
if s.Content == "subQuery" {
s.Query, err = Get("query_filter_query", queryId, filterIndex, filterPosition, side)
s.Query, err = Get_tx(ctx, tx, "query_filter_query", queryId, filterIndex, filterPosition, side)
if err != nil {
return s, err
}

View File

@@ -74,10 +74,10 @@ func delPkSeq_tx(ctx context.Context, tx pgx.Tx, modName string, id uuid.UUID) e
return err
}
func Get(moduleId uuid.UUID) ([]types.Relation, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Relation, error) {
relations := make([]types.Relation, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, name, comment, encryption, retention_count, retention_days, (
SELECT id
FROM app.attribute
@@ -107,7 +107,7 @@ func Get(moduleId uuid.UUID) ([]types.Relation, error) {
}
for i, r := range relations {
relations[i].Policies, err = getPolicies(r.Id)
relations[i].Policies, err = getPolicies_tx(ctx, tx, r.Id)
if err != nil {
return relations, err
}

View File

@@ -2,7 +2,6 @@ package relation
import (
"context"
"r3/db"
"r3/types"
"github.com/gofrs/uuid"
@@ -17,10 +16,10 @@ func delPolicies_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID) error
return err
}
func getPolicies(relationId uuid.UUID) ([]types.RelationPolicy, error) {
func getPolicies_tx(ctx context.Context, tx pgx.Tx, relationId uuid.UUID) ([]types.RelationPolicy, error) {
policies := make([]types.RelationPolicy, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT role_id, pg_function_id_excl, pg_function_id_incl,
action_delete, action_select, action_update
FROM app.relation_policy

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/schema/compatible"
@@ -24,10 +23,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Role, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Role, error) {
roles := make([]types.Role, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT r.id, r.name, r.content, r.assignable, ARRAY(
SELECT role_id_child
FROM app.role_child
@@ -54,12 +53,12 @@ func Get(moduleId uuid.UUID) ([]types.Role, error) {
// get access & captions
for i, r := range roles {
r, err = getAccess(r)
r, err = getAccess_tx(ctx, tx, r)
if err != nil {
return roles, err
}
r.Captions, err = caption.Get("role", r.Id, []string{"roleTitle", "roleDesc"})
r.Captions, err = caption.Get_tx(ctx, tx, "role", r.Id, []string{"roleTitle", "roleDesc"})
if err != nil {
return roles, err
}
@@ -68,7 +67,7 @@ func Get(moduleId uuid.UUID) ([]types.Role, error) {
return roles, nil
}
func getAccess(role types.Role) (types.Role, error) {
func getAccess_tx(ctx context.Context, tx pgx.Tx, role types.Role) (types.Role, error) {
role.AccessApis = make(map[uuid.UUID]int)
role.AccessAttributes = make(map[uuid.UUID]int)
@@ -78,7 +77,7 @@ func getAccess(role types.Role) (types.Role, error) {
role.AccessMenus = make(map[uuid.UUID]int)
role.AccessWidgets = make(map[uuid.UUID]int)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT api_id, attribute_id, client_event_id, collection_id,
menu_id, relation_id, widget_id, access
FROM app.role_access

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/types"
@@ -21,14 +20,14 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(entity string, entityId uuid.UUID) ([]types.Tab, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, entity string, entityId uuid.UUID) ([]types.Tab, error) {
tabs := make([]types.Tab, 0)
if !slices.Contains(allowedEntities, entity) {
return tabs, errors.New("bad entity")
}
rows, err := db.Pool.Query(context.Background(), fmt.Sprintf(`
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT id, content_counter, state
FROM app.tab
WHERE %s_id = $1
@@ -48,7 +47,7 @@ func Get(entity string, entityId uuid.UUID) ([]types.Tab, error) {
}
for i, tab := range tabs {
tabs[i].Captions, err = caption.Get("tab", tab.Id, []string{"tabTitle"})
tabs[i].Captions, err = caption.Get_tx(ctx, tx, "tab", tab.Id, []string{"tabTitle"})
if err != nil {
return tabs, err
}

View File

@@ -2,7 +2,6 @@ package variable
import (
"context"
"r3/db"
"r3/schema"
"r3/types"
@@ -15,10 +14,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Variable, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Variable, error) {
variables := make([]types.Variable, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT v.id, v.form_id, v.name, v.comment, v.content, v.content_use
FROM app.variable AS v
LEFT JOIN app.form AS f ON f.id = v.form_id
@@ -40,7 +39,6 @@ func Get(moduleId uuid.UUID) ([]types.Variable, error) {
}
variables = append(variables, v)
}
return variables, nil
}

View File

@@ -2,7 +2,6 @@ package widget
import (
"context"
"r3/db"
"r3/schema"
"r3/schema/caption"
"r3/schema/collection/consumer"
@@ -17,10 +16,10 @@ func Del_tx(ctx context.Context, tx pgx.Tx, id uuid.UUID) error {
return err
}
func Get(moduleId uuid.UUID) ([]types.Widget, error) {
func Get_tx(ctx context.Context, tx pgx.Tx, moduleId uuid.UUID) ([]types.Widget, error) {
widgets := make([]types.Widget, 0)
rows, err := db.Pool.Query(context.Background(), `
rows, err := tx.Query(ctx, `
SELECT id, form_id, name, size
FROM app.widget
WHERE module_id = $1
@@ -42,11 +41,11 @@ func Get(moduleId uuid.UUID) ([]types.Widget, error) {
// get collections & captions
for i, w := range widgets {
widgets[i].Captions, err = caption.Get("widget", w.Id, []string{"widgetTitle"})
widgets[i].Captions, err = caption.Get_tx(ctx, tx, "widget", w.Id, []string{"widgetTitle"})
if err != nil {
return widgets, err
}
widgets[i].Collection, err = consumer.GetOne("widget", w.Id, "widgetDisplay")
widgets[i].Collection, err = consumer.GetOne_tx(ctx, tx, "widget", w.Id, "widgetDisplay")
if err != nil {
return widgets, err
}

View File

@@ -174,17 +174,21 @@ func ImportFromFiles(filePathsImport []string) error {
log.Info("transfer", "module files were moved to transfer path if imported")
if err := tx.Commit(ctx); err != nil {
return err
}
log.Info("transfer", "changes were committed successfully")
// update schema cache
moduleIdsUpdated := make([]uuid.UUID, 0)
for id, _ := range moduleIdMapImportMeta {
moduleIdsUpdated = append(moduleIdsUpdated, id)
}
return cluster.SchemaChanged(true, moduleIdsUpdated)
if err := cluster.SchemaChanged_tx(ctx, tx, true, moduleIdsUpdated); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
log.Info("transfer", "changes were committed successfully")
return nil
}
func importModule_tx(ctx context.Context, tx pgx.Tx, mod types.Module, firstRun bool, lastRun bool,