aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/filer_conf.go
blob: de383e660d82a110deb84a57ab593ca2b45c15d6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package mount

import (
	"errors"
	"fmt"
	"path/filepath"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/util/log"
	"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
	confDir := filer.DirectoryEtcSeaweedFS
	confName := filer.FilerConfName
	confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)

	// read current conf
	err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		content, err := filer.ReadInsideFiler(client, confDir, confName)
		if err != nil {
			return err
		}

		fc := filer.NewFilerConf()
		if len(content) > 0 {
			if err := fc.LoadFromBytes(content); err != nil {
				return fmt.Errorf("parse %s: %v", confFullName, err)
			}
		}

		wfs.FilerConf = fc

		return nil
	})
	if err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			log.V(3).Infof("fuse filer conf %s not found", confFullName)
		} else {
			return nil, err
		}
	}

	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry == nil {
			return nil
		}

		dir := resp.Directory
		name := resp.EventNotification.NewEntry.Name

		if dir != confDir || name != confName {
			return nil
		}

		content := message.NewEntry.Content
		fc := filer.NewFilerConf()
		if len(content) > 0 {
			if err = fc.LoadFromBytes(content); err != nil {
				return fmt.Errorf("parse %s: %v", confFullName, err)
			}
		}

		wfs.FilerConf = fc

		return nil
	}
	return &meta_cache.MetadataFollower{
		PathPrefixToWatch: confFullName,
		ProcessEventFn:    processEventFn,
	}, nil
}

func (wfs *WFS) wormEnforcedForEntry(path util.FullPath, entry *filer_pb.Entry) (wormEnforced, wormEnabled bool) {
	if entry == nil || wfs.FilerConf == nil {
		return false, false
	}

	rule := wfs.FilerConf.MatchStorageRule(string(path))
	if !rule.Worm {
		return false, false
	}

	// worm is not enforced
	if entry.WormEnforcedAtTsNs == 0 {
		return false, true
	}

	// worm will never expire
	if rule.WormRetentionTimeSeconds == 0 {
		return true, true
	}

	enforcedAt := time.Unix(0, entry.WormEnforcedAtTsNs)

	// worm is expired
	if time.Now().Sub(enforcedAt).Seconds() >= float64(rule.WormRetentionTimeSeconds) {
		return false, true
	}

	return true, true
}