From dc954bc85b3bfcef99999f0b8d5eb2e052f2c127 Mon Sep 17 00:00:00 2001 From: simon Date: Tue, 16 Jul 2019 21:39:27 -0400 Subject: [PATCH] More refactoring --- beemer.go | 178 +++++++++++++++++++++++++++++++++++ cli.go | 96 +++++++++++++++++++ main.go | 270 ------------------------------------------------------ util.go | 25 ++++- 4 files changed, 294 insertions(+), 275 deletions(-) create mode 100644 beemer.go create mode 100644 cli.go delete mode 100644 main.go diff --git a/beemer.go b/beemer.go new file mode 100644 index 0000000..b31c9c4 --- /dev/null +++ b/beemer.go @@ -0,0 +1,178 @@ +package main + +import ( + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" + "os" + "os/exec" + "path/filepath" + "time" +) + +type Beemer struct { + fileMap map[string]*File + tempDir string + beemCommand func(string, string) (string, []string) + beemChan chan string + watcher *fsnotify.Watcher + inactiveDelay time.Duration +} + +type File struct { + WaitTimer *time.Timer + BeemLock bool +} + +func (b Beemer) initWatchDir(watchDir string) { + + logrus.WithField("dir", watchDir).Info("Watching directory for changes") + + err := b.watcher.Add(watchDir) + _ = filepath.Walk(watchDir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + err := b.handleDirChange(fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + }) + + if err != nil { + return err + } + } else { + b.handleFileChange(fsnotify.Event{ + Name: path, + Op: fsnotify.Create, + }) + } + return nil + }) + + if err != nil { + logrus.Fatal(err) + } +} + +func (b Beemer) getAndResetTimer(name string) *time.Timer { + + file, ok := b.fileMap[name] + if ok { + file.WaitTimer.Stop() + if file.BeemLock == true { + return nil + } + } + + newTimer := time.NewTimer(b.inactiveDelay) + b.fileMap[name] = &File{ + newTimer, + false, + } + + return newTimer +} + +func (b Beemer) handleDirChange(event fsnotify.Event) error { + + if event.Op&fsnotify.Create == fsnotify.Create { + return b.watcher.Add(event.Name) + } else if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { + return b.watcher.Remove(event.Name) + } + + return nil +} + +func (b Beemer) handleFileChange(event fsnotify.Event) { + + if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { + t := b.getAndResetTimer(event.Name) + if t != nil { + go b.handleFileInactive(t, event.Name) + } + } else if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { + if file, ok := b.fileMap[event.Name]; ok { + file.WaitTimer.Stop() + delete(b.fileMap, event.Name) + } + } +} + +func (b Beemer) handleWatcherEvents() { + for { + select { + case event, ok := <-b.watcher.Events: + if !ok { + return + } + + if isDir(event.Name) { + err := b.handleDirChange(event) + if err != nil { + logrus.Fatal(err) + } + } else { + b.handleFileChange(event) + } + + if event.Op&fsnotify.Chmod != fsnotify.Chmod { + logrus.WithFields(logrus.Fields{ + "name": event.Name, + "op": event.Op, + }).Trace("fsnotify") + } + + case err, ok := <-b.watcher.Errors: + if !ok { + return + } + + logrus.WithError(err).Error("error with watcher") + } + } +} + +func (b Beemer) work() { + for { + select { + case name := <-b.beemChan: + b.beemFile(name) + } + } +} + +func (b Beemer) handleFileInactive(t *time.Timer, name string) { + <-t.C + + b.fileMap[name].BeemLock = true + + logrus.WithFields(logrus.Fields{ + "name": name, + }).Infof("has been inactive for %s and will be beemed", b.inactiveDelay) + + b.beemChan <- name +} + +func (b Beemer) beemFile(filename string) { + + newName := moveToTempDir(filename, b.tempDir) + + name, args := b.beemCommand(newName, filepath.Dir(filename)) + + cmd := exec.Command(name, args...) + out, err := cmd.CombinedOutput() + if err != nil { + logrus.WithField("name", filename).WithError(err).Error(string(out)) + } + + logrus.WithFields(logrus.Fields{ + "name": newName, + "command": name, + "args": args, + "out": string(out), + }).Trace("Executing beem command") + + err = os.Remove(newName) + if err != nil && !os.IsNotExist(err) { + logrus.WithField("name", filename).Error(err) + } +} diff --git a/cli.go b/cli.go new file mode 100644 index 0000000..a0d057e --- /dev/null +++ b/cli.go @@ -0,0 +1,96 @@ +package main + +import ( + "errors" + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" + "github.com/urfave/cli" + "os" + "time" +) + +func globalInit() { + logrus.SetLevel(logrus.TraceLevel) +} + +func main() { + + globalInit() + + app := cli.NewApp() + app.Name = "work" + app.Usage = "Execute a command on a file after a delay of inactivity" + app.Email = "me@simon987.net" + app.Author = "simon987" + app.Version = "1.1" + + var cmdString string + var watchDir string + var transfers int + var inactiveDelay time.Duration + + app.Flags = []cli.Flag{ + cli.IntFlag{ + Name: "transfers, t", + Usage: "Number of simultaneous transfers", + Destination: &transfers, + Value: 10, + }, + cli.StringFlag{ + Name: "command, c", + Usage: "Will be executed on file write. You can use %file, %name and %dir. " + + "Example: \"rclone move %file remote:/beem/%dir\"", + Destination: &cmdString, + }, + cli.DurationFlag{ + Name: "wait, w", + Usage: "Files will be beemed after `DELAY` of inactivity", + Destination: &inactiveDelay, + Value: time.Second * 10, + }, + cli.StringFlag{ + Name: "directory, d", + Usage: "`DIRECTORY` to watch.", + Destination: &watchDir, + }, + } + + app.Action = func(c *cli.Context) error { + + if !c.IsSet("directory") { + return errors.New("directory must be specified") + } + + beemer := Beemer{ + fileMap: make(map[string]*File, 0), + beemChan: make(chan string, transfers), + beemCommand: parseCommand(cmdString), + inactiveDelay: inactiveDelay, + } + + beemer.initTempDir() + + beemer.watcher, _ = fsnotify.NewWatcher() + + defer beemer.dispose() + + go beemer.handleWatcherEvents() + + beemer.initWatchDir(watchDir) + + for i := 0; i < transfers; i++ { + go beemer.work() + } + + //TODO gracefully handle SIGINT + done := make(chan bool) + <-done + + return nil + } + + err := app.Run(os.Args) + if err != nil { + logrus.Fatal(app.OnUsageError) + } +} diff --git a/main.go b/main.go deleted file mode 100644 index 7ab562e..0000000 --- a/main.go +++ /dev/null @@ -1,270 +0,0 @@ -package main - -import ( - "github.com/fsnotify/fsnotify" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/urfave/cli" - "os" - "os/exec" - "path/filepath" - "time" -) - -type Ctx struct { - GlobalUploadTicker time.Ticker - FileMap map[string]*File - TempDir string - BeemCommand func(string, string) (string, []string) - BeemChan chan string -} - -type File struct { - WaitTimer *time.Timer - BeemLock bool -} - -var InactiveDelay time.Duration - -var ctx = Ctx{ - FileMap: make(map[string]*File, 0), -} - -func main() { - logrus.SetLevel(logrus.TraceLevel) - - app := cli.NewApp() - app.Name = "beemer" - app.Usage = "Execute a command on a file after a delay of inactivity" - app.Email = "me@simon987.net" - app.Author = "simon987" - app.Version = "1.0" - - var cmdString string - var watchDir string - var transfers int - - app.Flags = []cli.Flag{ - cli.IntFlag{ - Name: "transfers, t", - Usage: "Number of simultaneous transfers", - Destination: &transfers, - Value: 10, - }, - cli.StringFlag{ - Name: "command, c", - Usage: "Will be executed on file write. You can use %file, %name and %dir. " + - "Example: \"rclone move %file remote:/beem/%dir\"", - Destination: &cmdString, - }, - cli.DurationFlag{ - Name: "wait, w", - Usage: "Files will be beemed after `DELAY` of inactivity", - Destination: &InactiveDelay, - Value: time.Second * 10, - }, - cli.StringFlag{ - Name: "directory, d", - Usage: "`DIRECTORY` to watch.", - Destination: &watchDir, - }, - } - - app.Action = func(c *cli.Context) error { - - if !c.IsSet("directory") { - return errors.New("Directory must be specified") - } - ctx.BeemChan = make(chan string, transfers) - - initTempDir() - ctx.BeemCommand = parseCommand(cmdString) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - logrus.Fatal(err) - } - - defer watcher.Close() - - go handleWatcherEvents(watcher) - - initWatchDir(watchDir, watcher) - - //TODO gracefully handle SIGINT - for i := 0; i < transfers; i++ { - go beemer() - } - - done := make(chan bool) - <-done - - return nil - } - - err := app.Run(os.Args) - if err != nil { - logrus.Fatal(app.OnUsageError) - } -} - -func initWatchDir(watchDir string, watcher *fsnotify.Watcher) { - - logrus.WithField("dir", watchDir).Info("Watching directory for changes") - - err := watcher.Add(watchDir) - _ = filepath.Walk(watchDir, func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - err := handleDirChange(fsnotify.Event{ - Name: path, - Op: fsnotify.Create, - }, watcher) - - if err != nil { - return err - } - } else { - handleFileChange(fsnotify.Event{ - Name: path, - Op: fsnotify.Create, - }) - } - return nil - }) - - if err != nil { - logrus.Fatal(err) - } -} - -func getAndResetTimer(name string) *time.Timer { - - file, ok := ctx.FileMap[name] - if ok { - file.WaitTimer.Stop() - if file.BeemLock == true { - return nil - } - } - - newTimer := time.NewTimer(InactiveDelay) - ctx.FileMap[name] = &File{ - newTimer, - false, - } - - return newTimer -} - -func isDir(name string) bool { - if stat, err := os.Stat(name); err == nil && stat.IsDir() { - return true - } - return false -} - -func handleDirChange(event fsnotify.Event, watcher *fsnotify.Watcher) error { - - if event.Op&fsnotify.Create == fsnotify.Create { - return watcher.Add(event.Name) - } else if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { - return watcher.Remove(event.Name) - } - - return nil -} - -func handleFileChange(event fsnotify.Event) { - - if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { - t := getAndResetTimer(event.Name) - if t != nil { - go handleFileInactive(t, event.Name) - } - } else if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { - if file, ok := ctx.FileMap[event.Name]; ok { - file.WaitTimer.Stop() - delete(ctx.FileMap, event.Name) - } - } -} - -func handleWatcherEvents(watcher *fsnotify.Watcher) { - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - - if isDir(event.Name) { - err := handleDirChange(event, watcher) - if err != nil { - logrus.Fatal(err) - } - } else { - handleFileChange(event) - } - - if event.Op&fsnotify.Chmod != fsnotify.Chmod { - logrus.WithFields(logrus.Fields{ - "name": event.Name, - "op": event.Op, - }).Trace("fsnotify") - } - - case err, ok := <-watcher.Errors: - if !ok { - return - } - - logrus.WithError(err).Error("error with Watcher") - } - } -} - -func beemer() { - for { - select { - case name := <-ctx.BeemChan: - beemFile(name) - } - } -} - -func handleFileInactive(t *time.Timer, name string) { - <-t.C - - ctx.FileMap[name].BeemLock = true - - logrus.WithFields(logrus.Fields{ - "name": name, - }).Infof("has been inactive for %s and will be beemed", InactiveDelay) - - ctx.BeemChan <- name -} - -func beemFile(filename string) { - - newName := moveToTempDir(filename) - - name, args := ctx.BeemCommand(newName, filepath.Dir(filename)) - - cmd := exec.Command(name, args...) - out, err := cmd.CombinedOutput() - if err != nil { - logrus.WithField("name", filename).WithError(err).Error(string(out)) - } - - logrus.WithFields(logrus.Fields{ - "name": newName, - "command": name, - "args": args, - "out": string(out), - }).Trace("Executing beem command") - - err = os.Remove(newName) - if err != nil && !os.IsNotExist(err) { - logrus.WithField("name", filename).Error(err) - } -} diff --git a/util.go b/util.go index feae90f..d91d141 100644 --- a/util.go +++ b/util.go @@ -9,21 +9,21 @@ import ( "strings" ) -func initTempDir() { - tmpdir := filepath.Join(os.TempDir(), "beemer") +func (b Beemer) initTempDir() { + tmpdir := filepath.Join(os.TempDir(), "work") err := os.Mkdir(tmpdir, 0700) if err != nil && !os.IsExist(err) { logrus.Fatal(err) } - ctx.TempDir = tmpdir + b.tempDir = tmpdir logrus.WithField("dir", tmpdir).Infof("Initialized temp dir") } -func moveToTempDir(name string) string { +func moveToTempDir(name string, tempDir string) string { - dir := filepath.Join(ctx.TempDir, filepath.Dir(name)) + dir := filepath.Join(tempDir, filepath.Dir(name)) newName := filepath.Join(dir, filepath.Base(name)) err := os.MkdirAll(dir, 0700) if err != nil && !os.IsExist(err) { @@ -98,3 +98,18 @@ func parseCommand(command string) func(string, string) (string, []string) { return args[0][0], newTokens[1:] } } + +func isDir(name string) bool { + if stat, err := os.Stat(name); err == nil && stat.IsDir() { + return true + } + return false +} + +func (b Beemer) dispose() { + b.watcher.Close() + err := os.RemoveAll(b.tempDir) + if err != nil { + logrus.Fatal(err) + } +}