Skip to content

Commit

Permalink
Use file lock to synchronize concurrent access
Browse files Browse the repository at this point in the history
  • Loading branch information
hdecarne committed Jan 13, 2024
1 parent 9781fc7 commit 306e1bd
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
62 changes: 62 additions & 0 deletions storage/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"slices"
"strconv"
"syscall"

"github.com/hdecarne-github/go-log"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -61,6 +62,11 @@ func (backend *fsBackend) Create(name string, data []byte) (string, error) {
}

func (backend *fsBackend) Update(name string, data []byte) (Version, error) {
lock, err := backend.lock(syscall.LOCK_EX)
if err != nil {
return 0, err
}
defer lock.release()
backend.logger.Debug().Msgf("updating entry '%s'...", name)
entryPath, err := backend.checkEntryPath(name, false)
if err != nil {
Expand Down Expand Up @@ -93,6 +99,11 @@ func (backend *fsBackend) Update(name string, data []byte) (Version, error) {
}

func (backend *fsBackend) Delete(name string) error {
lock, err := backend.lock(syscall.LOCK_EX)
if err != nil {
return err
}
defer lock.release()
backend.logger.Debug().Msgf("deleting entry '%s'...", name)
entryPath, err := backend.checkEntryPath(name, false)
if err != nil {
Expand All @@ -107,6 +118,11 @@ func (backend *fsBackend) Delete(name string) error {
}

func (backend *fsBackend) List() (Names, error) {
lock, err := backend.lock(syscall.LOCK_SH)
if err != nil {
return nil, err
}
defer lock.release()
dirEntries, err := os.ReadDir(backend.path)
if err != nil {
return nil, fmt.Errorf("failed to read storage path '%s' (cause: %w)", backend.path, err)
Expand Down Expand Up @@ -136,6 +152,11 @@ func (names *fsBackendNames) Next() string {
}

func (backend *fsBackend) Get(name string) ([]byte, error) {
lock, err := backend.lock(syscall.LOCK_SH)
if err != nil {
return nil, err
}
defer lock.release()
entryPath, err := backend.checkEntryPath(name, false)
if err != nil {
return nil, err
Expand All @@ -153,6 +174,11 @@ func (backend *fsBackend) Get(name string) ([]byte, error) {
}

func (backend *fsBackend) GetVersions(name string) ([]Version, error) {
lock, err := backend.lock(syscall.LOCK_SH)
if err != nil {
return nil, err
}
defer lock.release()
entryPath, err := backend.checkEntryPath(name, false)
if err != nil {
return nil, err
Expand All @@ -165,6 +191,11 @@ func (backend *fsBackend) GetVersions(name string) ([]Version, error) {
}

func (backend *fsBackend) GetVersion(name string, version Version) ([]byte, error) {
lock, err := backend.lock(syscall.LOCK_SH)
if err != nil {
return nil, err
}
defer lock.release()
entryPath, err := backend.checkEntryPath(name, false)
if err != nil {
return nil, err
Expand All @@ -180,6 +211,11 @@ func (backend *fsBackend) GetVersion(name string, version Version) ([]byte, erro
}

func (backend *fsBackend) Log(name string, message string) error {
lock, err := backend.lock(syscall.LOCK_EX)
if err != nil {
return err
}
defer lock.release()
entryPath, err := backend.checkEntryPath(name, true)
if err != nil {
return err
Expand Down Expand Up @@ -239,6 +275,32 @@ func (backend *fsBackend) resolveEntryVersionFile(entryPath string, version Vers
return filepath.Join(entryPath, strconv.FormatUint(uint64(version), 10))
}

type fsLock struct {
file *os.File
logger *zerolog.Logger
}

func (lock *fsLock) release() {
err := syscall.Flock(int(lock.file.Fd()), syscall.LOCK_UN)
if err != nil {
lock.logger.Error().Err(err).Msg("failed to release file lock")
}
}

func (backend *fsBackend) lock(how int) (*fsLock, error) {
lockPath := filepath.Join(backend.path, ".lock")
file, err := os.OpenFile(lockPath, os.O_WRONLY|os.O_CREATE, fsBackendFilePerm)
if err != nil {
return nil, fmt.Errorf("failed to create lock file '%s' (cause: %w)", lockPath, err)
}
err = syscall.Flock(int(file.Fd()), how)
if err != nil {
_ = file.Close()
return nil, fmt.Errorf("failed to aquire lock (%d) on lock file '%s' (cause: %w)", how, lockPath, err)
}
return &fsLock{file: file, logger: backend.logger}, nil
}

func NewFSStorage(path string, versionLimit VersionLimit) (Backend, error) {
uri := fmt.Sprintf(fsBackendURIPattern, path)
logger := log.RootLogger().With().Str("Backend", uri).Logger()
Expand Down
2 changes: 2 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestFSStorageNew(t *testing.T) {
require.NoError(t, err)
checkNew(t, backend)
}

func TestFSStorageCreateUpdateDelete(t *testing.T) {
path, err := os.MkdirTemp("", "TestFSStorageCreateUpdateDelete*")
require.NoError(t, err)
Expand All @@ -47,6 +48,7 @@ func TestFSStorageCreateUpdateDelete(t *testing.T) {
require.NoError(t, err)
checkCreateUpdateDelete(t, backend)
}

func TestFSStorageGetX(t *testing.T) {
path, err := os.MkdirTemp("", "TestFSStorageGetX*")
require.NoError(t, err)
Expand Down

0 comments on commit 306e1bd

Please sign in to comment.