diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-07-16 11:13:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-16 11:13:23 +0800 |
| commit | d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch) | |
| tree | 8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/topology | |
| parent | 01060c992591f412b0d5e180bde29991747a9462 (diff) | |
| parent | 5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff) | |
| download | seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip | |
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/allocate_volume.go | 14 | ||||
| -rw-r--r-- | weed/topology/cluster_commands.go | 6 | ||||
| -rw-r--r-- | weed/topology/collection.go | 5 | ||||
| -rw-r--r-- | weed/topology/data_center.go | 17 | ||||
| -rw-r--r-- | weed/topology/data_node.go | 57 | ||||
| -rw-r--r-- | weed/topology/data_node_ec.go | 135 | ||||
| -rw-r--r-- | weed/topology/node.go | 75 | ||||
| -rw-r--r-- | weed/topology/rack.go | 18 | ||||
| -rw-r--r-- | weed/topology/store_replicate.go | 38 | ||||
| -rw-r--r-- | weed/topology/topology.go | 96 | ||||
| -rw-r--r-- | weed/topology/topology_ec.go | 173 | ||||
| -rw-r--r-- | weed/topology/topology_event_handling.go | 5 | ||||
| -rw-r--r-- | weed/topology/topology_map.go | 18 | ||||
| -rw-r--r-- | weed/topology/topology_test.go | 71 | ||||
| -rw-r--r-- | weed/topology/topology_vacuum.go | 57 | ||||
| -rw-r--r-- | weed/topology/volume_growth.go | 36 | ||||
| -rw-r--r-- | weed/topology/volume_growth_test.go | 7 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 88 | ||||
| -rw-r--r-- | weed/topology/volume_location_list.go | 4 |
19 files changed, 744 insertions, 176 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 55796ab43..48336092f 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -2,25 +2,23 @@ package topology import ( "context" - "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" ) type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { +func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { - return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.AssignVolume(ctx, &volume_server_pb.AssignVolumeRequest{ - VolumdId: uint32(vid), + _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ + VolumeId: uint32(vid), Collection: option.Collection, Replication: option.ReplicaPlacement.String(), Ttl: option.Ttl.String(), diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 7a36c25ec..152691ccb 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -3,14 +3,14 @@ package topology import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type MaxVolumeIdCommand struct { - MaxVolumeId storage.VolumeId `json:"maxVolumeId"` + MaxVolumeId needle.VolumeId `json:"maxVolumeId"` } -func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { +func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand { return &MaxVolumeIdCommand{ MaxVolumeId: value, } diff --git a/weed/topology/collection.go b/weed/topology/collection.go index a17f0c961..f6b728ec9 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,7 +24,7 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() @@ -34,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl * return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { +func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index bcf2dfd31..640cb1937 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -1,5 +1,7 @@ package topology +import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + type DataCenter struct { NodeImpl } @@ -38,3 +40,18 @@ func (dc *DataCenter) ToMap() interface{} { m["Racks"] = racks return m } + +func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo { + m := &master_pb.DataCenterInfo{ + Id: string(dc.Id()), + VolumeCount: uint64(dc.GetVolumeCount()), + MaxVolumeCount: uint64(dc.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dc.FreeSpace()), + ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()), + } + for _, c := range dc.Children() { + rack := c.(*Rack) + m.RackInfos = append(m.RackInfos, rack.ToRackInfo()) + } + return m +} diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 6ea6d3938..3e72ccdbf 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,6 +2,12 @@ package topology import ( "fmt" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" @@ -10,18 +16,21 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds + volumes map[needle.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex } func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } @@ -50,7 +59,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -74,6 +83,20 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume return } +func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { + dn.Lock() + for _, v := range deletedVolumes { + delete(dn.volumes, v.Id) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + dn.Unlock() + for _, v := range newlVolumes { + dn.AddOrUpdateVolume(v) + } + return +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() for _, v := range dn.volumes { @@ -83,7 +106,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] @@ -123,8 +146,26 @@ func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() + ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } + +func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { + m := &master_pb.DataNodeInfo{ + Id: string(dn.Id()), + VolumeCount: uint64(dn.GetVolumeCount()), + MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dn.FreeSpace()), + ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), + } + for _, v := range dn.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range dn.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go new file mode 100644 index 000000000..75c8784fe --- /dev/null +++ b/weed/topology/data_node_ec.go @@ -0,0 +1,135 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { + dn.RLock() + for _, ecVolumeInfo := range dn.ecShards { + ret = append(ret, ecVolumeInfo) + } + dn.RUnlock() + return ret +} + +func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + // prepare the new ec shard map + actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + for _, ecShards := range actualShards { + actualEcShardMap[ecShards.VolumeId] = ecShards + } + + // found out the newShards and deletedShards + var newShardCount, deletedShardCount int + dn.ecShardsLock.RLock() + for vid, ecShards := range dn.ecShards { + if actualEcShards, ok := actualEcShardMap[vid]; !ok { + // dn registered ec shards not found in the new set of ec shards + deletedShards = append(deletedShards, ecShards) + deletedShardCount += ecShards.ShardIdCount() + } else { + // found, but maybe the actual shard could be missing + a := actualEcShards.Minus(ecShards) + if a.ShardIdCount() > 0 { + newShards = append(newShards, a) + newShardCount += a.ShardIdCount() + } + d := ecShards.Minus(actualEcShards) + if d.ShardIdCount() > 0 { + deletedShards = append(deletedShards, d) + deletedShardCount += d.ShardIdCount() + } + } + } + for _, ecShards := range actualShards { + if _, found := dn.ecShards[ecShards.VolumeId]; !found { + newShards = append(newShards, ecShards) + newShardCount += ecShards.ShardIdCount() + } + } + dn.ecShardsLock.RUnlock() + + if len(newShards) > 0 || len(deletedShards) > 0 { + // if changed, set to the new ec shard map + dn.ecShardsLock.Lock() + dn.ecShards = actualEcShardMap + dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)) + dn.ecShardsLock.Unlock() + } + + return +} + +func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + + for _, newShard := range newShards { + dn.AddOrUpdateEcShard(newShard) + } + + for _, deletedShard := range deletedShards { + dn.DeleteEcShard(deletedShard) + } + +} + +func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + delta := 0 + if existing, ok := dn.ecShards[s.VolumeId]; !ok { + dn.ecShards[s.VolumeId] = s + delta = s.ShardBits.ShardIdCount() + } else { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + delta = existing.ShardBits.ShardIdCount() - oldCount + } + + dn.UpAdjustEcShardCountDelta(int64(delta)) + +} + +func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + if existing, ok := dn.ecShards[s.VolumeId]; ok { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + delta := existing.ShardBits.ShardIdCount() - oldCount + dn.UpAdjustEcShardCountDelta(int64(delta)) + if existing.ShardBits.ShardIdCount() == 0 { + delete(dn.ecShards, s.VolumeId) + } + } + +} + +func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { + + // check whether normal volumes has this volume id + dn.RLock() + _, ok := dn.volumes[id] + if ok { + hasVolumeId = true + } + dn.RUnlock() + + if hasVolumeId { + return + } + + // check whether ec shards has this volume id + dn.ecShardsLock.RLock() + _, ok = dn.ecShards[id] + if ok { + hasVolumeId = true + } + dn.ecShardsLock.RUnlock() + + return + +} diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..b2808f589 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -5,26 +5,30 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int - ReserveOneVolume(r int) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) - UpAdjustMaxVolumeId(vid storage.VolumeId) + FreeSpace() int64 + ReserveOneVolume(r int64) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustEcShardCountDelta(ecShardCountDelta int64) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + UpAdjustMaxVolumeId(vid needle.VolumeId) - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int - GetMaxVolumeId() storage.VolumeId + GetVolumeCount() int64 + GetEcShardCount() int64 + GetActiveVolumeCount() int64 + GetMaxVolumeCount() int64 + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) @@ -39,14 +43,15 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { + volumeCount int64 + activeVolumeCount int64 + ecShardCount int64 + maxVolumeCount int64 id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int parent Node sync.RWMutex // lock children children map[NodeId]Node - maxVolumeId storage.VolumeId + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -126,7 +131,10 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int { +func (n *NodeImpl) FreeSpace() int64 { + if n.ecShardCount > 0 { + return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 + } return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { @@ -146,7 +154,7 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { @@ -171,25 +179,31 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta +func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative + atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) + if n.parent != nil { + n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) + } +} +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative +func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid if n.parent != nil { @@ -197,16 +211,19 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative } } } -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int { +func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } -func (n *NodeImpl) GetActiveVolumeCount() int { +func (n *NodeImpl) GetEcShardCount() int64 { + return n.ecShardCount +} +func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } -func (n *NodeImpl) GetMaxVolumeCount() int { +func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } @@ -218,6 +235,7 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) @@ -232,6 +250,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) delete(n.children, node.Id()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) glog.V(0).Infoln(n, "removes", node.Id()) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index a48d64323..932c1a804 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -1,6 +1,7 @@ package topology import ( + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "strconv" "time" ) @@ -27,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { @@ -58,3 +59,18 @@ func (r *Rack) ToMap() interface{} { m["DataNodes"] = dns return m } + +func (r *Rack) ToRackInfo() *master_pb.RackInfo { + m := &master_pb.RackInfo{ + Id: string(r.Id()), + VolumeCount: uint64(r.GetVolumeCount()), + MaxVolumeCount: uint64(r.GetMaxVolumeCount()), + FreeVolumeCount: uint64(r.FreeSpace()), + ActiveVolumeCount: uint64(r.GetActiveVolumeCount()), + } + for _, c := range r.Children() { + dn := c.(*DataNode) + m.DataNodeInfos = append(m.DataNodeInfos, dn.ToDataNodeInfo()) + } + return m +} diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index c73fb706a..d4076d548 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -14,24 +14,24 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId storage.VolumeId, needle *storage.Needle, - r *http.Request) (size uint32, errorStatus string) { + volumeId needle.VolumeId, n *needle.Needle, + r *http.Request) (size uint32, isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, needle) - needToReplicate := !s.HasVolume(volumeId) + size, isUnchanged, err = s.Write(volumeId, n) if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - size = ret + err = fmt.Errorf("failed to write to local disk: %v", err) return } + needToReplicate := !s.HasVolume(volumeId) needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() if !needToReplicate { needToReplicate = s.GetVolume(volumeId).NeedToReplicate() @@ -47,43 +47,43 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } q := url.Values{ "type": {"replicate"}, + "ttl": {n.Ttl.String()}, } - if needle.LastModified > 0 { - q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) } - if needle.IsChunkedManifest() { + if n.IsChunkedManifest() { q.Set("cm", "true") } u.RawQuery = q.Encode() pairMap := make(map[string]string) - if needle.HasPairs() { + if n.HasPairs() { tmpMap := make(map[string]string) - err := json.Unmarshal(needle.Pairs, &tmpMap) + err := json.Unmarshal(n.Pairs, &tmpMap) if err != nil { glog.V(0).Infoln("Unmarshal pairs error:", err) } for k, v := range tmpMap { - pairMap[storage.PairNamePrefix+k] = v + pairMap[needle.PairNamePrefix+k] = v } } _, err := operation.Upload(u.String(), - string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), pairMap, jwt) return err }); err != nil { - ret = 0 - errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err) + size = 0 + err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) } } } - size = ret return } func ReplicatedDelete(masterNode string, store *storage.Store, - volumeId storage.VolumeId, n *storage.Needle, + volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (uint32, error) { //check JWT @@ -102,7 +102,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, if needToReplicate { //send to other replica locations if r.FormValue("type") != "replicate" { if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { - return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) + return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) }); err != nil { ret = 0 } @@ -131,7 +131,7 @@ type RemoteResult struct { Error error } -func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { +func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error { if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { length := 0 selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 4242bfa05..aa01190c9 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -2,20 +2,25 @@ package topology import ( "errors" + "fmt" "math/rand" + "sync" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) type Topology struct { NodeImpl - collectionMap *util.ConcurrentReadMap + collectionMap *util.ConcurrentReadMap + ecShardMap map[needle.VolumeId]*EcShardLocations + ecShardMapLock sync.RWMutex pulse int64 @@ -37,6 +42,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.NodeImpl.value = t t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() + t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -50,8 +56,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls } func (t *Topology) IsLeader() bool { - if leader, e := t.Leader(); e == nil { - return leader == t.RaftServer.Name() + if t.RaftServer != nil { + return t.RaftServer.State() == raft.Leader } return false } @@ -72,7 +78,7 @@ func (t *Topology) Leader() (string, error) { return l, nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { //maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { @@ -85,14 +91,24 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return c.(*Collection).Lookup(vid) } } + + if locations, found := t.LookupEcShards(vid); found { + for _, loc := range locations.Locations { + dataNodes = append(dataNodes, loc...) + } + return dataNodes + } + return nil } -func (t *Topology) NextVolumeId() storage.VolumeId { +func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) - return next + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + return next, nil } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { @@ -102,19 +118,43 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) - if err != nil || datanodes.Length() == 0 { - return "", 0, nil, errors.New("No writable volumes available!") + if err != nil { + return "", 0, nil, fmt.Errorf("failed to find writable volumes for collectio:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) + } + if datanodes.Length() == 0 { + return "", 0, nil, fmt.Errorf("no writable volumes available for for collectio:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { return NewCollection(collectionName, t.volumeSizeLimit) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } +func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { + + mapOfCollections := make(map[string]bool) + for _, c := range t.collectionMap.Items() { + mapOfCollections[c.(*Collection).Name] = true + } + + if includeEcVolumes { + t.ecShardMapLock.RLock() + for _, ecVolumeLocation := range t.ecShardMap { + mapOfCollections[ecVolumeLocation.Collection] = true + } + t.ecShardMapLock.RUnlock() + } + + for k, _ := range mapOfCollections { + ret = append(ret, k) + } + return ret +} + func (t *Topology) FindCollection(collectionName string) (*Collection, bool) { c, hasCollection := t.collectionMap.Find(collectionName) if !hasCollection { @@ -152,6 +192,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { } func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) { + // convert into in memory struct storage.VolumeInfo var volumeInfos []storage.VolumeInfo for _, v := range volumes { if vi, err := storage.NewVolumeInfo(v); err == nil { @@ -160,8 +201,9 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati glog.V(0).Infof("Fail to convert joined volume information: %v", err) } } + // find out the delta volumes newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { + for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { @@ -169,3 +211,33 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati } return } + +func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) { + var newVis, oldVis []storage.VolumeInfo + for _, v := range newVolumes { + vi, err := storage.NewVolumeInfoFromShort(v) + if err != nil { + glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err) + continue + } + newVis = append(newVis, vi) + } + for _, v := range deletedVolumes { + vi, err := storage.NewVolumeInfoFromShort(v) + if err != nil { + glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err) + continue + } + oldVis = append(oldVis, vi) + } + dn.DeltaUpdateVolumes(newVis, oldVis) + + for _, vi := range newVis { + t.RegisterVolumeLayout(vi, dn) + } + for _, vi := range oldVis { + t.UnRegisterVolumeLayout(vi, dn) + } + + return +} diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go new file mode 100644 index 000000000..93b39bb5d --- /dev/null +++ b/weed/topology/topology_ec.go @@ -0,0 +1,173 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +type EcShardLocations struct { + Collection string + Locations [erasure_coding.TotalShardsCount][]*DataNode +} + +func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + // convert into in memory struct storage.VolumeInfo + var shards []*erasure_coding.EcVolumeInfo + for _, shardInfo := range shardInfos { + shards = append(shards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + // find out the delta volumes + newShards, deletedShards = dn.UpdateEcShards(shards) + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + +func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) { + // convert into in memory struct storage.VolumeInfo + var newShards, deletedShards []*erasure_coding.EcVolumeInfo + for _, shardInfo := range newEcShards { + newShards = append(newShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + for _, shardInfo := range deletedEcShards { + deletedShards = append(deletedShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + + dn.DeltaUpdateEcShards(newShards, deletedShards) + + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + +func NewEcShardLocations(collection string) *EcShardLocations { + return &EcShardLocations{ + Collection: collection, + } +} + +func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) { + dataNodes := loc.Locations[shardId] + for _, n := range dataNodes { + if n.Id() == dn.Id() { + return false + } + } + loc.Locations[shardId] = append(dataNodes, dn) + return true +} + +func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) { + dataNodes := loc.Locations[shardId] + foundIndex := -1 + for index, n := range dataNodes { + if n.Id() == dn.Id() { + foundIndex = index + } + } + if foundIndex < 0 { + return false + } + loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...) + return true +} + +func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + locations = NewEcShardLocations(ecShardInfos.Collection) + t.ecShardMap[ecShardInfos.VolumeId] = locations + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.AddShard(shardId, dn) + } +} + +func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + glog.Infof("removing ec shard info:%+v", ecShardInfos) + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + return + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.DeleteShard(shardId, dn) + } +} + +func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + locations, found = t.ecShardMap[vid] + + return +} + +func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + dateNodeMap := make(map[string]bool) + for _, ecVolumeLocation := range t.ecShardMap { + if ecVolumeLocation.Collection == collection { + for _, locations := range ecVolumeLocation.Locations { + for _, loc := range locations { + dateNodeMap[string(loc.Id())] = true + } + } + } + } + + for k, _ := range dateNodeMap { + dataNodes = append(dataNodes, k) + } + + return +} + +func (t *Topology) DeleteEcCollection(collection string) { + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + var vids []needle.VolumeId + for vid, ecVolumeLocation := range t.ecShardMap { + if ecVolumeLocation.Collection == collection { + vids = append(vids, vid) + } + } + + for _, vid := range vids { + delete(t.ecShardMap, vid) + } + + return +} diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index a301103eb..041351492 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -1,6 +1,7 @@ package topology import ( + "google.golang.org/grpc" "math/rand" "time" @@ -8,7 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" ) -func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) { +func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) { go func() { for { if t.IsLeader() { @@ -22,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo c := time.Tick(15 * time.Minute) for _ = range c { if t.IsLeader() { - t.Vacuum(garbageThreshold, preallocate) + t.Vacuum(grpcDialOption, garbageThreshold, preallocate) } } }(garbageThreshold) diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go index 769ba0e2a..37a88c9ed 100644 --- a/weed/topology/topology_map.go +++ b/weed/topology/topology_map.go @@ -68,9 +68,27 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat for _, v := range dn.GetVolumes() { volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id)) } + for _, s := range dn.GetEcShards() { + volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(s.VolumeId)) + } volumeLocations = append(volumeLocations, volumeLocation) } } } return } + +func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo { + m := &master_pb.TopologyInfo{ + Id: string(t.Id()), + VolumeCount: uint64(t.GetVolumeCount()), + MaxVolumeCount: uint64(t.GetMaxVolumeCount()), + FreeVolumeCount: uint64(t.FreeSpace()), + ActiveVolumeCount: uint64(t.GetActiveVolumeCount()), + } + for _, c := range t.Children() { + dc := c.(*DataCenter) + m.DataCenterInfos = append(m.DataCenterInfos, dc.ToDataCenterInfo()) + } + return m +} diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 07dc9c67b..8f79ad684 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -4,6 +4,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "testing" ) @@ -39,7 +41,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { DeletedByteCount: 34524, ReadOnly: false, ReplicaPlacement: uint32(0), - Version: uint32(1), + Version: uint32(needle.CurrentVersion), Ttl: 0, } volumeMessages = append(volumeMessages, volumeMessage) @@ -47,8 +49,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) } { @@ -64,20 +66,68 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { DeletedByteCount: 345240, ReadOnly: false, ReplicaPlacement: uint32(0), - Version: uint32(1), + Version: uint32(needle.CurrentVersion), Ttl: 0, } volumeMessages = append(volumeMessages, volumeMessage) } topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + //rp, _ := storage.NewReplicaPlacementFromString("000") + //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + //assert(t, "writables", len(layout.writables), volumeCount) + + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + } + + { + volumeCount := 6 + newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{ + Id: uint32(3), + Collection: "", + ReplicaPlacement: uint32(0), + Version: uint32(needle.CurrentVersion), + Ttl: 0, + } + topo.IncrementalSyncDataNodeRegistration( + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + nil, + dn) + rp, _ := storage.NewReplicaPlacementFromString("000") + layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + assert(t, "writables after repeated add", len(layout.writables), volumeCount) + + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + + topo.IncrementalSyncDataNodeRegistration( + nil, + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + dn) + assert(t, "writables after deletion", len(layout.writables), volumeCount-1) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1) + + topo.IncrementalSyncDataNodeRegistration( + []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, + nil, + dn) + + for vid, _ := range layout.vid2location { + println("after add volume id", vid) + } + for _, vid := range layout.writables { + println("after add writable volume id", vid) + } + + assert(t, "writables after add back", len(layout.writables), volumeCount) + } topo.UnRegisterDataNode(dn) - assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0) + assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0) } @@ -96,20 +146,21 @@ func TestAddRemoveVolume(t *testing.T) { dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) v := storage.VolumeInfo{ - Id: storage.VolumeId(1), + Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, ReadOnly: false, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, ReplicaPlacement: &storage.ReplicaPlacement{}, - Ttl: storage.EMPTY_TTL, + Ttl: needle.EMPTY_TTL, } dn.UpdateVolumes([]storage.VolumeInfo{v}) topo.RegisterVolumeLayout(v, dn) + topo.RegisterVolumeLayout(v, dn) if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection { t.Errorf("collection %v should exist", v.Collection) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index d6b09314b..351ff842f 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -4,22 +4,21 @@ import ( "context" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" ) -func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { +func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ - VolumdId: uint32(vid), + go func(index int, url string, vid needle.VolumeId) { + err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + VolumeId: uint32(vid), }) if err != nil { ch <- false @@ -46,15 +45,15 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist } return isCheckSuccess } -func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { +func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -79,13 +78,13 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli } return isVacuumSuccess } -func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true for _, dn := range locationlist.list { - glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -93,7 +92,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err) isCommitSuccess = false } else { - glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url()) + glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } if isCommitSuccess { vl.SetVolumeAvailable(dn, vid) @@ -101,12 +100,12 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } return isCommitSuccess } -func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) { +func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumdId: uint32(vid), + VolumeId: uint32(vid), }) return err }) @@ -118,24 +117,24 @@ func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationli } } -func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) for _, col := range t.collectionMap.Items() { c := col.(*Collection) for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate) + vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) } } } return 0 } -func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { +func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[storage.VolumeId]*VolumeLocationList) + tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { tmpMap[vid] = locationList } @@ -152,11 +151,11 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr } glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) { - batchVacuumVolumeCommit(volumeLayout, vid, locationList) + if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) { + if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) { + batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList) } else { - batchVacuumVolumeCleanup(volumeLayout, vid, locationList) + batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList) } } } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 9bf013ca6..ff02044a1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -5,6 +5,9 @@ import ( "math/rand" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -20,7 +23,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement - Ttl *storage.TTL + Ttl *needle.TTL Prealloacte int64 DataCenter string Rack string @@ -55,19 +58,19 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { return } -func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) { - count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology) (count int, err error) { + count, err = vg.GrowByCountAndType(grpcDialOption, vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { return count, nil } return count, err } -func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() for i := 0; i < targetCount; i++ { - if c, e := vg.findAndGrow(topo, option); e == nil { + if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { counter += c } else { return counter, e @@ -76,13 +79,16 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp return } -func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { +func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) { servers, e := vg.findEmptySlotsForOneVolume(topo, option) if e != nil { return 0, e } - vid := topo.NextVolumeId() - err := vg.grow(topo, vid, option, servers...) + vid, raftErr := topo.NextVolumeId() + if raftErr != nil { + return 0, raftErr + } + err := vg.grow(grpcDialOption, topo, vid, option, servers...) return len(servers), err } @@ -101,7 +107,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 @@ -130,7 +136,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { @@ -171,7 +177,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Intn(rack.FreeSpace()) + r := rand.Int63n(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -179,7 +185,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } } for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) + r := rand.Int63n(datacenter.FreeSpace()) if server, e := datacenter.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -189,16 +195,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return } -func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { +func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, option); err == nil { + if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { vi := storage.VolumeInfo{ Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index f983df1ec..3573365fd 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) var topologyLayout = ` @@ -96,12 +97,12 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), + Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) } } } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 71a071e2f..799cbca62 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -9,16 +9,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + ttl *needle.TTL + vid2location map[needle.VolumeId]*VolumeLocationList + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 accessLock sync.RWMutex } @@ -29,14 +30,14 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), + vid2location: make(map[needle.VolumeId]*VolumeLocationList), + writables: *new([]needle.VolumeId), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, } } @@ -57,7 +58,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { for _, dn := range vl.vid2location[v.Id].list { if vInfo, err := dn.GetVolumesById(v.Id); err == nil { if vInfo.ReadOnly { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) vl.readonlyVolumes[v.Id] = true return @@ -65,23 +66,19 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.readonlyVolumes, v.Id) } } else { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) delete(vl.readonlyVolumes, v.Id) return } } - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) - } - } else { - vl.rememberOversizedVolumne(v) - vl.removeFromWritable(v.Id) - } + + vl.rememberOversizedVolume(v) + vl.ensureCorrectWritables(v) + } -func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { if vl.isOversized(v) { vl.oversizedVolumes[v.Id] = true } @@ -91,11 +88,34 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - vl.removeFromWritable(v.Id) - delete(vl.vid2location, v.Id) + // remove from vid2location map + location, ok := vl.vid2location[v.Id] + if !ok { + return + } + + if location.Remove(dn) { + + vl.ensureCorrectWritables(v) + + if location.Length() == 0 { + delete(vl.vid2location, v.Id) + } + + } +} + +func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { + if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { + if _, ok := vl.oversizedVolumes[v.Id]; !ok { + vl.addToWritable(v.Id) + } + } else { + vl.removeFromWritable(v.Id) + } } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { +func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { for _, id := range vl.writables { if vid == id { return @@ -110,7 +130,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && + v.Version == needle.CurrentVersion && !v.ReadOnly } @@ -121,7 +141,7 @@ func (vl *VolumeLayout) isEmpty() bool { return len(vl.vid2location) == 0 } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -141,7 +161,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -158,7 +178,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid storage.VolumeId + var vid needle.VolumeId var locationList *VolumeLocationList counter := 0 for _, v := range vl.writables { @@ -205,7 +225,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { return counter } -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { toDeleteIndex := -1 for k, id := range vl.writables { if id == vid { @@ -220,7 +240,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { } return false } -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { for _, v := range vl.writables { if v == vid { return false @@ -231,7 +251,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return true } -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -245,18 +265,18 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { + if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { return vl.setVolumeWritable(vid) } return false } -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8d5881333..8905c54b5 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -3,7 +3,7 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type VolumeLocationList struct { @@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { } } -func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { +func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { for _, dnl := range dnll.list { if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) |
