bug fixes, exclude by regex pattern option

This commit is contained in:
simon 2019-09-21 19:52:33 -04:00
parent 8067eb0bfe
commit 0af56e8306
4 changed files with 60 additions and 20 deletions

View File

@ -20,6 +20,7 @@ GLOBAL OPTIONS:
--directory DIRECTORY, -d DIRECTORY DIRECTORY to watch.
--tar NUMBER Fill a .tar file with up to NUMBER file before executing the beem command.
Set to '1' to disable this feature (default: 1)
--exclude value, -e value Exclude files that match the regex pattern
--help, -h show help
--version, -v print the version

View File

@ -6,23 +6,26 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"sync"
"syscall"
"time"
)
type Beemer struct {
fileMap *sync.Map
tempDir string
beemCommand func(string, string) (string, []string)
beemChan chan string
tarChan chan string
watcher *fsnotify.Watcher
inactiveDelay time.Duration
beemWg *sync.WaitGroup
tarWg *sync.WaitGroup
tar *Tar
tarMaxCount int
fileMap *sync.Map
tempDir string
beemCommand func(string, string) (string, []string)
beemChan chan string
tarChan chan string
watcher *fsnotify.Watcher
inactiveDelay time.Duration
beemWg *sync.WaitGroup
tarWg *sync.WaitGroup
tar *Tar
tarMaxCount int
closing bool
excludePattern *regexp.Regexp
}
type File struct {
@ -36,6 +39,11 @@ func (b *Beemer) initWatchDir(watchDir string) {
err := b.watcher.Add(watchDir)
_ = filepath.Walk(watchDir, func(path string, info os.FileInfo, err error) error {
if b.excludePattern != nil && b.excludePattern.MatchString(path) {
return nil
}
if info.IsDir() {
err := b.handleDirChange(fsnotify.Event{
Name: path,
@ -112,6 +120,10 @@ func (b *Beemer) handleWatcherEvents() {
return
}
if b.excludePattern != nil && b.excludePattern.MatchString(event.Name) {
continue
}
if isDir(event.Name) {
err := b.handleDirChange(event)
if err != nil {
@ -198,6 +210,11 @@ func (b *Beemer) handleFileInactive(t *time.Timer, name string) {
"name": name,
}).Infof("has been inactive for %s and will be beemed", b.inactiveDelay)
if b.closing {
close(b.beemChan)
return
}
b.beemChan <- name
}

18
cli.go
View File

@ -7,6 +7,7 @@ import (
"github.com/urfave/cli"
"os"
"os/signal"
"regexp"
"sync"
"syscall"
"time"
@ -14,7 +15,7 @@ import (
func globalInit() {
//TODO cmdline flag
logrus.SetLevel(logrus.TraceLevel)
//logrus.SetLevel(logrus.TraceLevel)
}
func main() {
@ -32,6 +33,7 @@ func main() {
var watchDir string
var transfers int
var tarMaxCount int
var excludePattern string
var inactiveDelay time.Duration
app.Flags = []cli.Flag{
@ -47,6 +49,11 @@ func main() {
"Example: \"rclone move %file remote:/beem/%dir\"",
Destination: &cmdString,
},
cli.StringFlag{
Name: "exclude, e",
Usage: "Exclude files that match this regex pattern",
Destination: &excludePattern,
},
cli.DurationFlag{
Name: "wait, w",
Usage: "Files will be beemed after `DELAY` of inactivity",
@ -84,6 +91,11 @@ func main() {
tarMaxCount: tarMaxCount,
}
if excludePattern != "" {
beemer.excludePattern, _ = regexp.Compile(excludePattern)
logrus.Infof("Exclude pattern is /%s/", excludePattern)
}
beemer.initTempDir()
beemer.watcher, _ = fsnotify.NewWatcher()
@ -105,7 +117,9 @@ func main() {
go beemer.work()
}
go beemer.tarWork()
if tarMaxCount > 1 {
go beemer.tarWork()
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)

22
util.go
View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"time"
)
func (b *Beemer) initTempDir() {
@ -109,12 +110,18 @@ func isDir(name string) bool {
func (b *Beemer) dispose() {
b.watcher.Close()
b.closing = true
b.fileMap.Range(func(key, value interface{}) bool {
value.(*File).WaitTimer.Stop()
return true
})
close(b.beemChan)
go func() {
time.Sleep(15 * time.Second)
close(b.beemChan)
logrus.WithField("chanLen", len(b.beemChan)).Info("Forcefully closed beem channel")
}()
logrus.WithField("chanLen", len(b.beemChan)).Info("Waiting for beem queue to drain...")
<-b.beemChan
@ -122,12 +129,13 @@ func (b *Beemer) dispose() {
logrus.Info("Waiting for current commands to finish...")
b.beemWg.Wait()
close(b.tarChan)
logrus.WithField("chanLen", len(b.tarChan)).Info("Waiting for tar queue to drain...")
<-b.tarChan
logrus.Info("Waiting for current tar process finish...")
b.tarWg.Wait()
if b.tarMaxCount > 1 {
close(b.tarChan)
logrus.WithField("chanLen", len(b.tarChan)).Info("Waiting for tar queue to drain...")
<-b.tarChan
logrus.Info("Waiting for current tar process finish...")
b.tarWg.Wait()
}
logrus.Info("Cleaning up temp dir...")
err := os.RemoveAll(b.tempDir)