Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/allocate_volume.go | 23 |
| -rw-r--r-- | weed/topology/cluster_commands.go | 6 |
| -rw-r--r-- | weed/topology/collection.go | 18 |
| -rw-r--r-- | weed/topology/data_center.go | 18 |
| -rw-r--r-- | weed/topology/data_node.go | 107 |
| -rw-r--r-- | weed/topology/data_node_ec.go | 135 |
| -rw-r--r-- | weed/topology/node.go | 185 |
| -rw-r--r-- | weed/topology/rack.go | 19 |
| -rw-r--r-- | weed/topology/store_replicate.go | 210 |
| -rw-r--r-- | weed/topology/topology.go | 123 |
| -rw-r--r-- | weed/topology/topology_ec.go | 173 |
| -rw-r--r-- | weed/topology/topology_event_handling.go | 6 |
| -rw-r--r-- | weed/topology/topology_map.go | 21 |
| -rw-r--r-- | weed/topology/topology_test.go | 78 |
| -rw-r--r-- | weed/topology/topology_vacuum.go | 121 |
| -rw-r--r-- | weed/topology/volume_growth.go | 75 |
| -rw-r--r-- | weed/topology/volume_growth_test.go | 221 |
| -rw-r--r-- | weed/topology/volume_layout.go | 108 |
| -rw-r--r-- | weed/topology/volume_location_list.go | 4 |
19 files changed, 1309 insertions(+), 342 deletions(-)
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 55796ab43..e5dc48652 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -2,29 +2,28 @@ package topology import ( "context" - "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" ) type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { +func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { - return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.AssignVolume(ctx, &volume_server_pb.AssignVolumeRequest{ - VolumdId: uint32(vid), - Collection: option.Collection, - Replication: option.ReplicaPlacement.String(), - Ttl: option.Ttl.String(), - Preallocate: option.Prealloacte, + _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ + VolumeId: uint32(vid), + Collection: option.Collection, + Replication: option.ReplicaPlacement.String(), + Ttl: option.Ttl.String(), + Preallocate: option.Prealloacte, + MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb, }) return deleteErr }) diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 7a36c25ec..152691ccb 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -3,14 +3,14 @@ package topology import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type MaxVolumeIdCommand struct { - MaxVolumeId storage.VolumeId `json:"maxVolumeId"` + MaxVolumeId needle.VolumeId `json:"maxVolumeId"` } -func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { +func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand { return &MaxVolumeIdCommand{ MaxVolumeId: value, } diff --git a/weed/topology/collection.go b/weed/topology/collection.go index a17f0c961..5b410d1eb 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -3,18 +3,24 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/util" ) type Collection struct { Name string volumeSizeLimit uint64 + replicationAsMin bool storageType2VolumeLayout *util.ConcurrentReadMap } -func NewCollection(name string, volumeSizeLimit uint64) *Collection { - c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} +func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection { + c := &Collection{ + Name: name, + volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, + } c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } @@ -23,18 +29,18 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, 
storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() } vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { - return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin) }) return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { +func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index bcf2dfd31..dc3accb71 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -1,5 +1,7 @@ package topology +import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + type DataCenter struct { NodeImpl } @@ -38,3 +40,19 @@ func (dc *DataCenter) ToMap() interface{} { m["Racks"] = racks return m } + +func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo { + m := &master_pb.DataCenterInfo{ + Id: string(dc.Id()), + VolumeCount: uint64(dc.GetVolumeCount()), + MaxVolumeCount: uint64(dc.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dc.FreeSpace()), + ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()), + RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()), + } + for _, c := range dc.Children() { + rack := c.(*Rack) + m.RackInfos = append(m.RackInfos, rack.ToRackInfo()) + } + return m +} diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 6ea6d3938..efdf5285b 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,7 +2,13 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "strconv" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -10,18 +16,21 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds + volumes map[needle.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex } func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } @@ -32,25 +41,37 @@ func (dn *DataNode) String() string { return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() - if _, ok := dn.volumes[v.Id]; !ok { + if oldV, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v 
dn.UpAdjustVolumeCountDelta(1) + if v.IsRemote() { + dn.UpAdjustRemoteVolumeCountDelta(1) + } if !v.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(1) } dn.UpAdjustMaxVolumeId(v.Id) isNew = true } else { + if oldV.IsRemote() != v.IsRemote() { + if v.IsRemote() { + dn.UpAdjustRemoteVolumeCountDelta(1) + } + if oldV.IsRemote() { + dn.UpAdjustRemoteVolumeCountDelta(-1) + } + } + isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly dn.volumes[v.Id] = v } return } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -61,15 +82,42 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume delete(dn.volumes, vid) deletedVolumes = append(deletedVolumes, v) dn.UpAdjustVolumeCountDelta(-1) - dn.UpAdjustActiveVolumeCountDelta(-1) + if v.IsRemote() { + dn.UpAdjustRemoteVolumeCountDelta(-1) + } + if !v.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } } } dn.Unlock() for _, v := range actualVolumes { - isNew := dn.AddOrUpdateVolume(v) + isNew, isChangedRO := dn.AddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } + if isChangedRO { + changeRO = append(changeRO, v) + } + } + return +} + +func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { + dn.Lock() + for _, v := range deletedVolumes { + delete(dn.volumes, v.Id) + dn.UpAdjustVolumeCountDelta(-1) + if v.IsRemote() { + dn.UpAdjustRemoteVolumeCountDelta(-1) + } + if !v.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } + dn.Unlock() + for _, v := range newlVolumes { + dn.AddOrUpdateVolume(v) } return } @@ -83,7 +131,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] @@ -123,8 +171,41 @@ func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() + ret["VolumeIds"] = dn.GetVolumeIds() + ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } + +func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { + m := &master_pb.DataNodeInfo{ + Id: string(dn.Id()), + VolumeCount: uint64(dn.GetVolumeCount()), + MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dn.FreeSpace()), + ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), + RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), + } + for _, v := range dn.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range dn.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} + +// GetVolumeIds returns the human readable volume ids limited to count of max 100. 
+func (dn *DataNode) GetVolumeIds() string { + dn.RLock() + defer dn.RUnlock() + ids := make([]int, 0, len(dn.volumes)) + + for k := range dn.volumes { + ids = append(ids, int(k)) + } + + return util.HumanReadableIntsMax(100, ids...) +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go new file mode 100644 index 000000000..75c8784fe --- /dev/null +++ b/weed/topology/data_node_ec.go @@ -0,0 +1,135 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { + dn.RLock() + for _, ecVolumeInfo := range dn.ecShards { + ret = append(ret, ecVolumeInfo) + } + dn.RUnlock() + return ret +} + +func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + // prepare the new ec shard map + actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + for _, ecShards := range actualShards { + actualEcShardMap[ecShards.VolumeId] = ecShards + } + + // found out the newShards and deletedShards + var newShardCount, deletedShardCount int + dn.ecShardsLock.RLock() + for vid, ecShards := range dn.ecShards { + if actualEcShards, ok := actualEcShardMap[vid]; !ok { + // dn registered ec shards not found in the new set of ec shards + deletedShards = append(deletedShards, ecShards) + deletedShardCount += ecShards.ShardIdCount() + } else { + // found, but maybe the actual shard could be missing + a := actualEcShards.Minus(ecShards) + if a.ShardIdCount() > 0 { + newShards = append(newShards, a) + newShardCount += a.ShardIdCount() + } + d := ecShards.Minus(actualEcShards) + if d.ShardIdCount() > 0 { + deletedShards = append(deletedShards, d) + deletedShardCount += d.ShardIdCount() + } + } + } + for _, ecShards := range actualShards { + if _, found := dn.ecShards[ecShards.VolumeId]; !found { + newShards = append(newShards, ecShards) + newShardCount += ecShards.ShardIdCount() + } + } + dn.ecShardsLock.RUnlock() + + if len(newShards) > 0 || len(deletedShards) > 0 { + // if changed, set to the new ec shard map + dn.ecShardsLock.Lock() + dn.ecShards = actualEcShardMap + dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)) + dn.ecShardsLock.Unlock() + } + + return +} + +func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + + for _, newShard := range newShards { + dn.AddOrUpdateEcShard(newShard) + } + + for _, deletedShard := range deletedShards { + dn.DeleteEcShard(deletedShard) + } + +} + +func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + delta := 0 + if existing, ok := dn.ecShards[s.VolumeId]; !ok { + dn.ecShards[s.VolumeId] = s + delta = s.ShardBits.ShardIdCount() + } else { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + delta = existing.ShardBits.ShardIdCount() - oldCount + } + + dn.UpAdjustEcShardCountDelta(int64(delta)) + +} + +func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + if existing, ok := dn.ecShards[s.VolumeId]; ok { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + delta := existing.ShardBits.ShardIdCount() - oldCount + dn.UpAdjustEcShardCountDelta(int64(delta)) + if 
existing.ShardBits.ShardIdCount() == 0 { + delete(dn.ecShards, s.VolumeId) + } + } + +} + +func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { + + // check whether normal volumes has this volume id + dn.RLock() + _, ok := dn.volumes[id] + if ok { + hasVolumeId = true + } + dn.RUnlock() + + if hasVolumeId { + return + } + + // check whether ec shards has this volume id + dn.ecShardsLock.RLock() + _, ok = dn.ecShards[id] + if ok { + hasVolumeId = true + } + dn.ecShardsLock.RUnlock() + + return + +} diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..114417edf 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -5,26 +5,32 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int - ReserveOneVolume(r int) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) - UpAdjustMaxVolumeId(vid storage.VolumeId) + FreeSpace() int64 + ReserveOneVolume(r int64) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) + UpAdjustEcShardCountDelta(ecShardCountDelta int64) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + UpAdjustMaxVolumeId(vid needle.VolumeId) - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int - GetMaxVolumeId() storage.VolumeId + GetVolumeCount() int64 + GetEcShardCount() int64 + GetActiveVolumeCount() int64 + GetRemoteVolumeCount() int64 + GetMaxVolumeCount() int64 + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) @@ -39,14 +45,16 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { + volumeCount int64 + remoteVolumeCount int64 + activeVolumeCount int64 + ecShardCount int64 + maxVolumeCount int64 id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int parent Node sync.RWMutex // lock children children map[NodeId]Node - maxVolumeId storage.VolumeId + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -54,56 +62,64 @@ type NodeImpl struct { } // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { - candidates := make([]Node, 0, len(n.children)) +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { + var totalWeights int64 var errs []string n.RLock() + candidates := make([]Node, 0, len(n.children)) + candidatesWeights := make([]int64, 0, len(n.children)) + //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. 
for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { - candidates = append(candidates, node) - } else { - errs = append(errs, string(node.Id())+":"+err.Error()) + if node.FreeSpace() <= 0 { + continue } + totalWeights += node.FreeSpace() + candidates = append(candidates, node) + candidatesWeights = append(candidatesWeights, node.FreeSpace()) } n.RUnlock() - if len(candidates) == 0 { - return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes { + glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates") + return nil, nil, errors.New("No enough data node found!") } - firstNode = candidates[rand.Intn(len(candidates))] - glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) - restNodes = make([]Node, numberOfNodes-1) - candidates = candidates[:0] - n.RLock() - for _, node := range n.children { - if node.Id() == firstNode.Id() { - continue - } - if node.FreeSpace() <= 0 { - continue + //pick nodes randomly by weights, the node picked earlier has higher final weights + sortedCandidates := make([]Node, 0, len(candidates)) + for i := 0; i < len(candidates); i++ { + weightsInterval := rand.Int63n(totalWeights) + lastWeights := int64(0) + for k, weights := range candidatesWeights { + if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) { + sortedCandidates = append(sortedCandidates, candidates[k]) + candidatesWeights[k] = 0 + totalWeights -= weights + break + } + lastWeights += weights } - glog.V(2).Infoln("select rest node candidate:", node.Id()) - candidates = append(candidates, node) } - n.RUnlock() - glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") - ret := len(restNodes) == 0 - for k, node := range candidates { - if k < len(restNodes) { - restNodes[k] = node - if k == len(restNodes)-1 { - ret = true + + restNodes = make([]Node, 0, numberOfNodes-1) + ret := false + n.RLock() + for k, node := range sortedCandidates { + if err := filterFirstNodeFn(node); err == nil { + firstNode = node + if k >= numberOfNodes-1 { + restNodes = sortedCandidates[:numberOfNodes-1] + } else { + restNodes = append(restNodes, sortedCandidates[:k]...) + restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...) } + ret = true + break } else { - r := rand.Intn(k + 1) - if r < len(restNodes) { - restNodes[r] = node - } + errs = append(errs, string(node.Id())+":"+err.Error()) } } + n.RUnlock() if !ret { - glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") - err = errors.New("No enough data node found!") + return nil, nil, errors.New("No matching data node found! 
\n" + strings.Join(errs, "\n")) } return } @@ -126,8 +142,12 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int { - return n.maxVolumeCount - n.volumeCount +func (n *NodeImpl) FreeSpace() int64 { + freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount + if n.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 + } + return freeVolumeSlotCount } func (n *NodeImpl) SetParent(node Node) { n.parent = node @@ -146,7 +166,7 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { @@ -171,25 +191,52 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + if maxVolumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + if volumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta +func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative + if remoteVolumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) + if n.parent != nil { + n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative + if ecShardCountDelta == 0 { + return + } + atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) + if n.parent != nil { + n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) + } +} +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + if activeVolumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative +func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid if n.parent != nil { @@ -197,16 +244,22 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative } } } -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int { +func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } -func (n *NodeImpl) GetActiveVolumeCount() int { +func (n *NodeImpl) 
GetEcShardCount() int64 { + return n.ecShardCount +} +func (n *NodeImpl) GetRemoteVolumeCount() int64 { + return n.remoteVolumeCount +} +func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } -func (n *NodeImpl) GetMaxVolumeCount() int { +func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } @@ -218,6 +271,8 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) + n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) @@ -232,6 +287,8 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) delete(n.children, node.Id()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) + n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) glog.V(0).Infoln(n, "removes", node.Id()) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index a48d64323..1921c0c05 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -1,6 +1,7 @@ package topology import ( + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "strconv" "time" ) @@ -27,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { @@ -58,3 +59,19 @@ func (r *Rack) ToMap() interface{} { m["DataNodes"] = dns return m } + +func (r *Rack) ToRackInfo() *master_pb.RackInfo { + m := &master_pb.RackInfo{ + Id: string(r.Id()), + VolumeCount: uint64(r.GetVolumeCount()), + MaxVolumeCount: uint64(r.GetMaxVolumeCount()), + FreeVolumeCount: uint64(r.FreeSpace()), + ActiveVolumeCount: uint64(r.GetActiveVolumeCount()), + RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()), + } + for _, c := range r.Children() { + dn := c.(*DataNode) + m.DataNodeInfos = append(m.DataNodeInfos, dn.ToDataNodeInfo()) + } + return m +} diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index c73fb706a..481e72fe0 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -1,7 +1,6 @@ package topology import ( - "bytes" "encoding/json" "errors" "fmt" @@ -14,101 +13,113 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId storage.VolumeId, needle *storage.Needle, - r *http.Request) (size uint32, errorStatus string) { +func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, needle) - needToReplicate := !s.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + 
err.Error() + ")" - size = ret - return + // check whether this is a replicated write request + var remoteLocations []operation.Location + if r.FormValue("type") != "replicate" { + // this is the initial request + remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } } - needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() - if !needToReplicate { - needToReplicate = s.GetVolume(volumeId).NeedToReplicate() + // read fsync value + fsync := false + if r.FormValue("fsync") == "true" { + fsync = true } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - - if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error { - u := url.URL{ - Scheme: "http", - Host: location.Url, - Path: r.URL.Path, - } - q := url.Values{ - "type": {"replicate"}, - } - if needle.LastModified > 0 { - q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) - } - if needle.IsChunkedManifest() { - q.Set("cm", "true") + + if s.GetVolume(volumeId) != nil { + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync) + if err != nil { + err = fmt.Errorf("failed to write to local disk: %v", err) + glog.V(0).Infoln(err) + return + } + } + + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, s, func(location operation.Location) error { + u := url.URL{ + Scheme: "http", + Host: location.Url, + Path: r.URL.Path, + } + q := url.Values{ + "type": {"replicate"}, + "ttl": {n.Ttl.String()}, + } + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) + } + if n.IsChunkedManifest() { + q.Set("cm", "true") + } + u.RawQuery = q.Encode() + + pairMap := make(map[string]string) + if n.HasPairs() { + tmpMap := make(map[string]string) + err := json.Unmarshal(n.Pairs, &tmpMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) } - u.RawQuery = q.Encode() - - pairMap := make(map[string]string) - if needle.HasPairs() { - tmpMap := make(map[string]string) - err := json.Unmarshal(needle.Pairs, &tmpMap) - if err != nil { - glog.V(0).Infoln("Unmarshal pairs error:", err) - } - for k, v := range tmpMap { - pairMap[storage.PairNamePrefix+k] = v - } + for k, v := range tmpMap { + pairMap[needle.PairNamePrefix+k] = v } - - _, err := operation.Upload(u.String(), - string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), - pairMap, jwt) - return err - }); err != nil { - ret = 0 - errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err) } + + // volume server do not know about encryption + _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt) + return err + }); err != nil { + err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) + glog.V(0).Infoln(err) } } - size = ret return } func ReplicatedDelete(masterNode string, store *storage.Store, - volumeId storage.VolumeId, n *storage.Needle, - r *http.Request) (uint32, error) { + volumeId needle.VolumeId, n *needle.Needle, + r *http.Request) (size uint32, err error) { //check JWT jwt := security.GetJwt(r) - ret, err := store.Delete(volumeId, n) + var remoteLocations []operation.Location + if r.FormValue("type") != "replicate" { + remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } + } 
+ + size, err = store.DeleteVolumeNeedle(volumeId, n) if err != nil { glog.V(0).Infoln("delete error:", err) - return ret, err + return } - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { - return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) - }); err != nil { - ret = 0 - } + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, store, func(location operation.Location) error { + return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) + }); err != nil { + size = 0 } } - return ret, err + return } type DistributedOperationResult map[string]error @@ -131,32 +142,53 @@ type RemoteResult struct { Error error } -func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { - length := 0 - selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) - results := make(chan RemoteResult) +func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error { + length := len(locations) + results := make(chan RemoteResult) + for _, location := range locations { + go func(location operation.Location, results chan RemoteResult) { + results <- RemoteResult{location.Url, op(location)} + }(location, results) + } + ret := DistributedOperationResult(make(map[string]error)) + for i := 0; i < length; i++ { + result := <-results + ret[result.Host] = result.Error + } + + return ret.Error() +} + +func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) ( + remoteLocations []operation.Location, err error) { + + v := s.GetVolume(volumeId) + if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 { + return + } + + // not on local store, or has replications + lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()) + if lookupErr == nil { + selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) for _, location := range lookupResult.Locations { if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan RemoteResult) { - results <- RemoteResult{location.Url, op(location)} - }(location, results) + remoteLocations = append(remoteLocations, location) } } - ret := DistributedOperationResult(make(map[string]error)) - for i := 0; i < length; i++ { - result := <-results - ret[result.Host] = result.Error - } - if volume := store.GetVolume(volumeId); volume != nil { - if length+1 < volume.ReplicaPlacement.GetCopyCount() { - return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) - } - } - return ret.Error() } else { - glog.V(0).Infoln() - return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr) + err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr) + return } + + if v != nil { + // has one local and has remote replications + copyCount := v.ReplicaPlacement.GetCopyCount() + if len(lookupResult.Locations) < copyCount { + err = fmt.Errorf("replicating opetations [%d] is less than volume 
%d replication copy count [%d]", + len(lookupResult.Locations), volumeId, copyCount) + } + } + + return } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 4242bfa05..993f444a7 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -2,24 +2,33 @@ package topology import ( "errors" + "fmt" "math/rand" + "sync" "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/util" ) type Topology struct { + vacuumLockCounter int64 NodeImpl - collectionMap *util.ConcurrentReadMap + collectionMap *util.ConcurrentReadMap + ecShardMap map[needle.VolumeId]*EcShardLocations + ecShardMapLock sync.RWMutex pulse int64 - volumeSizeLimit uint64 + volumeSizeLimit uint64 + replicationAsMin bool Sequence sequence.Sequencer @@ -30,15 +39,17 @@ type Topology struct { RaftServer raft.Server } -func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { +func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() + t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.replicationAsMin = replicationAsMin t.Sequence = seq @@ -50,8 +61,13 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls } func (t *Topology) IsLeader() bool { - if leader, e := t.Leader(); e == nil { - return leader == t.RaftServer.Name() + if t.RaftServer != nil { + if t.RaftServer.State() == raft.Leader { + return true + } + if t.RaftServer.Leader() == "" { + return true + } } return false } @@ -66,13 +82,13 @@ func (t *Topology) Leader() (string, error) { if l == "" { // We are a single node cluster, we are the leader - return t.RaftServer.Name(), errors.New("Raft Server not initialized!") + return t.RaftServer.Name(), nil } return l, nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { //maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { @@ -85,14 +101,24 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return c.(*Collection).Lookup(vid) } } + + if locations, found := t.LookupEcShards(vid); found { + for _, loc := range locations.Locations { + dataNodes = append(dataNodes, loc...) 
+ } + return dataNodes + } + return nil } -func (t *Topology) NextVolumeId() storage.VolumeId { +func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) - return next + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + return next, nil } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { @@ -102,19 +128,43 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) - if err != nil || datanodes.Length() == 0 { - return "", 0, nil, errors.New("No writable volumes available!") + if err != nil { + return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) + } + if datanodes.Length() == 0 { + return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } - fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + fileId := t.Sequence.NextFileId(count) + return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) + return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } +func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { + + mapOfCollections := make(map[string]bool) + for _, c := range t.collectionMap.Items() { + mapOfCollections[c.(*Collection).Name] = true + } + + if includeEcVolumes { + t.ecShardMapLock.RLock() + for _, ecVolumeLocation := range t.ecShardMap { + mapOfCollections[ecVolumeLocation.Collection] = true + } + t.ecShardMapLock.RUnlock() + } + + for k := range mapOfCollections { + ret = append(ret, k) + } + return ret +} + func (t *Topology) FindCollection(collectionName string) (*Collection, bool) { c, hasCollection := t.collectionMap.Find(collectionName) if !hasCollection { @@ -152,6 +202,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { } func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) { + // convert into in memory struct storage.VolumeInfo var volumeInfos []storage.VolumeInfo for _, v := range volumes { if vi, err := storage.NewVolumeInfo(v); err == nil { @@ -160,12 +211,48 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati glog.V(0).Infof("Fail to convert joined volume information: %v", err) } } - newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { + // find out the delta volumes + var changedVolumes []storage.VolumeInfo + 
newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos) + for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + for _, v := range changedVolumes { + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl.ensureCorrectWritables(&v) + } + return +} + +func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) { + var newVis, oldVis []storage.VolumeInfo + for _, v := range newVolumes { + vi, err := storage.NewVolumeInfoFromShort(v) + if err != nil { + glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err) + continue + } + newVis = append(newVis, vi) + } + for _, v := range deletedVolumes { + vi, err := storage.NewVolumeInfoFromShort(v) + if err != nil { + glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err) + continue + } + oldVis = append(oldVis, vi) + } + dn.DeltaUpdateVolumes(newVis, oldVis) + + for _, vi := range newVis { + t.RegisterVolumeLayout(vi, dn) + } + for _, vi := range oldVis { + t.UnRegisterVolumeLayout(vi, dn) + } + return } diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go new file mode 100644 index 000000000..93b39bb5d --- /dev/null +++ b/weed/topology/topology_ec.go @@ -0,0 +1,173 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +type EcShardLocations struct { + Collection string + Locations [erasure_coding.TotalShardsCount][]*DataNode +} + +func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + // convert into in memory struct storage.VolumeInfo + var shards []*erasure_coding.EcVolumeInfo + for _, shardInfo := range shardInfos { + shards = append(shards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + // find out the delta volumes + newShards, deletedShards = dn.UpdateEcShards(shards) + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + +func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) { + // convert into in memory struct storage.VolumeInfo + var newShards, deletedShards []*erasure_coding.EcVolumeInfo + for _, shardInfo := range newEcShards { + newShards = append(newShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + for _, shardInfo := range deletedEcShards { + deletedShards = append(deletedShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + + dn.DeltaUpdateEcShards(newShards, deletedShards) + + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + +func NewEcShardLocations(collection string) *EcShardLocations { + return &EcShardLocations{ + Collection: collection, + } +} + +func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) 
(added bool) { + dataNodes := loc.Locations[shardId] + for _, n := range dataNodes { + if n.Id() == dn.Id() { + return false + } + } + loc.Locations[shardId] = append(dataNodes, dn) + return true +} + +func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) { + dataNodes := loc.Locations[shardId] + foundIndex := -1 + for index, n := range dataNodes { + if n.Id() == dn.Id() { + foundIndex = index + } + } + if foundIndex < 0 { + return false + } + loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...) + return true +} + +func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + locations = NewEcShardLocations(ecShardInfos.Collection) + t.ecShardMap[ecShardInfos.VolumeId] = locations + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.AddShard(shardId, dn) + } +} + +func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + glog.Infof("removing ec shard info:%+v", ecShardInfos) + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + return + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.DeleteShard(shardId, dn) + } +} + +func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + locations, found = t.ecShardMap[vid] + + return +} + +func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + dateNodeMap := make(map[string]bool) + for _, ecVolumeLocation := range t.ecShardMap { + if ecVolumeLocation.Collection == collection { + for _, locations := range ecVolumeLocation.Locations { + for _, loc := range locations { + dateNodeMap[string(loc.Id())] = true + } + } + } + } + + for k, _ := range dateNodeMap { + dataNodes = append(dataNodes, k) + } + + return +} + +func (t *Topology) DeleteEcCollection(collection string) { + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + var vids []needle.VolumeId + for vid, ecVolumeLocation := range t.ecShardMap { + if ecVolumeLocation.Collection == collection { + vids = append(vids, vid) + } + } + + for _, vid := range vids { + delete(t.ecShardMap, vid) + } + + return +} diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index a301103eb..068bd401e 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -1,6 +1,7 @@ package topology import ( + "google.golang.org/grpc" "math/rand" "time" @@ -8,7 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" ) -func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) { +func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) { go func() { for { if t.IsLeader() { @@ -22,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo c := time.Tick(15 * time.Minute) for _ = range c { if t.IsLeader() { - t.Vacuum(garbageThreshold, preallocate) + t.Vacuum(grpcDialOption, garbageThreshold, preallocate) } } }(garbageThreshold) @@ -58,6 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn 
*DataNode) { vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) if dn.Parent() != nil { diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go index 769ba0e2a..73c55d77d 100644 --- a/weed/topology/topology_map.go +++ b/weed/topology/topology_map.go @@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} { } } } - m["layouts"] = layouts + m["Layouts"] = layouts return m } @@ -68,9 +68,28 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat for _, v := range dn.GetVolumes() { volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id)) } + for _, s := range dn.GetEcShards() { + volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(s.VolumeId)) + } volumeLocations = append(volumeLocations, volumeLocation) } } } return } + +func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo { + m := &master_pb.TopologyInfo{ + Id: string(t.Id()), + VolumeCount: uint64(t.GetVolumeCount()), + MaxVolumeCount: uint64(t.GetMaxVolumeCount()), + FreeVolumeCount: uint64(t.FreeSpace()), + ActiveVolumeCount: uint64(t.GetActiveVolumeCount()), + RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()), + } + for _, c := range t.Children() { + dc := c.(*DataCenter) + m.DataCenterInfos = append(m.DataCenterInfos, dc.ToDataCenterInfo()) + } + return m +} diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 07dc9c67b..2fe381ca2 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -4,6 +4,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "testing" ) @@ -20,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) { } func TestHandlingVolumeServerHeartbeat(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") @@ -39,7 +42,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { DeletedByteCount: 34524, ReadOnly: false, ReplicaPlacement: uint32(0), - Version: uint32(1), + Version: uint32(needle.CurrentVersion), Ttl: 0, } volumeMessages = append(volumeMessages, volumeMessage) @@ -47,8 +50,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) } { @@ -64,20 +67,68 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { DeletedByteCount: 345240, ReadOnly: false, ReplicaPlacement: uint32(0), - Version: uint32(1), + Version: uint32(needle.CurrentVersion), Ttl: 0, } volumeMessages = append(volumeMessages, volumeMessage) } topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + //rp, _ := storage.NewReplicaPlacementFromString("000") + 
//layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + //assert(t, "writables", len(layout.writables), volumeCount) + + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + } + + { + volumeCount := 6 + newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{ + Id: uint32(3), + Collection: "", + ReplicaPlacement: uint32(0), + Version: uint32(needle.CurrentVersion), + Ttl: 0, + } + topo.IncrementalSyncDataNodeRegistration( + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + nil, + dn) + rp, _ := super_block.NewReplicaPlacementFromString("000") + layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + assert(t, "writables after repeated add", len(layout.writables), volumeCount) + + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + + topo.IncrementalSyncDataNodeRegistration( + nil, + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + dn) + assert(t, "writables after deletion", len(layout.writables), volumeCount-1) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1) + + topo.IncrementalSyncDataNodeRegistration( + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + nil, + dn) + + for vid, _ := range layout.vid2location { + println("after add volume id", vid) + } + for _, vid := range layout.writables { + println("after add writable volume id", vid) + } + + assert(t, "writables after add back", len(layout.writables), volumeCount) + } topo.UnRegisterDataNode(dn) - assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0) + assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0) } @@ -89,27 +140,28 @@ func assert(t *testing.T, message string, actual, expected int) { func TestAddRemoveVolume(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) v := storage.VolumeInfo{ - Id: storage.VolumeId(1), + Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, ReadOnly: false, - Version: storage.CurrentVersion, - ReplicaPlacement: &storage.ReplicaPlacement{}, - Ttl: storage.EMPTY_TTL, + Version: needle.CurrentVersion, + ReplicaPlacement: &super_block.ReplicaPlacement{}, + Ttl: needle.EMPTY_TTL, } dn.UpdateVolumes([]storage.VolumeInfo{v}) topo.RegisterVolumeLayout(v, dn) + topo.RegisterVolumeLayout(v, dn) if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection { t.Errorf("collection %v should exist", v.Collection) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index d6b09314b..789a01330 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,31 +2,38 @@ package topology import ( "context" + "sync/atomic" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" ) -func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, 
locationlist *VolumeLocationList, garbageThreshold float64) bool { - ch := make(chan bool, locationlist.Length()) +func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, + locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) { + ch := make(chan int, locationlist.Length()) + errCount := int32(0) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ - VolumdId: uint32(vid), + go func(index int, url string, vid needle.VolumeId) { + err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + VolumeId: uint32(vid), }) if err != nil { - ch <- false + atomic.AddInt32(&errCount, 1) + ch <- -1 return err } - isNeeded := resp.GarbageRatio > garbageThreshold - ch <- isNeeded + if resp.GarbageRatio >= garbageThreshold { + ch <- index + } else { + ch <- -1 + } return nil }) if err != nil { @@ -34,27 +41,33 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist } }(index, dn.Url(), vid) } - isCheckSuccess := true - for _ = range locationlist.list { + vacuumLocationList := NewVolumeLocationList() + for range locationlist.list { select { - case canVacuum := <-ch: - isCheckSuccess = isCheckSuccess && canVacuum + case index := <-ch: + if index != -1 { + vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index]) + } case <-time.After(30 * time.Minute): - isCheckSuccess = false - break + return vacuumLocationList, false } } - return isCheckSuccess + return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0 } -func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { +func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, + locationlist *VolumeLocationList, preallocate int64) bool { + vl.accessLock.Lock() vl.removeFromWritable(vid) + vl.accessLock.Unlock() + ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), + Preallocate: preallocate, }) return err }) @@ -68,45 +81,50 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli }(index, dn.Url(), vid) } isVacuumSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case canCommit := <-ch: isVacuumSuccess = isVacuumSuccess && canCommit case <-time.After(30 * time.Minute): - 
isVacuumSuccess = false - break + return false } } return isVacuumSuccess } -func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true + isReadOnly := false for _, dn := range locationlist.list { - glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ - VolumdId: uint32(vid), + glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + VolumeId: uint32(vid), }) + if resp.IsReadOnly { + isReadOnly = true + } return err }) if err != nil { glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err) isCommitSuccess = false } else { - glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url()) + glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } - if isCommitSuccess { - vl.SetVolumeAvailable(dn, vid) + } + if isCommitSuccess { + for _, dn := range locationlist.list { + vl.SetVolumeAvailable(dn, vid, isReadOnly) } } return isCommitSuccess } -func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) { +func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -118,24 +136,34 @@ func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationli } } -func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { + + // if there is vacuum going on, return immediately + swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) + if !swapped { + return 0 + } + defer atomic.StoreInt64(&t.vacuumLockCounter, 0) + + // now only one vacuum process going on + glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) for _, col := range t.collectionMap.Items() { c := col.(*Collection) for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate) + vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) } } } return 0 } -func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { +func vacuumOneVolumeLayout(grpcDialOption 
grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[storage.VolumeId]*VolumeLocationList) + tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { tmpMap[vid] = locationList } @@ -152,11 +180,12 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr } glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) { - batchVacuumVolumeCommit(volumeLayout, vid, locationList) + if vacuumLocationList, needVacuum := batchVacuumVolumeCheck( + grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum { + if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { + batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) } else { - batchVacuumVolumeCleanup(volumeLayout, vid, locationList) + batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) } } } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 9bf013ca6..58b5702bf 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -5,6 +5,12 @@ import ( "math/rand" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/util" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -18,13 +24,14 @@ This package is created to resolve these replica placement issues: */ type VolumeGrowOption struct { - Collection string - ReplicaPlacement *storage.ReplicaPlacement - Ttl *storage.TTL - Prealloacte int64 - DataCenter string - Rack string - DataNode string + Collection string + ReplicaPlacement *super_block.ReplicaPlacement + Ttl *needle.TTL + Prealloacte int64 + DataCenter string + Rack string + DataNode string + MemoryMapMaxSizeMb uint32 } type VolumeGrowth struct { @@ -42,47 +49,59 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { // one replication type may need rp.GetCopyCount() actual volumes // given copyCount, how many logical volumes to create func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { + v := util.GetViper() + v.SetDefault("master.volume_growth.copy_1", 7) + v.SetDefault("master.volume_growth.copy_2", 6) + v.SetDefault("master.volume_growth.copy_3", 3) + v.SetDefault("master.volume_growth.copy_other", 1) switch copyCount { case 1: - count = 7 + count = v.GetInt("master.volume_growth.copy_1") case 2: - count = 6 + count = v.GetInt("master.volume_growth.copy_2") case 3: - count = 3 + count = v.GetInt("master.volume_growth.copy_3") default: - count = 1 + count = v.GetInt("master.volume_growth.copy_other") } return } -func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) { - count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) { + if targetCount == 0 { + targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()) + } + count, err = 
vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { return count, nil } return count, err } -func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() for i := 0; i < targetCount; i++ { - if c, e := vg.findAndGrow(topo, option); e == nil { + if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { counter += c } else { + glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e) return counter, e } } return } -func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { +func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) { servers, e := vg.findEmptySlotsForOneVolume(topo, option) if e != nil { return 0, e } - vid := topo.NextVolumeId() - err := vg.grow(topo, vid, option, servers...) + vid, raftErr := topo.NextVolumeId() + if raftErr != nil { + return 0, raftErr + } + err := vg.grow(grpcDialOption, topo, vid, option, servers...) return len(servers), err } @@ -94,14 +113,14 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 @@ -126,11 +145,11 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { @@ -153,7 +172,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + mainServer, otherServers, serverErr := 
mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } @@ -171,7 +190,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Intn(rack.FreeSpace()) + r := rand.Int63n(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -179,7 +198,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } } for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) + r := rand.Int63n(datacenter.FreeSpace()) if server, e := datacenter.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -189,16 +208,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return } -func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { +func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, option); err == nil { + if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { vi := storage.VolumeInfo{ Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index f983df1ec..bc9083fd2 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -7,6 +7,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) var topologyLayout = ` @@ -79,7 +81,7 @@ func setup(topologyLayout string) *Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -96,12 +98,12 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), + Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) } } } @@ -112,7 +114,7 @@ func setup(topologyLayout string) *Topology { func TestFindEmptySlotsForOneVolume(t *testing.T) { topo := setup(topologyLayout) vg := NewDefaultVolumeGrowth() - rp, _ := storage.NewReplicaPlacementFromString("002") + rp, _ := super_block.NewReplicaPlacementFromString("002") volumeGrowOption := &VolumeGrowOption{ 
Collection: "", ReplicaPlacement: rp, @@ -129,3 +131,212 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) { fmt.Println("assigned node :", server.Id()) } } + +var topologyLayout2 = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":300 + }, + "server112":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":300 + }, + "server113":{ + "volumes":[], + "limit":300 + }, + "server114":{ + "volumes":[], + "limit":300 + }, + "server115":{ + "volumes":[], + "limit":300 + }, + "server116":{ + "volumes":[], + "limit":300 + } + }, + "rack2":{ + "server121":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":300 + }, + "server122":{ + "volumes":[], + "limit":300 + }, + "server123":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":300 + }, + "server124":{ + "volumes":[], + "limit":300 + }, + "server125":{ + "volumes":[], + "limit":300 + }, + "server126":{ + "volumes":[], + "limit":300 + } + }, + "rack3":{ + "server131":{ + "volumes":[], + "limit":300 + }, + "server132":{ + "volumes":[], + "limit":300 + }, + "server133":{ + "volumes":[], + "limit":300 + }, + "server134":{ + "volumes":[], + "limit":300 + }, + "server135":{ + "volumes":[], + "limit":300 + }, + "server136":{ + "volumes":[], + "limit":300 + } + } + } +} +` + +func TestReplication011(t *testing.T) { + topo := setup(topologyLayout2) + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("011") + volumeGrowOption := &VolumeGrowOption{ + Collection: "MAIL", + ReplicaPlacement: rp, + DataCenter: "dc1", + Rack: "", + DataNode: "", + } + servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) + } +} + +var topologyLayout3 = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc2":{ + "rack2":{ + "server222":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc3":{ + "rack3":{ + "server333":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc4":{ + "rack4":{ + "server444":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc5":{ + "rack5":{ + "server555":{ + "volumes":[], + "limit":500 + } + } + }, + "dc6":{ + "rack6":{ + "server666":{ + "volumes":[], + "limit":500 + } + } + } +} +` + +func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { + topo := setup(topologyLayout3) + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("100") + volumeGrowOption := &VolumeGrowOption{ + Collection: "Weight", + ReplicaPlacement: rp, + DataCenter: "", + Rack: "", + DataNode: "", + } + + distribution := map[NodeId]int{} + // assign 1000 volumes + for i := 0; i < 1000; i++ { + servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + // fmt.Println("assigned node :", server.Id()) + if _, ok := distribution[server.id]; !ok { + distribution[server.id] = 0 + } + distribution[server.id] += 1 + } + } + + for k, v := range distribution { + fmt.Printf("%s : %d\n", k, v) + } +} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 71a071e2f..9e84fd2da 100644 --- 
a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -9,17 +9,20 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { - rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + rp *super_block.ReplicaPlacement + ttl *needle.TTL + vid2location map[needle.VolumeId]*VolumeLocationList + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -29,15 +32,16 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), + vid2location: make(map[needle.VolumeId]*VolumeLocationList), + writables: *new([]needle.VolumeId), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } @@ -49,6 +53,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() + defer vl.ensureCorrectWritables(v) + defer vl.rememberOversizedVolume(v) + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } @@ -57,7 +64,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { for _, dn := range vl.vid2location[v.Id].list { if vInfo, err := dn.GetVolumesById(v.Id); err == nil { if vInfo.ReadOnly { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) vl.readonlyVolumes[v.Id] = true return @@ -65,23 +72,16 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.readonlyVolumes, v.Id) } } else { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) delete(vl.readonlyVolumes, v.Id) return } } - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) - } - } else { - vl.rememberOversizedVolumne(v) - vl.removeFromWritable(v.Id) - } + } -func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { if vl.isOversized(v) { vl.oversizedVolumes[v.Id] = true } @@ -91,17 +91,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn 
*DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - vl.removeFromWritable(v.Id) - delete(vl.vid2location, v.Id) + // remove from vid2location map + location, ok := vl.vid2location[v.Id] + if !ok { + return + } + + if location.Remove(dn) { + + vl.ensureCorrectWritables(v) + + if location.Length() == 0 { + delete(vl.vid2location, v.Id) + } + + } } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return +func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { + if vl.enoughCopies(v.Id) && vl.isWritable(v) { + if _, ok := vl.oversizedVolumes[v.Id]; !ok { + vl.setVolumeWritable(v.Id) } + } else { + vl.removeFromWritable(v.Id) } - vl.writables = append(vl.writables, vid) } func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { @@ -110,7 +124,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && + v.Version == needle.CurrentVersion && !v.ReadOnly } @@ -121,7 +135,7 @@ func (vl *VolumeLayout) isEmpty() bool { return len(vl.vid2location) == 0 } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -141,7 +155,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -158,7 +172,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid storage.VolumeId + var vid needle.VolumeId var locationList *VolumeLocationList counter := 0 for _, v := range vl.writables { @@ -205,7 +219,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { return counter } -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { toDeleteIndex := -1 for k, id := range vl.writables { if id == vid { @@ -220,7 +234,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { } return false } -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { for _, v := range vl.writables { if v == vid { return false @@ -231,7 +245,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return true } -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -245,18 +259,34 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() + vInfo, err := dn.GetVolumesById(vid) + if err != nil { + return false + } + 
 	vl.vid2location[vid].Set(dn)
-	if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+
+	if vInfo.ReadOnly || isReadOnly {
+		return false
+	}
+
+	if vl.enoughCopies(vid) {
 		return vl.setVolumeWritable(vid)
 	}
 	return false
 }
 
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+	locations := vl.vid2location[vid].Length()
+	desired := vl.rp.GetCopyCount()
+	return locations == desired || (vl.replicationAsMin && locations > desired)
+}
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
 	vl.accessLock.Lock()
 	defer vl.accessLock.Unlock()
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 8d5881333..8905c54b5 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -3,7 +3,7 @@ package topology
 import (
 	"fmt"
 
-	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 )
 
 type VolumeLocationList struct {
@@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
 	}
 }
 
-func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
+func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
 	for _, dnl := range dnll.list {
 		if dnl.LastSeen < freshThreshHold {
 			vinfo, err := dnl.GetVolumesById(vid)

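The reworked batchVacuumVolumeCheck no longer returns a single yes/no vote: each replica is checked concurrently, goroutines report either their index (garbage ratio at or above the threshold) or -1 (below threshold, or an RPC error that also bumps an atomic error counter), and the caller assembles the subset of replicas worth compacting, subject to an overall timeout. A minimal, self-contained Go sketch of that fan-out, assuming an illustrative checkReplica callback in place of the real VacuumVolumeCheck gRPC call:

// Reduced sketch of the batchVacuumVolumeCheck fan-out pattern.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type replica struct {
	url          string
	garbageRatio float64
}

// selectReplicasToVacuum checks every replica concurrently and returns the
// ones whose garbage ratio meets the threshold, plus whether vacuuming
// should proceed (no errors and at least one qualifying replica).
func selectReplicasToVacuum(replicas []replica, threshold float64, timeout time.Duration,
	checkReplica func(replica) (float64, error)) ([]replica, bool) {

	ch := make(chan int, len(replicas))
	errCount := int32(0)

	for i, r := range replicas {
		go func(i int, r replica) {
			ratio, err := checkReplica(r)
			if err != nil {
				atomic.AddInt32(&errCount, 1)
				ch <- -1
				return
			}
			if ratio >= threshold {
				ch <- i
			} else {
				ch <- -1
			}
		}(i, r)
	}

	var picked []replica
	for range replicas {
		select {
		case idx := <-ch:
			if idx != -1 {
				picked = append(picked, replicas[idx])
			}
		case <-time.After(timeout):
			// give up on stragglers, return what we have so far
			return picked, false
		}
	}
	return picked, errCount == 0 && len(picked) > 0
}

func main() {
	replicas := []replica{{"10.0.0.1:8080", 0.5}, {"10.0.0.2:8080", 0.1}}
	picked, ok := selectReplicasToVacuum(replicas, 0.3, time.Minute,
		func(r replica) (float64, error) { return r.garbageRatio, nil })
	fmt.Println("vacuum needed:", ok, "replicas:", picked)
}
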
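Topology.Vacuum now refuses to start a second pass while one is running, using an atomic compare-and-swap on vacuumLockCounter (0 to 1, reset on exit). A small sketch of that guard, assuming illustrative names (vacuumRunner, runOnce) rather than the real Topology type:

// Sketch of the single-vacuum guard: concurrent callers return immediately
// instead of starting a second vacuum pass.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type vacuumRunner struct {
	vacuumLockCounter int64
}

// runOnce runs work only if no other vacuum pass is in progress.
func (r *vacuumRunner) runOnce(work func()) bool {
	if !atomic.CompareAndSwapInt64(&r.vacuumLockCounter, 0, 1) {
		return false
	}
	defer atomic.StoreInt64(&r.vacuumLockCounter, 0)
	work()
	return true
}

func main() {
	r := &vacuumRunner{}
	var wg sync.WaitGroup
	executed := int32(0)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if r.runOnce(func() { atomic.AddInt32(&executed, 1) }) {
				fmt.Println("vacuum pass executed")
			} else {
				fmt.Println("vacuum already running, skipped")
			}
		}()
	}
	wg.Wait()
	fmt.Println("passes executed:", executed)
}
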
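findVolumeCount replaces the hard-coded per-replication volume counts with values read from the master configuration under master.volume_growth.copy_1/2/3/other, keeping 7, 6, 3 and 1 as the defaults. A sketch of the same lookup, using github.com/spf13/viper directly instead of SeaweedFS's util.GetViper() helper:

// Sketch of the configurable volume-growth counts; defaults reproduce the
// previously hard-coded behavior and can be overridden in master.toml.
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func findVolumeCount(copyCount int) int {
	viper.SetDefault("master.volume_growth.copy_1", 7)
	viper.SetDefault("master.volume_growth.copy_2", 6)
	viper.SetDefault("master.volume_growth.copy_3", 3)
	viper.SetDefault("master.volume_growth.copy_other", 1)
	switch copyCount {
	case 1:
		return viper.GetInt("master.volume_growth.copy_1")
	case 2:
		return viper.GetInt("master.volume_growth.copy_2")
	case 3:
		return viper.GetInt("master.volume_growth.copy_3")
	default:
		return viper.GetInt("master.volume_growth.copy_other")
	}
}

func main() {
	// With no overrides, the defaults apply.
	for _, c := range []int{1, 2, 3, 4} {
		fmt.Printf("copyCount=%d -> grow %d volumes\n", c, findVolumeCount(c))
	}
}
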
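findEmptySlotsForOneVolume now schedules by capacity: RandomlyPickNodes becomes PickNodesByWeight, and reservations in other racks and data centers draw rand.Int63n(FreeSpace()), so nodes with more free volume slots are chosen proportionally more often (this is what TestFindEmptySlotsForOneVolumeScheduleByWeight exercises). The sketch below shows the weighting idea only; the node type and selection helper are illustrative, not the actual PickNodesByWeight implementation:

// Sketch of capacity-weighted placement: pick a node with probability
// proportional to its free volume slots.
package main

import (
	"fmt"
	"math/rand"
)

type node struct {
	id        string
	freeSlots int64
}

// pickByWeight assumes at least one node with freeSlots > 0.
func pickByWeight(nodes []node) node {
	var total int64
	for _, n := range nodes {
		total += n.freeSlots
	}
	r := rand.Int63n(total)
	for _, n := range nodes {
		if r < n.freeSlots {
			return n
		}
		r -= n.freeSlots
	}
	return nodes[len(nodes)-1]
}

func main() {
	nodes := []node{{"server111", 2000}, {"server333", 1000}, {"server555", 500}}
	counts := map[string]int{}
	for i := 0; i < 7000; i++ {
		counts[pickByWeight(nodes).id]++
	}
	// Expect roughly a 4:2:1 split, mirroring the weighted-scheduling test.
	fmt.Println(counts)
}
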
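VolumeLayout gains enoughCopies together with the replicationAsMin flag added to NewVolumeLayout: a volume counts as fully replicated when its replica count equals the placement's copy count, or exceeds it when replication is treated as a minimum. A standalone sketch with stand-in types, not the real VolumeLayout:

// Sketch of the enoughCopies check with the replicationAsMin relaxation.
package main

import "fmt"

type layout struct {
	copyCount        int            // copies required by the replica placement
	replicationAsMin bool           // treat copyCount as a lower bound
	locations        map[uint32]int // volume id -> number of live replicas
}

func (l *layout) enoughCopies(vid uint32) bool {
	n := l.locations[vid]
	return n == l.copyCount || (l.replicationAsMin && n > l.copyCount)
}

func main() {
	strict := &layout{copyCount: 2, locations: map[uint32]int{1: 2, 2: 3}}
	relaxed := &layout{copyCount: 2, replicationAsMin: true, locations: map[uint32]int{1: 2, 2: 3}}
	for _, vid := range []uint32{1, 2} {
		// With strict counting, the over-replicated volume 2 is not writable;
		// with replicationAsMin it is.
		fmt.Printf("vid %d strict=%v relaxed=%v\n", vid, strict.enoughCopies(vid), relaxed.enoughCopies(vid))
	}
}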