path: root/weed/s3api/s3_objectlock/object_lock_check.go
package s3_objectlock

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)

// ====================================================================
// SHARED OBJECT LOCK CHECKING FUNCTIONS
// ====================================================================
// These functions are used by the S3 API, Admin UI, and shell commands to
// check Object Lock status before bucket deletion.

// EntryHasActiveLock reports whether an entry has an active retention or legal hold.
// It is a standalone check that can be used by any component.
func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
	if entry == nil || entry.Extended == nil {
		return false
	}

	// Check for active legal hold (case-insensitive, trimmed for defensive parsing)
	if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
		legalHold := strings.TrimSpace(strings.ToUpper(string(legalHoldBytes)))
		if legalHold == s3_constants.LegalHoldOn {
			return true
		}
	}

	// Check for active retention (case-insensitive, trimmed for defensive parsing)
	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
		mode := strings.TrimSpace(strings.ToUpper(string(modeBytes)))
		if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
			// Check if retention is still active
			if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
				dateStr := strings.TrimSpace(string(dateBytes))
				timestamp, err := strconv.ParseInt(dateStr, 10, 64)
				if err != nil {
					// Fail-safe: if we can't parse the retention date, assume the object is locked
					// to prevent accidental data loss
					glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", dateStr, err)
					return true
				}
				retainUntil := time.Unix(timestamp, 0)
				if retainUntil.After(currentTime) {
					return true
				}
			}
		}
	}

	return false
}
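
// Illustrative sketch (not part of the original file): an entry whose Extended
// map carries a retention mode and a future retain-until timestamp is reported
// as locked. The keys are the same s3_constants keys checked above.
//
//	entry := &filer_pb.Entry{Extended: map[string][]byte{
//		s3_constants.ExtObjectLockModeKey:     []byte(s3_constants.RetentionModeGovernance),
//		s3_constants.ExtRetentionUntilDateKey: []byte(strconv.FormatInt(time.Now().Add(24*time.Hour).Unix(), 10)),
//	}}
//	locked := EntryHasActiveLock(entry, time.Now()) // true until the retain-until date passes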

// HasObjectsWithActiveLocks reports whether any object in the bucket has an active retention or legal hold.
// This function uses the filer gRPC client to scan the bucket directory
func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
	hasLocks := false
	currentTime := time.Now()

	err := recursivelyCheckLocksWithClient(ctx, client, bucketPath, &hasLocks, currentTime)
	if err != nil {
		return false, fmt.Errorf("error checking for locked objects: %w", err)
	}

	return hasLocks, nil
}

// paginateEntries is a generic helper that handles pagination logic for listing directory entries.
// The processEntry callback is called for each entry; returning stop=true stops iteration early.
func paginateEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string,
	processEntry func(entry *filer_pb.Entry) (stop bool, err error)) error {
	lastFileName := ""
	for {
		resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
			Directory:          dir,
			StartFromFileName:  lastFileName,
			InclusiveStartFrom: false,
			Limit:              10000,
		})
		if err != nil {
			return fmt.Errorf("failed to list directory %s: %w", dir, err)
		}

		entriesReceived := false
		for {
			entryResp, recvErr := resp.Recv()
			if recvErr != nil {
				if errors.Is(recvErr, io.EOF) {
					break // Normal end of stream
				}
				return fmt.Errorf("failed to receive entry from %s: %w", dir, recvErr)
			}
			entriesReceived = true
			entry := entryResp.Entry
			lastFileName = entry.Name

			// Skip invalid entry names to prevent path traversal
			if entry.Name == "" || entry.Name == "." || entry.Name == ".." ||
				strings.ContainsAny(entry.Name, "/\\") {
				glog.V(2).Infof("Skipping invalid entry name: %q in %s", entry.Name, dir)
				continue
			}

			stop, err := processEntry(entry)
			if err != nil {
				return err
			}
			if stop {
				return nil
			}
		}

		if !entriesReceived {
			break
		}
	}
	return nil
}
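
// Usage sketch for paginateEntries (illustrative; "/buckets/example" is a
// hypothetical directory): count the plain files in a directory while the
// helper drives pagination and stream handling.
//
//	fileCount := 0
//	err := paginateEntries(ctx, client, "/buckets/example", func(e *filer_pb.Entry) (bool, error) {
//		if !e.IsDirectory {
//			fileCount++
//		}
//		return false, nil // never stop early; walk the whole directory
//	})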

// recursivelyCheckLocksWithClient recursively checks all objects and versions for active locks
func recursivelyCheckLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, hasLocks *bool, currentTime time.Time) error {
	if *hasLocks {
		return nil // Early exit if already found a locked object
	}

	return paginateEntries(ctx, client, dir, func(entry *filer_pb.Entry) (bool, error) {
		if *hasLocks {
			return true, nil // Stop iteration
		}

		// Skip special directories
		if entry.Name == s3_constants.MultipartUploadsFolder {
			return false, nil // Continue
		}

		if entry.IsDirectory {
			subDir := dir + "/" + entry.Name
			if entry.Name == s3_constants.VersionsFolder {
				// Check all version files (exact match for .versions folder)
				if err := checkVersionsForLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
					return false, err
				}
			} else {
				// Recursively check subdirectories
				if err := recursivelyCheckLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
					return false, err
				}
			}
		} else {
			// Check if this object has an active lock
			if EntryHasActiveLock(entry, currentTime) {
				*hasLocks = true
				glog.V(2).Infof("Found object with active lock: %s/%s", dir, entry.Name)
				return true, nil // Stop iteration
			}
		}
		return false, nil // Continue
	})
}

// checkVersionsForLocksWithClient checks all versions in a .versions directory for active locks
func checkVersionsForLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, versionsDir string, hasLocks *bool, currentTime time.Time) error {
	return paginateEntries(ctx, client, versionsDir, func(entry *filer_pb.Entry) (bool, error) {
		if *hasLocks {
			return true, nil // Stop iteration
		}

		if EntryHasActiveLock(entry, currentTime) {
			*hasLocks = true
			glog.V(2).Infof("Found version with active lock: %s/%s", versionsDir, entry.Name)
			return true, nil // Stop iteration
		}
		return false, nil // Continue
	})
}

// IsObjectLockEnabled reports whether Object Lock is enabled on a bucket entry.
func IsObjectLockEnabled(entry *filer_pb.Entry) bool {
	if entry == nil || entry.Extended == nil {
		return false
	}

	enabledBytes, exists := entry.Extended[s3_constants.ExtObjectLockEnabledKey]
	if !exists {
		return false
	}

	enabled := string(enabledBytes)
	// Accept either the canonical enabled value or a plain "true" flag.
	return enabled == s3_constants.ObjectLockEnabled || enabled == "true"
}
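
// Illustrative sketch: a bucket entry with Object Lock enabled carries the flag
// in its Extended map. The "true" literal is also accepted, per the check above.
//
//	bucket := &filer_pb.Entry{Extended: map[string][]byte{
//		s3_constants.ExtObjectLockEnabledKey: []byte(s3_constants.ObjectLockEnabled),
//	}}
//	_ = IsObjectLockEnabled(bucket) // true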

// CheckBucketForLockedObjects checks whether a bucket has Object Lock enabled and,
// if so, scans for objects with active locks. It combines the bucket lookup and the
// lock check into a single operation used by the S3 API, Admin UI, and shell commands.
// It returns an error if the bucket has locked objects or if the check fails.
func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
	// Look up the bucket entry
	lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
		Directory: bucketsPath,
		Name:      bucketName,
	})
	if err != nil {
		return fmt.Errorf("bucket not found: %w", err)
	}

	// Check if Object Lock is enabled
	if !IsObjectLockEnabled(lookupResp.Entry) {
		return nil // No Object Lock, nothing to check
	}

	// Check for objects with active locks
	bucketPath := bucketsPath + "/" + bucketName
	hasLockedObjects, checkErr := HasObjectsWithActiveLocks(ctx, client, bucketPath)
	if checkErr != nil {
		return fmt.Errorf("failed to check for locked objects: %w", checkErr)
	}
	if hasLockedObjects {
		return errors.New("bucket has objects with active Object Lock retention or legal hold")
	}

	return nil
}
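
// Usage sketch (illustrative; assumes a filer_pb.SeaweedFilerClient obtained
// elsewhere, and that buckets live under the conventional "/buckets" path):
//
//	if err := CheckBucketForLockedObjects(ctx, client, "/buckets", "my-bucket"); err != nil {
//		return err // refuse to delete the bucket while locks are active
//	}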