diff --git a/go/libraries/events/event_flush.go b/go/libraries/events/event_flush.go index d8f5daa34d..f2fa3cd899 100644 --- a/go/libraries/events/event_flush.go +++ b/go/libraries/events/event_flush.go @@ -65,7 +65,7 @@ func (f FileFlusher) Flush(ctx context.Context) error { evtsDir := f.fbp.GetEventsDirPath() - err := lockAndFlush(ctx, fs, evtsDir, f.fbp.LockPath, f.flush) + err := f.lockAndFlush(ctx, fs, evtsDir, f.fbp.LockPath) if err != nil { return err } @@ -109,7 +109,7 @@ func (f FileFlusher) flush(ctx context.Context, path string) error { var _ Flusher = &FileFlusher{} // lockAndFlush locks the given lockPath and passes the flushCB to the filesys' Iter method -func lockAndFlush(ctx context.Context, fs filesys.Filesys, dirPath string, lockPath string, fcb flushCB) error { +func (f FileFlusher) lockAndFlush(ctx context.Context, fs filesys.Filesys, dirPath string, lockPath string) error { fsLock := filesys.CreateFilesysLock(fs, lockPath) isUnlocked, err := fsLock.TryLock() @@ -134,7 +134,7 @@ func lockAndFlush(ctx context.Context, fs filesys.Filesys, dirPath string, lockP var returnErr error iterErr := fs.Iter(dirPath, false, func(path string, size int64, isDir bool) (stop bool) { - if err := fcb(ctx, path); err != nil { + if err := f.flush(ctx, path); err != nil { if errors.Is(err, errInvalidFile) { // ignore invalid files found in the events directory return false diff --git a/go/libraries/events/event_flush_test.go b/go/libraries/events/event_flush_test.go index d56800892d..bb98c7d455 100644 --- a/go/libraries/events/event_flush_test.go +++ b/go/libraries/events/event_flush_test.go @@ -86,46 +86,49 @@ func TestEventFlushing(t *testing.T) { }, } - filesystems := []string{"inMemFS", "local"} - + // filesystems := []string{"inMemFS", "local"} + filesystems := []string{"local"} + for _, fsName := range filesystems { - for _, test := range tests { + t.Run(fsName, func(t *testing.T) { + for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() - var ft *flushTester + var ft *flushTester - if fsName == "inMemFS" { - fs := filesys.NewInMemFS([]string{tempEvtsDir}, nil, tempEvtsDir) + if fsName == "inMemFS" { + fs := filesys.NewInMemFS([]string{tempEvtsDir}, nil, tempEvtsDir) - ft = createFlushTester(fs, homeDir, doltTestDir) - } else { - fs := filesys.LocalFS + ft = createFlushTester(fs, homeDir, doltTestDir) + } else { + fs := filesys.LocalFS - path := filepath.Join(dPath, evtPath) - dDir := testLib.TestDir(path) + path := filepath.Join(dPath, evtPath) + dDir := testLib.TestDir(path) - ft = createFlushTester(fs, "", dDir) - } + ft = createFlushTester(fs, "", dDir) + } - ces := make([]*eventsapi.ClientEvent, 0) + ces := make([]*eventsapi.ClientEvent, 0) - for i := 0; i < test.numEvents; i++ { - ce := &eventsapi.ClientEvent{} - ces = append(ces, ce) - } + for i := 0; i < test.numEvents; i++ { + ce := &eventsapi.ClientEvent{} + ces = append(ces, ce) + } - assert.Equal(t, len(ces), test.numEvents) + assert.Equal(t, len(ces), test.numEvents) - err := ft.Fbp.WriteEvents(testVersion, ces) - assert.Equal(t, err, nil) + err := ft.Fbp.WriteEvents(testVersion, ces) + assert.Equal(t, err, nil) - err = ft.Flusher.Flush(ctx) + err = ft.Flusher.Flush(ctx) - assert.Equal(t, err, nil) - assert.Equal(t, len(ft.Client.CES), len(ces)) - }) - } + assert.NoError(t, err) + assert.Equal(t, len(ft.Client.CES), len(ces)) + }) + } + }) } } diff --git a/go/libraries/events/file_backed_proc_test.go b/go/libraries/events/file_backed_proc_test.go index fb1f9f5fcd..0deefb64b4 100644 --- a/go/libraries/events/file_backed_proc_test.go +++ b/go/libraries/events/file_backed_proc_test.go @@ -42,8 +42,10 @@ func (namer *SequentialNamer) Name(bytes []byte) string { } func (namer *SequentialNamer) Check(data []byte, path string) (bool, error) { - // todo - return true, nil + filename := filepath.Base(path) + ext := filepath.Ext(filename) + + return ext == evtDataExt, nil } func (namer *SequentialNamer) GetIdx() int {