diff options
| -rw-r--r-- | weed/command/backup.go | 178 |
1 files changed, 121 insertions, 57 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 0f9088211..59499d789 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" @@ -66,105 +68,167 @@ var cmdBackup = &Command{ `, } -func runBackup(cmd *Command, args []string) bool { - - util.LoadSecurityConfiguration() - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") +// parseTTL parses the TTL from user input or volume stats. +// Returns (ttl, error, isFatal) where isFatal=true for invalid user input. +func parseTTL(userTTL string, statsTTL string) (*needle.TTL, error, bool) { + if userTTL != "" { + ttl, err := needle.ReadTTL(userTTL) + if err != nil { + // User-provided TTL is invalid - this is fatal + return nil, fmt.Errorf("invalid user-provided ttl %s: %w", userTTL, err), true + } + return ttl, nil, false + } - // Backward compatibility: if -server is provided, use it - masterServer := *s.master - if *s.server != "" { - masterServer = *s.server + ttl, err := needle.ReadTTL(statsTTL) + if err != nil { + return nil, fmt.Errorf("parsing ttl %s from stats: %w", statsTTL, err), false } + return ttl, nil, false +} - if *s.volumeId == -1 { - return false +// parseReplication parses the replication from user input or volume stats. +// Returns (replication, error, isFatal) where isFatal=true for invalid user input. +func parseReplication(userReplication string, statsReplication string) (*super_block.ReplicaPlacement, error, bool) { + if userReplication != "" { + replication, err := super_block.NewReplicaPlacementFromString(userReplication) + if err != nil { + // User-provided replication is invalid - this is fatal + return nil, fmt.Errorf("invalid user-provided replication %s: %w", userReplication, err), true + } + return replication, nil, false } - vid := needle.VolumeId(*s.volumeId) - // find volume location, replication, ttl info - lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String()) + replication, err := super_block.NewReplicaPlacementFromString(statsReplication) if err != nil { - fmt.Printf("Error looking up volume %d: %v\n", vid, err) - return true + return nil, fmt.Errorf("parsing replication %s from stats: %w", statsReplication, err), false } - volumeServer := lookup.Locations[0].ServerAddress() + return replication, nil, false +} +// backupFromLocation attempts to backup a volume from a specific volume server location. +// Returns (error, isFatal) where isFatal=true means the error is due to invalid user input +// and should not be retried with other locations. +func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId) (error, bool) { stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) if err != nil { - fmt.Printf("Error get volume %d status: %v\n", vid, err) - return true + return fmt.Errorf("getting volume status: %w", err), false } - var ttl *needle.TTL - if *s.ttl != "" { - ttl, err = needle.ReadTTL(*s.ttl) - if err != nil { - fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err) - return true - } - } else { - ttl, err = needle.ReadTTL(stats.Ttl) - if err != nil { - fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) - return true - } + + // Parse TTL + ttl, err, isFatal := parseTTL(*s.ttl, stats.Ttl) + if err != nil { + return err, isFatal } - var replication *super_block.ReplicaPlacement - if *s.replication != "" { - replication, err = super_block.NewReplicaPlacementFromString(*s.replication) - if err != nil { - fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) - return true - } - } else { - replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) - if err != nil { - fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) - return true - } + + // Parse replication + replication, err, isFatal := parseReplication(*s.replication, stats.Replication) + if err != nil { + return err, isFatal } ver := needle.Version(stats.Version) + // Create or load the volume v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) if err != nil { - fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) - return true + return fmt.Errorf("creating or reading volume: %w", err), false } + // Handle compaction if needed if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { if err = v.Compact2(0, 0, nil); err != nil { - fmt.Printf("Compact Volume before synchronizing %v\n", err) - return true + v.Close() + return fmt.Errorf("compacting volume: %w", err), false } if err = v.CommitCompact(); err != nil { - fmt.Printf("Commit Compact before synchronizing %v\n", err) - return true + v.Close() + return fmt.Errorf("committing compaction: %w", err), false } v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) - v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) + if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil { + v.Close() + return fmt.Errorf("writing superblock: %w", err), false + } } datSize, _, _ := v.FileStat() + // If local volume is larger than remote, recreate it if datSize > stats.TailOffset { - // remove the old data if err := v.Destroy(false); err != nil { - fmt.Printf("Error destroying volume: %v\n", err) + v.Close() + return fmt.Errorf("destroying volume: %w", err), false } + v.Close() // Close the destroyed volume // recreate an empty volume v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) if err != nil { - fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) - return true + return fmt.Errorf("recreating volume: %w", err), false } } - defer v.Close() + // Perform the incremental backup if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { - fmt.Printf("Error synchronizing volume %d: %v\n", vid, err) + v.Close() + return fmt.Errorf("incremental backup: %w", err), false + } + + v.Close() + return nil, false +} + +func runBackup(cmd *Command, args []string) bool { + + util.LoadSecurityConfiguration() + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + // Backward compatibility: if -server is provided, use it + masterServer := *s.master + if *s.server != "" { + masterServer = *s.server + } + + if *s.volumeId == -1 { + return false + } + vid := needle.VolumeId(*s.volumeId) + + // find volume location, replication, ttl info + lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String()) + if err != nil { + fmt.Printf("Error looking up volume %d: %v\n", vid, err) + return true + } + if len(lookup.Locations) == 0 { + fmt.Printf("Error: volume %d has no locations available\n", vid) return true } + // Try each available location until one succeeds + var lastErr error + for i, location := range lookup.Locations { + volumeServer := location.ServerAddress() + fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer) + + err, isFatal := backupFromLocation(volumeServer, grpcDialOption, vid) + if err != nil { + fmt.Printf("Error backing up volume %d from %s: %v\n", vid, volumeServer, err) + lastErr = err + // Check if this is a fatal user-input error + if isFatal { + return true + } + continue + } + + // Success! + fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer) + return true + } + + // All locations failed + fmt.Printf("Failed to backup volume %d after trying all %d locations. Last error: %v\n", vid, len(lookup.Locations), lastErr) + return true } |
