From b0895015ea846a4f90007d6c2ced22f05aa25a1b Mon Sep 17 00:00:00 2001 From: simon Date: Tue, 16 Jul 2019 16:51:18 -0400 Subject: [PATCH] cap maximum number of transfers, don't ignore existing files --- README.md | 12 ++++++----- main.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 1fda9cb..5a76ed7 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,13 @@ NAME: beemer - Execute a command on a file after a delay of inactivity GLOBAL OPTIONS: - --command value, -c value Will be executed on file write. You can use %file and %dir. Example: "rclone move %file remote:/beem/%dir" - --wait DELAY, -w DELAY Files will be beemed after DELAY of inactivity (default: 10s) - --directory DIRECTORY, -d DIRECTORY DIRECTORY to watch. If non-empty, its current files & subdirectories will be ignored - --help, -h show help - --version, -v print the version + --transfers value, -t value Number of simultaneous transfers (default: 10) + --command value, -c value Will be executed on file write. You can use %file, %name and %dir. Example: "rclone move %file remote:/beem/%dir" + --wait DELAY, -w DELAY Files will be beemed after DELAY of inactivity (default: 10s) + --directory DIRECTORY, -d DIRECTORY DIRECTORY to watch. + --help, -h show help + --version, -v print the version + ``` ### Examples diff --git a/main.go b/main.go index 768f86e..7ab562e 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ type Ctx struct { FileMap map[string]*File TempDir string BeemCommand func(string, string) (string, []string) + BeemChan chan string } type File struct { @@ -41,8 +42,15 @@ func main() { var cmdString string var watchDir string + var transfers int app.Flags = []cli.Flag{ + cli.IntFlag{ + Name: "transfers, t", + Usage: "Number of simultaneous transfers", + Destination: &transfers, + Value: 10, + }, cli.StringFlag{ Name: "command, c", Usage: "Will be executed on file write. You can use %file, %name and %dir. " + @@ -57,7 +65,7 @@ func main() { }, cli.StringFlag{ Name: "directory, d", - Usage: "`DIRECTORY` to watch. If non-empty, its current files & subdirectories will be ignored", + Usage: "`DIRECTORY` to watch.", Destination: &watchDir, }, } @@ -67,6 +75,7 @@ func main() { if !c.IsSet("directory") { return errors.New("Directory must be specified") } + ctx.BeemChan = make(chan string, transfers) initTempDir() ctx.BeemCommand = parseCommand(cmdString) @@ -80,13 +89,13 @@ func main() { go handleWatcherEvents(watcher) - logrus.WithField("dir", watchDir).Info("Watching directory for changes") - err = watcher.Add(watchDir) - if err != nil { - logrus.Fatal(err) - } + initWatchDir(watchDir, watcher) //TODO gracefully handle SIGINT + for i := 0; i < transfers; i++ { + go beemer() + } + done := make(chan bool) <-done @@ -99,6 +108,35 @@ func main() { } } +func initWatchDir(watchDir string, watcher *fsnotify.Watcher) { + + logrus.WithField("dir", watchDir).Info("Watching directory for changes") + + err := watcher.Add(watchDir) + _ = filepath.Walk(watchDir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + err := handleDirChange(fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + }, watcher) + + if err != nil { + return err + } + } else { + handleFileChange(fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + }) + } + return nil + }) + + if err != nil { + logrus.Fatal(err) + } +} + func getAndResetTimer(name string) *time.Timer { file, ok := ctx.FileMap[name] @@ -185,6 +223,15 @@ func handleWatcherEvents(watcher *fsnotify.Watcher) { } } +func beemer() { + for { + select { + case name := <-ctx.BeemChan: + beemFile(name) + } + } +} + func handleFileInactive(t *time.Timer, name string) { <-t.C @@ -194,7 +241,7 @@ func handleFileInactive(t *time.Timer, name string) { "name": name, }).Infof("has been inactive for %s and will be beemed", InactiveDelay) - beemFile(name) + ctx.BeemChan <- name } func beemFile(filename string) {