--tar option

This commit is contained in:
simon 2019-07-24 15:41:45 -04:00
parent 1362aae61a
commit bb3282c467
5 changed files with 196 additions and 21 deletions

View File

@ -4,6 +4,8 @@
[![CodeFactor](https://www.codefactor.io/repository/github/simon987/beemer/badge)](https://www.codefactor.io/repository/github/simon987/beemer)
**beemer** executes a custom command on files written in the watched directory and deletes it.
Optionally, queue files in a .tar file and execute the command when the number of files in the
archive reaches `NUMBER` (see [usage](#usage)).
### Usage
@ -16,6 +18,8 @@ GLOBAL OPTIONS:
--command value, -c value Will be executed on file write. You can use %file, %name and %dir. Example: "rclone move %file remote:/beem/%dir"
--wait DELAY, -w DELAY Files will be beemed after DELAY of inactivity (default: 10s)
--directory DIRECTORY, -d DIRECTORY DIRECTORY to watch.
--tar NUMBER Fill a .tar file with up to NUMBER file before executing the beem command.
Set to '1' to disable this feature (default: 1)
--help, -h show help
--version, -v print the version
@ -23,6 +27,17 @@ GLOBAL OPTIONS:
### Examples
Bundle up to 100 files in a tar file before moving to another directory
\**Note that %dir is always `/tmp/beemer`* when `--tar` is specified
When `--tar NUM` is specified, the beem command will be called at most
every `NUM` new files.
It will also be called during cleanup when SIGINT (`Ctrl-C`) is received.
```bash
./beemer -w 1s -d ./test --tar 100 -c "mv %file /mnt/store/my_tars/"
```
Upload file to an rclone remote when it has been inactive for at least 30s,
keeps the directory structure
```bash

View File

@ -16,9 +16,13 @@ type Beemer struct {
tempDir string
beemCommand func(string, string) (string, []string)
beemChan chan string
tarChan chan string
watcher *fsnotify.Watcher
inactiveDelay time.Duration
globalWg *sync.WaitGroup
beemWg *sync.WaitGroup
tarWg *sync.WaitGroup
tar *Tar
tarMaxCount int
}
type File struct {
@ -26,7 +30,7 @@ type File struct {
BeemLock bool
}
func (b Beemer) initWatchDir(watchDir string) {
func (b *Beemer) initWatchDir(watchDir string) {
logrus.WithField("dir", watchDir).Info("Watching directory for changes")
@ -55,7 +59,7 @@ func (b Beemer) initWatchDir(watchDir string) {
}
}
func (b Beemer) getAndResetTimer(name string) *time.Timer {
func (b *Beemer) getAndResetTimer(name string) *time.Timer {
file, ok := b.fileMap[name]
if ok {
@ -74,7 +78,7 @@ func (b Beemer) getAndResetTimer(name string) *time.Timer {
return newTimer
}
func (b Beemer) handleDirChange(event fsnotify.Event) error {
func (b *Beemer) handleDirChange(event fsnotify.Event) error {
if event.Op&fsnotify.Create == fsnotify.Create {
return b.watcher.Add(event.Name)
@ -85,7 +89,7 @@ func (b Beemer) handleDirChange(event fsnotify.Event) error {
return nil
}
func (b Beemer) handleFileChange(event fsnotify.Event) {
func (b *Beemer) handleFileChange(event fsnotify.Event) {
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
t := b.getAndResetTimer(event.Name)
@ -100,7 +104,7 @@ func (b Beemer) handleFileChange(event fsnotify.Event) {
}
}
func (b Beemer) handleWatcherEvents() {
func (b *Beemer) handleWatcherEvents() {
for {
select {
case event, ok := <-b.watcher.Events:
@ -134,15 +138,57 @@ func (b Beemer) handleWatcherEvents() {
}
}
func (b Beemer) work() {
b.globalWg.Add(1)
func (b *Beemer) work() {
b.beemWg.Add(1)
for name := range b.beemChan {
b.beemFile(name)
}
b.globalWg.Done()
b.beemWg.Done()
}
func (b Beemer) handleFileInactive(t *time.Timer, name string) {
func (b *Beemer) tarWork() {
b.tarWg.Add(1)
for filename := range b.tarChan {
err := b.tar.AddFile(filename)
if err != nil {
logrus.WithField("filename", filename).Error(err)
} else {
_ = os.Remove(filename)
}
logrus.WithFields(logrus.Fields{
"filename": filename,
"tar": b.tar.Name,
"count": b.tar.FileCount,
}).Info("Added file to tar")
if b.tar.FileCount >= b.tarMaxCount {
b.beemTar()
}
}
if b.tar.FileCount > 0 {
logrus.WithField("fileCount", b.tar.FileCount).Info("Beeming partial tar file")
b.beemTar()
}
b.tarWg.Done()
}
func (b *Beemer) beemTar() {
name := b.tar.Name
b.tar.Close()
var err error
b.tar, err = NewTar(getTarPath(b.tempDir))
if err != nil {
logrus.Error(err)
}
b.executeBeemCommand(name, name)
}
func (b *Beemer) handleFileInactive(t *time.Timer, name string) {
<-t.C
b.fileMap[name].BeemLock = true
@ -154,11 +200,9 @@ func (b Beemer) handleFileInactive(t *time.Timer, name string) {
b.beemChan <- name
}
func (b Beemer) beemFile(filename string) {
func (b *Beemer) executeBeemCommand(oldName string, newName string) {
newName := moveToTempDir(filename, b.tempDir)
name, args := b.beemCommand(newName, filepath.Dir(filename))
name, args := b.beemCommand(newName, filepath.Dir(oldName))
cmd := exec.Command(name, args...)
@ -169,7 +213,7 @@ func (b Beemer) beemFile(filename string) {
out, err := cmd.CombinedOutput()
if err != nil {
logrus.WithField("name", filename).WithError(err).Error(string(out))
logrus.WithField("name", oldName).WithError(err).Error(string(out))
}
logrus.WithFields(logrus.Fields{
@ -181,6 +225,17 @@ func (b Beemer) beemFile(filename string) {
err = os.Remove(newName)
if err != nil && !os.IsNotExist(err) {
logrus.WithField("name", filename).Error(err)
logrus.WithField("name", oldName).Error(err)
}
}
func (b *Beemer) beemFile(filename string) {
newName := moveToTempDir(filename, b.tempDir)
if b.tar != nil {
b.tarChan <- newName
} else {
b.executeBeemCommand(filename, newName)
}
}

24
cli.go
View File

@ -31,6 +31,7 @@ func main() {
var cmdString string
var watchDir string
var transfers int
var tarMaxCount int
var inactiveDelay time.Duration
app.Flags = []cli.Flag{
@ -57,6 +58,13 @@ func main() {
Usage: "`DIRECTORY` to watch.",
Destination: &watchDir,
},
cli.IntFlag{
Name: "tar",
Usage: "Fill a .tar file with up to `NUMBER` file before executing the beem command." +
"Set to '1' to disable this feature",
Value: 1,
Destination: &tarMaxCount,
},
}
app.Action = func(c *cli.Context) error {
@ -68,15 +76,27 @@ func main() {
beemer := Beemer{
fileMap: make(map[string]*File, 0),
beemChan: make(chan string, transfers),
tarChan: make(chan string, 100),
beemCommand: parseCommand(cmdString),
inactiveDelay: inactiveDelay,
globalWg: &sync.WaitGroup{},
beemWg: &sync.WaitGroup{},
tarWg: &sync.WaitGroup{},
tarMaxCount: tarMaxCount,
}
beemer.initTempDir()
beemer.watcher, _ = fsnotify.NewWatcher()
if tarMaxCount > 1 {
var err error
beemer.tar, err = NewTar(getTarPath(beemer.tempDir))
if err != nil {
logrus.Fatal(err)
}
}
go beemer.handleWatcherEvents()
beemer.initWatchDir(watchDir)
@ -85,6 +105,8 @@ func main() {
go beemer.work()
}
go beemer.tarWork()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)

71
tar.go Normal file
View File

@ -0,0 +1,71 @@
package main
import (
"archive/tar"
"io"
"os"
"path/filepath"
"time"
)
type Tar struct {
FileCount int
Name string
writer *tar.Writer
}
func NewTar(path string) (*Tar, error) {
fp, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
if err != nil {
return nil, err
}
writer := tar.NewWriter(fp)
return &Tar{
0,
path,
writer,
}, nil
}
func getTarPath(tempDir string) string {
return filepath.Join(tempDir, time.Now().Format("2006-01-02 15.04.05.000.tar"))
}
func (t *Tar) AddFile(path string) error {
// Header
fi, err := os.Stat(path)
if err != nil {
return err
}
header, err := tar.FileInfoHeader(fi, "")
if err != nil {
return err
}
err = t.writer.WriteHeader(header)
if err != nil {
return err
}
// Content
in, err := os.Open(path)
if err != nil {
return err
}
_, err = io.Copy(t.writer, in)
if err != nil {
return err
}
t.FileCount += 1
return nil
}
func (t *Tar) Close() {
_ = t.writer.Close()
}

20
util.go
View File

@ -9,8 +9,8 @@ import (
"strings"
)
func (b Beemer) initTempDir() {
tmpdir := filepath.Join(os.TempDir(), "work")
func (b *Beemer) initTempDir() {
tmpdir := filepath.Join(os.TempDir(), "beemer")
err := os.Mkdir(tmpdir, 0700)
if err != nil && !os.IsExist(err) {
logrus.Fatal(err)
@ -106,15 +106,27 @@ func isDir(name string) bool {
return false
}
func (b Beemer) dispose() {
func (b *Beemer) dispose() {
b.watcher.Close()
for _, v := range b.fileMap {
v.WaitTimer.Stop()
}
close(b.beemChan)
logrus.WithField("chanLen", len(b.beemChan)).Info("Waiting for beem queue to drain...")
<-b.beemChan
logrus.Info("Waiting for current commands to finish...")
b.globalWg.Wait()
b.beemWg.Wait()
close(b.tarChan)
logrus.WithField("chanLen", len(b.tarChan)).Info("Waiting for tar queue to drain...")
<-b.tarChan
logrus.Info("Waiting for current tar process finish...")
b.tarWg.Wait()
logrus.Info("Cleaning up temp dir...")
err := os.RemoveAll(b.tempDir)