aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/backup.go178
1 files changed, 121 insertions, 57 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 0f9088211..59499d789 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
+ "google.golang.org/grpc"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
@@ -66,105 +68,167 @@ var cmdBackup = &Command{
`,
}
-func runBackup(cmd *Command, args []string) bool {
-
- util.LoadSecurityConfiguration()
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+// parseTTL parses the TTL from user input or volume stats.
+// Returns (ttl, error, isFatal) where isFatal=true for invalid user input.
+func parseTTL(userTTL string, statsTTL string) (*needle.TTL, error, bool) {
+ if userTTL != "" {
+ ttl, err := needle.ReadTTL(userTTL)
+ if err != nil {
+ // User-provided TTL is invalid - this is fatal
+ return nil, fmt.Errorf("invalid user-provided ttl %s: %w", userTTL, err), true
+ }
+ return ttl, nil, false
+ }
- // Backward compatibility: if -server is provided, use it
- masterServer := *s.master
- if *s.server != "" {
- masterServer = *s.server
+ ttl, err := needle.ReadTTL(statsTTL)
+ if err != nil {
+ return nil, fmt.Errorf("parsing ttl %s from stats: %w", statsTTL, err), false
}
+ return ttl, nil, false
+}
- if *s.volumeId == -1 {
- return false
+// parseReplication parses the replication from user input or volume stats.
+// Returns (replication, error, isFatal) where isFatal=true for invalid user input.
+func parseReplication(userReplication string, statsReplication string) (*super_block.ReplicaPlacement, error, bool) {
+ if userReplication != "" {
+ replication, err := super_block.NewReplicaPlacementFromString(userReplication)
+ if err != nil {
+ // User-provided replication is invalid - this is fatal
+ return nil, fmt.Errorf("invalid user-provided replication %s: %w", userReplication, err), true
+ }
+ return replication, nil, false
}
- vid := needle.VolumeId(*s.volumeId)
- // find volume location, replication, ttl info
- lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String())
+ replication, err := super_block.NewReplicaPlacementFromString(statsReplication)
if err != nil {
- fmt.Printf("Error looking up volume %d: %v\n", vid, err)
- return true
+ return nil, fmt.Errorf("parsing replication %s from stats: %w", statsReplication, err), false
}
- volumeServer := lookup.Locations[0].ServerAddress()
+ return replication, nil, false
+}
+// backupFromLocation attempts to backup a volume from a specific volume server location.
+// Returns (error, isFatal) where isFatal=true means the error is due to invalid user input
+// and should not be retried with other locations.
+func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId) (error, bool) {
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
- fmt.Printf("Error get volume %d status: %v\n", vid, err)
- return true
+ return fmt.Errorf("getting volume status: %w", err), false
}
- var ttl *needle.TTL
- if *s.ttl != "" {
- ttl, err = needle.ReadTTL(*s.ttl)
- if err != nil {
- fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
- return true
- }
- } else {
- ttl, err = needle.ReadTTL(stats.Ttl)
- if err != nil {
- fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
- return true
- }
+
+ // Parse TTL
+ ttl, err, isFatal := parseTTL(*s.ttl, stats.Ttl)
+ if err != nil {
+ return err, isFatal
}
- var replication *super_block.ReplicaPlacement
- if *s.replication != "" {
- replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
- if err != nil {
- fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
- return true
- }
- } else {
- replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
- if err != nil {
- fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
- return true
- }
+
+ // Parse replication
+ replication, err, isFatal := parseReplication(*s.replication, stats.Replication)
+ if err != nil {
+ return err, isFatal
}
ver := needle.Version(stats.Version)
+ // Create or load the volume
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil {
- fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
- return true
+ return fmt.Errorf("creating or reading volume: %w", err), false
}
+ // Handle compaction if needed
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
if err = v.Compact2(0, 0, nil); err != nil {
- fmt.Printf("Compact Volume before synchronizing %v\n", err)
- return true
+ v.Close()
+ return fmt.Errorf("compacting volume: %w", err), false
}
if err = v.CommitCompact(); err != nil {
- fmt.Printf("Commit Compact before synchronizing %v\n", err)
- return true
+ v.Close()
+ return fmt.Errorf("committing compaction: %w", err), false
}
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
- v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
+ if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil {
+ v.Close()
+ return fmt.Errorf("writing superblock: %w", err), false
+ }
}
datSize, _, _ := v.FileStat()
+ // If local volume is larger than remote, recreate it
if datSize > stats.TailOffset {
- // remove the old data
if err := v.Destroy(false); err != nil {
- fmt.Printf("Error destroying volume: %v\n", err)
+ v.Close()
+ return fmt.Errorf("destroying volume: %w", err), false
}
+ v.Close() // Close the destroyed volume
// recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil {
- fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
- return true
+ return fmt.Errorf("recreating volume: %w", err), false
}
}
- defer v.Close()
+ // Perform the incremental backup
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
- fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
+ v.Close()
+ return fmt.Errorf("incremental backup: %w", err), false
+ }
+
+ v.Close()
+ return nil, false
+}
+
+func runBackup(cmd *Command, args []string) bool {
+
+ util.LoadSecurityConfiguration()
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ // Backward compatibility: if -server is provided, use it
+ masterServer := *s.master
+ if *s.server != "" {
+ masterServer = *s.server
+ }
+
+ if *s.volumeId == -1 {
+ return false
+ }
+ vid := needle.VolumeId(*s.volumeId)
+
+ // find volume location, replication, ttl info
+ lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String())
+ if err != nil {
+ fmt.Printf("Error looking up volume %d: %v\n", vid, err)
+ return true
+ }
+ if len(lookup.Locations) == 0 {
+ fmt.Printf("Error: volume %d has no locations available\n", vid)
return true
}
+ // Try each available location until one succeeds
+ var lastErr error
+ for i, location := range lookup.Locations {
+ volumeServer := location.ServerAddress()
+ fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer)
+
+ err, isFatal := backupFromLocation(volumeServer, grpcDialOption, vid)
+ if err != nil {
+ fmt.Printf("Error backing up volume %d from %s: %v\n", vid, volumeServer, err)
+ lastErr = err
+ // Check if this is a fatal user-input error
+ if isFatal {
+ return true
+ }
+ continue
+ }
+
+ // Success!
+ fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer)
+ return true
+ }
+
+ // All locations failed
+ fmt.Printf("Failed to backup volume %d after trying all %d locations. Last error: %v\n", vid, len(lookup.Locations), lastErr)
+
return true
}