| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/topology | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC (one-phase commit) support
Diffstat (limited to 'weed/topology')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/topology/allocate_volume.go | 6 |
| -rw-r--r-- | weed/topology/cluster_commands.go | 20 |
| -rw-r--r-- | weed/topology/data_node.go | 19 |
| -rw-r--r-- | weed/topology/data_node_ec.go | 4 |
| -rw-r--r-- | weed/topology/node.go | 9 |
| -rw-r--r-- | weed/topology/rack.go | 7 |
| -rw-r--r-- | weed/topology/store_replicate.go | 24 |
| -rw-r--r-- | weed/topology/topology.go | 56 |
| -rw-r--r-- | weed/topology/topology_ec.go | 7 |
| -rw-r--r-- | weed/topology/topology_event_handling.go | 6 |
| -rw-r--r-- | weed/topology/topology_test.go | 4 |
| -rw-r--r-- | weed/topology/topology_vacuum.go | 51 |
| -rw-r--r-- | weed/topology/volume_growth.go | 41 |
| -rw-r--r-- | weed/topology/volume_layout.go | 66 |
| -rw-r--r-- | weed/topology/volume_location_list.go | 5 |
15 files changed, 244 insertions, 81 deletions
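A change that recurs throughout the diff below is the move from plain `ip:port` strings to `pb.ServerAddress` values that carry an explicit gRPC port (`DataNode.ServerAddress()`, `util.JoinHostPort`), together with a new leading streaming-mode flag on `operation.WithVolumeServerClient`. The sketch below is illustrative only, not part of the commit; it assumes the package layout at this commit and uses the existing `VolumeServerStatus` RPC purely as a placeholder call.

```go
package example // illustrative sketch, not part of the commit

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

// pingVolumeServer shows the new calling convention: build a pb.ServerAddress
// from the HTTP port plus the gRPC port, then pass it to
// operation.WithVolumeServerClient together with the streaming-mode flag.
func pingVolumeServer(ip string, port, grpcPort int, dialOpt grpc.DialOption) error {
	addr := pb.NewServerAddress(ip, port, grpcPort) // same constructor DataNode.ServerAddress() uses
	return operation.WithVolumeServerClient(false, addr, dialOpt, func(client volume_server_pb.VolumeServerClient) error {
		// VolumeServerStatus is only a placeholder RPC for this example.
		_, err := client.VolumeServerStatus(context.Background(), &volume_server_pb.VolumeServerStatusRequest{})
		return err
	})
}
```

The same pattern appears in `allocate_volume.go`, `topology_vacuum.go`, and `store_replicate.go` below.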
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 7c7fae683..f21d0fc0a 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -15,9 +15,9 @@ type AllocateVolumeResult struct {
 func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
-	return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+	return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-		_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
+		_, allocateErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
 			VolumeId:    uint32(vid),
 			Collection:  option.Collection,
 			Replication: option.ReplicaPlacement.String(),
@@ -26,7 +26,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
 			MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb,
 			DiskType:           string(option.DiskType),
 		})
-		return deleteErr
+		return allocateErr
 	})
 }
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 152691ccb..1bcc6b449 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -1,9 +1,12 @@
 package topology
 
 import (
+	"encoding/json"
+	"fmt"
 	"github.com/chrislusf/raft"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	hashicorpRaft "github.com/hashicorp/raft"
 )
 
 type MaxVolumeIdCommand struct {
@@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
 	return "MaxVolumeId"
 }
 
+// deprecatedCommandApply represents the old interface to apply a command to the server.
 func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
 	topo := server.Context().(*Topology)
 	before := topo.GetMaxVolumeId()
@@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
 	return nil, nil
 }
+
+func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error {
+	b, err := json.Marshal(s)
+	if err != nil {
+		return fmt.Errorf("marshal: %v", err)
+	}
+	_, err = sink.Write(b)
+	if err != nil {
+		sink.Cancel()
+		return fmt.Errorf("sink.Write(): %v", err)
+	}
+	return sink.Close()
+}
+
+func (s *MaxVolumeIdCommand) Release() {
+}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 2813f7b45..6bdbd965f 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,22 +2,23 @@ package topology
 
 import (
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"strconv"
-
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/storage"
 )
 
 type DataNode struct {
 	NodeImpl
 	Ip        string
 	Port      int
+	GrpcPort  int
 	PublicUrl string
 	LastSeen  int64 // unix time in seconds
+	Counter   int   // in race condition, the previous dataNode was not dead
 }
 
 func NewDataNode(id string) *DataNode {
@@ -109,6 +110,9 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
 	for _, v := range deletedVolumes {
 		disk := dn.getOrCreateDisk(v.DiskType)
+		if _, found := disk.volumes[v.Id]; !found {
+			continue
+		}
 		delete(disk.volumes, v.Id)
 
 		deltaDiskUsages := newDiskUsages()
@@ -206,7 +210,11 @@ func (dn *DataNode) MatchLocation(ip string, port int) bool {
 }
 
 func (dn *DataNode) Url() string {
-	return dn.Ip + ":" + strconv.Itoa(dn.Port)
+	return util.JoinHostPort(dn.Ip, dn.Port)
+}
+
+func (dn *DataNode) ServerAddress() pb.ServerAddress {
+	return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
 }
 
 func (dn *DataNode) ToMap() interface{} {
@@ -240,6 +248,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
 	m := &master_pb.DataNodeInfo{
 		Id:        string(dn.Id()),
 		DiskInfos: make(map[string]*master_pb.DiskInfo),
+		GrpcPort:  uint32(dn.GrpcPort),
 	}
 	for _, c := range dn.Children() {
 		disk := c.(*Disk)
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 330b16b24..bf72fa9af 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
 	}
 	for _, ecShards := range actualShards {
-		if dn.hasEcShards(ecShards.VolumeId) {
+		if dn.HasEcShards(ecShards.VolumeId) {
 			continue
 		}
@@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
 	return
 }
 
-func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
+func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
 	dn.RLock()
 	defer dn.RUnlock()
 	for _, c := range dn.children {
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4772cb411..c5956177a 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -3,6 +3,7 @@ package topology
 import (
 	"errors"
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/stats"
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -246,6 +247,14 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
 			} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
 				n.GetTopology().chanCrowdedVolumes <- v
 			}
+			copyCount := v.ReplicaPlacement.GetCopyCount()
+			if copyCount > 1 {
+				if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
+					stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
+				} else {
+					stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
+				}
+			}
 		}
 	}
 } else {
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 8eb2a717c..cd09746b2 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -3,7 +3,7 @@ package topology
 import (
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
-	"strconv"
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"time"
 )
@@ -30,7 +30,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
 	}
 	return nil
 }
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
 	for _, c := range r.Children() {
 		dn := c.(*DataNode)
 		if dn.MatchLocation(ip, port) {
@@ -38,9 +38,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
 			return dn
 		}
 	}
-	dn := NewDataNode(ip + ":" + strconv.Itoa(port))
+	dn := NewDataNode(util.JoinHostPort(ip, port))
 	dn.Ip = ip
 	dn.Port = port
+	dn.GrpcPort = grpcPort
 	dn.PublicUrl = publicUrl
 	dn.LastSeen = time.Now().Unix()
 	r.LinkChildNode(dn)
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index b114b468d..7bb10f1da 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -13,6 +13,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/stats"
 	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -28,7 +29,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOpt
 	var remoteLocations []operation.Location
 	if r.FormValue("type") != "replicate" { // this is the initial request
-		remoteLocations, err = getWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
+		remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
 		if err != nil {
 			glog.V(0).Infoln(err)
 			return
@@ -44,6 +45,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOpt
 	if s.GetVolume(volumeId) != nil {
 		isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
 		if err != nil {
+			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
 			err = fmt.Errorf("failed to write to local disk: %v", err)
 			glog.V(0).Infoln(err)
 			return
@@ -74,6 +76,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOpt
 				tmpMap := make(map[string]string)
 				err := json.Unmarshal(n.Pairs, &tmpMap)
 				if err != nil {
+					stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
 					glog.V(0).Infoln("Unmarshal pairs error:", err)
 				}
 				for k, v := range tmpMap {
@@ -83,11 +86,22 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 			// volume server do not know about encryption
 			// TODO optimize here to compress data only once
-			_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt)
+			uploadOption := &operation.UploadOption{
+				UploadUrl:         u.String(),
+				Filename:          string(n.Name),
+				Cipher:            false,
+				IsInputCompressed: n.IsCompressed(),
+				MimeType:          string(n.Mime),
+				PairMap:           pairMap,
+				Jwt:               jwt,
+			}
+			_, err := operation.UploadData(n.Data, uploadOption)
 			return err
 		}); err != nil {
+			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
 			err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
 			glog.V(0).Infoln(err)
+			return false, err
 		}
 	}
 	return
@@ -100,7 +114,7 @@ func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOp
 	var remoteLocations []operation.Location
 	if r.FormValue("type") != "replicate" {
-		remoteLocations, err = getWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
+		remoteLocations, err = GetWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
 		if err != nil {
 			glog.V(0).Infoln(err)
 			return
@@ -160,7 +174,7 @@ func DistributedOperation(locations []operation.Location, op func(location opera
 	return ret.Error()
 }
 
-func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
+func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
 
 	v := s.GetVolume(volumeId)
 	if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {
@@ -170,7 +184,7 @@ func getWritableRemoteReplications(s *storage.Store, grpcDialOpt
 	// not on local store, or has replications
 	lookupResult, lookupErr := operation.LookupVolumeId(masterFn, grpcDialOption, volumeId.String())
 	if lookupErr == nil {
-		selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
+		selfUrl := util.JoinHostPort(s.Ip, s.Port)
 		for _, location := range lookupResult.Locations {
 			if location.Url != selfUrl {
 				remoteLocations = append(remoteLocations, location)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4cbe22a42..631c1fa29 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,14 +1,18 @@
 package topology
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"math/rand"
 	"sync"
 	"time"
 
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+
 	"github.com/chrislusf/raft"
+	hashicorpRaft "github.com/hashicorp/raft"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -39,7 +43,10 @@ type Topology struct {
 
 	Configuration *Configuration
 
-	RaftServer raft.Server
+	RaftServer     raft.Server
+	HashicorpRaft  *hashicorpRaft.Raft
+	UuidAccessLock sync.RWMutex
+	UuidMap        map[string][]string
 }
 
 func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -71,19 +78,25 @@ func (t *Topology) IsLeader() bool {
 			return true
 		}
 		if leader, err := t.Leader(); err == nil {
-			if t.RaftServer.Name() == leader {
+			if pb.ServerAddress(t.RaftServer.Name()) == leader {
 				return true
 			}
 		}
+	} else if t.HashicorpRaft != nil {
+		if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+			return true
+		}
 	}
 	return false
 }
 
-func (t *Topology) Leader() (string, error) {
-	l := ""
+func (t *Topology) Leader() (pb.ServerAddress, error) {
+	var l pb.ServerAddress
 	for count := 0; count < 3; count++ {
 		if t.RaftServer != nil {
-			l = t.RaftServer.Leader()
+			l = pb.ServerAddress(t.RaftServer.Leader())
+		} else if t.HashicorpRaft != nil {
+			l = pb.ServerAddress(t.HashicorpRaft.Leader())
 		} else {
 			return "", errors.New("Raft Server not ready yet!")
 		}
@@ -123,8 +136,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
 func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
 	vid := t.GetMaxVolumeId()
 	next := vid.Next()
-	if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
-		return 0, err
+	if t.RaftServer != nil {
+		if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+			return 0, err
+		}
+	} else if t.HashicorpRaft != nil {
+		b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+		if err != nil {
+			return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+		}
+		if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+			return 0, future.Error()
+		}
 	}
 	return next, nil
 }
@@ -136,7 +159,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
 	return active > 0
 }
 
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
+func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) {
 	vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
 	if err != nil {
 		return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
@@ -145,7 +168,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
 		return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
 	}
 	fileId := t.Sequence.NextFileId(count)
-	return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+	return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil
 }
 
 func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
@@ -205,7 +228,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
 	vl.EnsureCorrectWritables(&v)
 }
 func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-	glog.Infof("removing volume info: %+v", v)
+	glog.Infof("removing volume info: %+v from %v", v, dn.id)
 	diskType := types.ToDiskType(v.DiskType)
 	volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
 	volumeLayout.UnRegisterVolume(&v, dn)
@@ -282,3 +305,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
 	return
 }
+
+func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) {
+	if dn.Parent() != nil {
+		return
+	}
+	// registration to topo
+	dc := t.GetOrCreateDataCenter(dcName)
+	rack := dc.GetOrCreateRack(rackName)
+	rack.LinkChildNode(dn)
+	glog.Infof("[%s] reLink To topo ", dn.Id())
+}
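For orientation, the next sketch (illustrative only, not from the commit) shows how the JSON-encoded `MaxVolumeIdCommand` that `NextVolumeId` above submits via `HashicorpRaft.Apply` could be consumed by a hashicorp/raft FSM, and how the new `Persist`/`Release` methods let the command double as an `FSMSnapshot`. The real FSM lives outside `weed/topology`, and the `MaxVolumeId` field name is assumed from the command's constructor.

```go
package example // illustrative sketch, not part of the commit

import (
	"encoding/json"
	"io"

	hashicorpRaft "github.com/hashicorp/raft"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/topology"
)

// maxVolumeIdFSM is a toy FSM tracking only the max volume id.
type maxVolumeIdFSM struct {
	maxVolumeId needle.VolumeId
}

var _ hashicorpRaft.FSM = (*maxVolumeIdFSM)(nil)

// Apply decodes the command marshaled by Topology.NextVolumeId.
func (f *maxVolumeIdFSM) Apply(l *hashicorpRaft.Log) interface{} {
	var cmd topology.MaxVolumeIdCommand // assumed exported MaxVolumeId field
	if err := json.Unmarshal(l.Data, &cmd); err != nil {
		return err
	}
	if cmd.MaxVolumeId > f.maxVolumeId {
		f.maxVolumeId = cmd.MaxVolumeId
	}
	return nil
}

// Snapshot reuses MaxVolumeIdCommand as the FSMSnapshot, since it now
// implements Persist(sink) and Release().
func (f *maxVolumeIdFSM) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
	return &topology.MaxVolumeIdCommand{MaxVolumeId: f.maxVolumeId}, nil
}

// Restore reads back the JSON written by Persist.
func (f *maxVolumeIdFSM) Restore(r io.ReadCloser) error {
	defer r.Close()
	var cmd topology.MaxVolumeIdCommand
	if err := json.NewDecoder(r).Decode(&cmd); err != nil {
		return err
	}
	f.maxVolumeId = cmd.MaxVolumeId
	return nil
}
```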
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 022eeb578..fdc4f274e 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -2,6 +2,7 @@ package topology
 import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -135,16 +136,16 @@ func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocati
 	return
 }
 
-func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
+func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
 	t.ecShardMapLock.RLock()
 	defer t.ecShardMapLock.RUnlock()
 
-	dateNodeMap := make(map[string]bool)
+	dateNodeMap := make(map[pb.ServerAddress]bool)
 	for _, ecVolumeLocation := range t.ecShardMap {
 		if ecVolumeLocation.Collection == collection {
 			for _, locations := range ecVolumeLocation.Locations {
 				for _, loc := range locations {
-					dateNodeMap[string(loc.Id())] = true
+					dateNodeMap[loc.ServerAddress()] = true
 				}
 			}
 		}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 0f1db74df..fe3717233 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
 package topology
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"google.golang.org/grpc"
 	"math/rand"
@@ -24,7 +25,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
 			c := time.Tick(15 * time.Minute)
 			for _ = range c {
 				if t.IsLeader() {
-					t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
+					t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
 				}
 			}
 		}(garbageThreshold)
@@ -84,7 +85,8 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 
 	negativeUsages := dn.GetDiskUsages().negative()
 	dn.UpAdjustDiskUsageDelta(negativeUsages)
-
+	dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
+	dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
 	if dn.Parent() != nil {
 		dn.Parent().UnlinkChildNode(dn.Id())
 	}
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index ecfe9d8d1..2ece48a95 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -31,7 +31,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
 	maxVolumeCounts := make(map[string]uint32)
 	maxVolumeCounts[""] = 25
 	maxVolumeCounts["ssd"] = 12
-	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
 
 	{
 		volumeCount := 7
@@ -177,7 +177,7 @@ func TestAddRemoveVolume(t *testing.T) {
 	maxVolumeCounts := make(map[string]uint32)
 	maxVolumeCounts[""] = 25
 	maxVolumeCounts["ssd"] = 12
-	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
 
 	v := storage.VolumeInfo{
 		Id: needle.VolumeId(1),
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 9feb55b73..e53aa2853 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,6 +2,8 @@ package topology
 import (
 	"context"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"io"
 	"sync/atomic"
 	"time"
@@ -19,8 +21,8 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
 	ch := make(chan int, locationlist.Length())
 	errCount := int32(0)
 	for index, dn := range locationlist.list {
-		go func(index int, url string, vid needle.VolumeId) {
-			err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
+			err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 				resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
 					VolumeId: uint32(vid),
 				})
@@ -39,7 +41,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
 			if err != nil {
 				glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
 			}
-		}(index, dn.Url(), vid)
+		}(index, dn.ServerAddress(), vid)
 	}
 	vacuumLocationList := NewVolumeLocationList()
@@ -58,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
 	}
 	return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
 }
+
 func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
 	vl.accessLock.Lock()
@@ -66,14 +69,29 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
 	ch := make(chan bool, locationlist.Length())
 	for index, dn := range locationlist.list {
-		go func(index int, url string, vid needle.VolumeId) {
+		go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
 			glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
-			err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-				_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
+			err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
 					VolumeId:    uint32(vid),
 					Preallocate: preallocate,
 				})
-				return err
+				if err != nil {
+					return err
+				}
+
+				for {
+					resp, recvErr := stream.Recv()
+					if recvErr != nil {
+						if recvErr == io.EOF {
+							break
+						} else {
+							return recvErr
+						}
+					}
+					glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes)
+				}
+				return nil
 			})
 			if err != nil {
 				glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
@@ -82,7 +100,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
 				glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
 				ch <- true
 			}
-		}(index, dn.Url(), vid)
+		}(index, dn.ServerAddress(), vid)
 	}
 	isVacuumSuccess := true
@@ -99,12 +117,13 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
 	}
 	return isVacuumSuccess
 }
+
 func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
 	isCommitSuccess := true
 	isReadOnly := false
 	for _, dn := range locationlist.list {
 		glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
-		err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 			resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
 				VolumeId: uint32(vid),
 			})
@@ -127,10 +146,11 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
 	}
 	return isCommitSuccess
 }
+
 func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
 	for _, dn := range locationlist.list {
 		glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
-		err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 			_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
 				VolumeId: uint32(vid),
 			})
@@ -144,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
 	}
 }
 
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
 
 	// if there is vacuum going on, return immediately
 	swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -155,12 +175,19 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
 
 	// now only one vacuum process going on
 
-	glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
+	glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
+		garbageThreshold, collection, volumeId)
 	for _, col := range t.collectionMap.Items() {
 		c := col.(*Collection)
+		if collection != "" && collection != c.Name {
+			continue
+		}
 		for _, vl := range c.storageType2VolumeLayout.Items() {
 			if vl != nil {
 				volumeLayout := vl.(*VolumeLayout)
+				if volumeId > 0 && volumeLayout.Lookup(needle.VolumeId(volumeId)) == nil {
+					continue
+				}
 				t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
 			}
 		}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index e2949848d..238ca99f4 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -3,6 +3,7 @@ package topology
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"math/rand"
 	"sync"
@@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
 	return
 }
 
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) {
 	if targetCount == 0 {
 		targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
 	}
-	count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
-	if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
-		return count, nil
+	result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
+	if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
+		return result, nil
 	}
-	return count, err
+	return result, err
 }
 
-func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
 	vg.accessLock.Lock()
 	defer vg.accessLock.Unlock()
 
 	for i := 0; i < targetCount; i++ {
-		if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
-			counter += c
+		if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
+			result = append(result, res...)
 		} else {
-			glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e)
-			return counter, e
+			glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
+			return result, e
 		}
 	}
 	return
 }
 
-func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
 	servers, e := vg.findEmptySlotsForOneVolume(topo, option)
 	if e != nil {
-		return 0, e
+		return nil, e
 	}
 	vid, raftErr := topo.NextVolumeId()
 	if raftErr != nil {
-		return 0, raftErr
+		return nil, raftErr
 	}
-	err := vg.grow(grpcDialOption, topo, vid, option, servers...)
-	return len(servers), err
+	if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
+		for _, server := range servers {
+			result = append(result, &master_pb.VolumeLocation{
+				Url:       server.Url(),
+				PublicUrl: server.PublicUrl,
+				NewVids:   []uint32{uint32(vid)},
+			})
+		}
+	}
+	return
 }
 
 // 1. find the main data node
@@ -181,7 +190,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		return nil, rackErr
 	}
 
-	//find main rack and other racks
+	//find main server and other servers
 	mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
 		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
 			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
 		}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index f315cb7e4..167aee8ea 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -114,6 +114,8 @@ type VolumeLayout struct {
 	volumeSizeLimit  uint64
 	replicationAsMin bool
 	accessLock       sync.RWMutex
+	growRequestCount int
+	growRequestTime  time.Time
 }
 
 type VolumeLayoutStats struct {
@@ -138,9 +140,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
 }
 
 func (vl *VolumeLayout) String() string {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
-	return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
+	return fmt.Sprintf("rp:%v, ttl:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.writables, vl.volumeSizeLimit)
 }
 
 func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
@@ -218,6 +218,13 @@ func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
 			vl.setVolumeWritable(vid)
 		}
 	} else {
+		if !vl.enoughCopies(vid) {
+			glog.V(0).Infof("volume %d does not have enough copies", vid)
+		}
+		if !vl.isAllWritable(vid) {
+			glog.V(0).Infof("volume %d are not all writable", vid)
+		}
+		glog.V(0).Infof("volume %d remove from writable", vid)
 		vl.removeFromWritable(vid)
 	}
 }
@@ -279,7 +286,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
 		//glog.V(0).Infoln("No more writable volumes!")
 		return nil, 0, nil, errors.New("No more writable volumes!")
 	}
-	if option.DataCenter == "" {
+	if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
 		vid := vl.writables[rand.Intn(lenWriters)]
 		locationList := vl.vid2location[vid]
 		if locationList != nil {
@@ -293,23 +300,45 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
 	for _, v := range vl.writables {
 		volumeLocationList := vl.vid2location[v]
 		for _, dn := range volumeLocationList.list {
-			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
-				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
-					continue
-				}
-				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
-					continue
-				}
-				counter++
-				if rand.Intn(counter) < 1 {
-					vid, locationList = v, volumeLocationList
-				}
+			if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
+				continue
+			}
+			if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+				continue
+			}
+			if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+				continue
+			}
+			counter++
+			if rand.Intn(counter) < 1 {
+				vid, locationList = v, volumeLocationList.Copy()
 			}
 		}
 	}
 	return &vid, count, locationList, nil
 }
 
+func (vl *VolumeLayout) HasGrowRequest() bool {
+	if vl.growRequestCount > 0 && vl.growRequestTime.Add(time.Minute).After(time.Now()) {
+		return true
+	}
+	return false
+}
+func (vl *VolumeLayout) AddGrowRequest() {
+	vl.growRequestTime = time.Now()
+	vl.growRequestCount++
+}
+func (vl *VolumeLayout) DoneGrowRequest() {
+	vl.growRequestTime = time.Unix(0, 0)
+	vl.growRequestCount = 0
+}
+
+func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
+	active, crowded := vl.GetActiveVolumeCount(option)
+	//glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
+	return active <= crowded
+}
+
 func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) {
 	vl.accessLock.RLock()
 	defer vl.accessLock.RUnlock()
@@ -411,8 +440,11 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
 	vl.accessLock.Lock()
 	defer vl.accessLock.Unlock()
 
-	// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
-	return vl.removeFromWritable(vid)
+	wasWritable := vl.removeFromWritable(vid)
+	if wasWritable {
+		glog.V(0).Infof("Volume %d reaches full capacity.", vid)
+	}
+	return wasWritable
 }
 
 func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 548c4cd25..03580ae5b 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -31,6 +31,11 @@ func (dnll *VolumeLocationList) Head() *DataNode {
 	return dnll.list[0]
 }
 
+func (dnll *VolumeLocationList) Rest() []*DataNode {
+	//mark first node as master volume
+	return dnll.list[1:]
+}
+
 func (dnll *VolumeLocationList) Length() int {
 	if dnll == nil {
 		return 0
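With `Topology.PickForWrite` now returning the whole `*VolumeLocationList` rather than only its head data node, and `VolumeLocationList` gaining `Rest()`, a caller can distinguish the primary write target from its replicas. The snippet below is a hedged, caller-side sketch (not part of the commit; function and variable names are placeholders):

```go
package example // illustrative sketch, not part of the commit

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/topology"
)

// pickWriteTargets asks the topology for one writable file id and reports
// which data node is treated as the primary and how many replicas remain.
func pickWriteTargets(topo *topology.Topology, option *topology.VolumeGrowOption) error {
	fid, count, locations, err := topo.PickForWrite(1, option)
	if err != nil {
		return err
	}
	primary := locations.Head()  // first location: main write target
	replicas := locations.Rest() // remaining locations: replicated writes
	fmt.Printf("fid=%s count=%d primary=%s replicas=%d\n", fid, count, primary.Url(), len(replicas))
	return nil
}
```

Returning the full location list lets the caller (e.g. the master's assign handler) report every replica location instead of recomputing them from the head node.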
