From bb3282c4677f9273adbd8f9f81a10ea93fd83747 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 24 Jul 2019 15:41:45 -0400 Subject: [PATCH] --tar option --- README.md | 15 ++++++++++ beemer.go | 87 +++++++++++++++++++++++++++++++++++++++++++++---------- cli.go | 24 ++++++++++++++- tar.go | 71 +++++++++++++++++++++++++++++++++++++++++++++ util.go | 20 ++++++++++--- 5 files changed, 196 insertions(+), 21 deletions(-) create mode 100644 tar.go diff --git a/README.md b/README.md index 5a76ed7..04b3128 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ [![CodeFactor](https://www.codefactor.io/repository/github/simon987/beemer/badge)](https://www.codefactor.io/repository/github/simon987/beemer) **beemer** executes a custom command on files written in the watched directory and deletes it. +Optionally, queue files in a .tar file and execute the command when the number of files in the +archive reaches `NUMBER` (see [usage](#usage)). ### Usage @@ -16,6 +18,8 @@ GLOBAL OPTIONS: --command value, -c value Will be executed on file write. You can use %file, %name and %dir. Example: "rclone move %file remote:/beem/%dir" --wait DELAY, -w DELAY Files will be beemed after DELAY of inactivity (default: 10s) --directory DIRECTORY, -d DIRECTORY DIRECTORY to watch. + --tar NUMBER Fill a .tar file with up to NUMBER file before executing the beem command. + Set to '1' to disable this feature (default: 1) --help, -h show help --version, -v print the version @@ -23,6 +27,17 @@ GLOBAL OPTIONS: ### Examples +Bundle up to 100 files in a tar file before moving to another directory + +\**Note that %dir is always `/tmp/beemer`* when `--tar` is specified + +When `--tar NUM` is specified, the beem command will be called at most +every `NUM` new files. +It will also be called during cleanup when SIGINT (`Ctrl-C`) is received. +```bash +./beemer -w 1s -d ./test --tar 100 -c "mv %file /mnt/store/my_tars/" +``` + Upload file to an rclone remote when it has been inactive for at least 30s, keeps the directory structure ```bash diff --git a/beemer.go b/beemer.go index 1106e4e..a8a69db 100644 --- a/beemer.go +++ b/beemer.go @@ -16,9 +16,13 @@ type Beemer struct { tempDir string beemCommand func(string, string) (string, []string) beemChan chan string + tarChan chan string watcher *fsnotify.Watcher inactiveDelay time.Duration - globalWg *sync.WaitGroup + beemWg *sync.WaitGroup + tarWg *sync.WaitGroup + tar *Tar + tarMaxCount int } type File struct { @@ -26,7 +30,7 @@ type File struct { BeemLock bool } -func (b Beemer) initWatchDir(watchDir string) { +func (b *Beemer) initWatchDir(watchDir string) { logrus.WithField("dir", watchDir).Info("Watching directory for changes") @@ -55,7 +59,7 @@ func (b Beemer) initWatchDir(watchDir string) { } } -func (b Beemer) getAndResetTimer(name string) *time.Timer { +func (b *Beemer) getAndResetTimer(name string) *time.Timer { file, ok := b.fileMap[name] if ok { @@ -74,7 +78,7 @@ func (b Beemer) getAndResetTimer(name string) *time.Timer { return newTimer } -func (b Beemer) handleDirChange(event fsnotify.Event) error { +func (b *Beemer) handleDirChange(event fsnotify.Event) error { if event.Op&fsnotify.Create == fsnotify.Create { return b.watcher.Add(event.Name) @@ -85,7 +89,7 @@ func (b Beemer) handleDirChange(event fsnotify.Event) error { return nil } -func (b Beemer) handleFileChange(event fsnotify.Event) { +func (b *Beemer) handleFileChange(event fsnotify.Event) { if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { t := b.getAndResetTimer(event.Name) @@ -100,7 +104,7 @@ func (b Beemer) handleFileChange(event fsnotify.Event) { } } -func (b Beemer) handleWatcherEvents() { +func (b *Beemer) handleWatcherEvents() { for { select { case event, ok := <-b.watcher.Events: @@ -134,15 +138,57 @@ func (b Beemer) handleWatcherEvents() { } } -func (b Beemer) work() { - b.globalWg.Add(1) +func (b *Beemer) work() { + b.beemWg.Add(1) for name := range b.beemChan { b.beemFile(name) } - b.globalWg.Done() + b.beemWg.Done() } -func (b Beemer) handleFileInactive(t *time.Timer, name string) { +func (b *Beemer) tarWork() { + b.tarWg.Add(1) + for filename := range b.tarChan { + + err := b.tar.AddFile(filename) + if err != nil { + logrus.WithField("filename", filename).Error(err) + } else { + _ = os.Remove(filename) + } + logrus.WithFields(logrus.Fields{ + "filename": filename, + "tar": b.tar.Name, + "count": b.tar.FileCount, + }).Info("Added file to tar") + + if b.tar.FileCount >= b.tarMaxCount { + b.beemTar() + } + } + + if b.tar.FileCount > 0 { + logrus.WithField("fileCount", b.tar.FileCount).Info("Beeming partial tar file") + b.beemTar() + } + + b.tarWg.Done() +} + +func (b *Beemer) beemTar() { + + name := b.tar.Name + b.tar.Close() + var err error + b.tar, err = NewTar(getTarPath(b.tempDir)) + if err != nil { + logrus.Error(err) + } + + b.executeBeemCommand(name, name) +} + +func (b *Beemer) handleFileInactive(t *time.Timer, name string) { <-t.C b.fileMap[name].BeemLock = true @@ -154,11 +200,9 @@ func (b Beemer) handleFileInactive(t *time.Timer, name string) { b.beemChan <- name } -func (b Beemer) beemFile(filename string) { +func (b *Beemer) executeBeemCommand(oldName string, newName string) { - newName := moveToTempDir(filename, b.tempDir) - - name, args := b.beemCommand(newName, filepath.Dir(filename)) + name, args := b.beemCommand(newName, filepath.Dir(oldName)) cmd := exec.Command(name, args...) @@ -169,7 +213,7 @@ func (b Beemer) beemFile(filename string) { out, err := cmd.CombinedOutput() if err != nil { - logrus.WithField("name", filename).WithError(err).Error(string(out)) + logrus.WithField("name", oldName).WithError(err).Error(string(out)) } logrus.WithFields(logrus.Fields{ @@ -181,6 +225,17 @@ func (b Beemer) beemFile(filename string) { err = os.Remove(newName) if err != nil && !os.IsNotExist(err) { - logrus.WithField("name", filename).Error(err) + logrus.WithField("name", oldName).Error(err) + } +} + +func (b *Beemer) beemFile(filename string) { + + newName := moveToTempDir(filename, b.tempDir) + + if b.tar != nil { + b.tarChan <- newName + } else { + b.executeBeemCommand(filename, newName) } } diff --git a/cli.go b/cli.go index 8ff69cc..25b0472 100644 --- a/cli.go +++ b/cli.go @@ -31,6 +31,7 @@ func main() { var cmdString string var watchDir string var transfers int + var tarMaxCount int var inactiveDelay time.Duration app.Flags = []cli.Flag{ @@ -57,6 +58,13 @@ func main() { Usage: "`DIRECTORY` to watch.", Destination: &watchDir, }, + cli.IntFlag{ + Name: "tar", + Usage: "Fill a .tar file with up to `NUMBER` file before executing the beem command." + + "Set to '1' to disable this feature", + Value: 1, + Destination: &tarMaxCount, + }, } app.Action = func(c *cli.Context) error { @@ -68,15 +76,27 @@ func main() { beemer := Beemer{ fileMap: make(map[string]*File, 0), beemChan: make(chan string, transfers), + tarChan: make(chan string, 100), beemCommand: parseCommand(cmdString), inactiveDelay: inactiveDelay, - globalWg: &sync.WaitGroup{}, + beemWg: &sync.WaitGroup{}, + tarWg: &sync.WaitGroup{}, + tarMaxCount: tarMaxCount, } beemer.initTempDir() beemer.watcher, _ = fsnotify.NewWatcher() + if tarMaxCount > 1 { + var err error + + beemer.tar, err = NewTar(getTarPath(beemer.tempDir)) + if err != nil { + logrus.Fatal(err) + } + } + go beemer.handleWatcherEvents() beemer.initWatchDir(watchDir) @@ -85,6 +105,8 @@ func main() { go beemer.work() } + go beemer.tarWork() + sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT) diff --git a/tar.go b/tar.go new file mode 100644 index 0000000..c929c84 --- /dev/null +++ b/tar.go @@ -0,0 +1,71 @@ +package main + +import ( + "archive/tar" + "io" + "os" + "path/filepath" + "time" +) + +type Tar struct { + FileCount int + Name string + writer *tar.Writer +} + +func NewTar(path string) (*Tar, error) { + + fp, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666) + if err != nil { + return nil, err + } + + writer := tar.NewWriter(fp) + + return &Tar{ + 0, + path, + writer, + }, nil +} + +func getTarPath(tempDir string) string { + return filepath.Join(tempDir, time.Now().Format("2006-01-02 15.04.05.000.tar")) +} + +func (t *Tar) AddFile(path string) error { + + // Header + fi, err := os.Stat(path) + if err != nil { + return err + } + header, err := tar.FileInfoHeader(fi, "") + if err != nil { + return err + } + err = t.writer.WriteHeader(header) + if err != nil { + return err + } + + // Content + in, err := os.Open(path) + if err != nil { + return err + } + + _, err = io.Copy(t.writer, in) + if err != nil { + return err + } + + t.FileCount += 1 + + return nil +} + +func (t *Tar) Close() { + _ = t.writer.Close() +} diff --git a/util.go b/util.go index 865c6e8..a3a0f65 100644 --- a/util.go +++ b/util.go @@ -9,8 +9,8 @@ import ( "strings" ) -func (b Beemer) initTempDir() { - tmpdir := filepath.Join(os.TempDir(), "work") +func (b *Beemer) initTempDir() { + tmpdir := filepath.Join(os.TempDir(), "beemer") err := os.Mkdir(tmpdir, 0700) if err != nil && !os.IsExist(err) { logrus.Fatal(err) @@ -106,15 +106,27 @@ func isDir(name string) bool { return false } -func (b Beemer) dispose() { +func (b *Beemer) dispose() { b.watcher.Close() + + for _, v := range b.fileMap { + v.WaitTimer.Stop() + } + close(b.beemChan) logrus.WithField("chanLen", len(b.beemChan)).Info("Waiting for beem queue to drain...") <-b.beemChan logrus.Info("Waiting for current commands to finish...") - b.globalWg.Wait() + b.beemWg.Wait() + + close(b.tarChan) + logrus.WithField("chanLen", len(b.tarChan)).Info("Waiting for tar queue to drain...") + <-b.tarChan + + logrus.Info("Waiting for current tar process finish...") + b.tarWg.Wait() logrus.Info("Cleaning up temp dir...") err := os.RemoveAll(b.tempDir)