diff options
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/allocate_volume.go | 5 | ||||
| -rw-r--r-- | weed/topology/cluster_commands.go | 6 | ||||
| -rw-r--r-- | weed/topology/collection.go | 5 | ||||
| -rw-r--r-- | weed/topology/data_node.go | 11 | ||||
| -rw-r--r-- | weed/topology/node.go | 14 | ||||
| -rw-r--r-- | weed/topology/store_replicate.go | 25 | ||||
| -rw-r--r-- | weed/topology/topology.go | 9 | ||||
| -rw-r--r-- | weed/topology/topology_test.go | 8 | ||||
| -rw-r--r-- | weed/topology/topology_vacuum.go | 19 | ||||
| -rw-r--r-- | weed/topology/volume_growth.go | 10 | ||||
| -rw-r--r-- | weed/topology/volume_growth_test.go | 5 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 41 | ||||
| -rw-r--r-- | weed/topology/volume_location_list.go | 4 |
13 files changed, 88 insertions, 74 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index f08736f64..48336092f 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -2,9 +2,10 @@ package topology import ( "context" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" ) @@ -12,7 +13,7 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.VolumeId, option *VolumeGrowOption) error { +func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 7a36c25ec..152691ccb 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -3,14 +3,14 @@ package topology import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type MaxVolumeIdCommand struct { - MaxVolumeId storage.VolumeId `json:"maxVolumeId"` + MaxVolumeId needle.VolumeId `json:"maxVolumeId"` } -func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { +func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand { return &MaxVolumeIdCommand{ MaxVolumeId: value, } diff --git a/weed/topology/collection.go b/weed/topology/collection.go index a17f0c961..f6b728ec9 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,7 +24,7 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() @@ -34,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl * return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { +func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 84304512f..a89aa65d8 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,7 +2,10 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" @@ -11,7 +14,7 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo + volumes map[needle.VolumeId]storage.VolumeInfo Ip string Port int PublicUrl string @@ -22,7 +25,7 @@ func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) s.NodeImpl.value = s return s } @@ -51,7 +54,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -84,7 +87,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] diff --git a/weed/topology/node.go b/weed/topology/node.go index db70c9734..a115c8480 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -8,7 +8,7 @@ import ( "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NodeId string @@ -20,12 +20,12 @@ type Node interface { UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) - UpAdjustMaxVolumeId(vid storage.VolumeId) + UpAdjustMaxVolumeId(vid needle.VolumeId) GetVolumeCount() int64 GetActiveVolumeCount() int64 GetMaxVolumeCount() int64 - GetMaxVolumeId() storage.VolumeId + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) @@ -46,8 +46,8 @@ type NodeImpl struct { maxVolumeCount int64 parent Node sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId storage.VolumeId + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -190,7 +190,7 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative +func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid if n.parent != nil { @@ -198,7 +198,7 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative } } } -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } func (n *NodeImpl) GetVolumeCount() int64 { diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 4273e6d68..fd19cbfba 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -14,17 +14,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId storage.VolumeId, needle *storage.Needle, + volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size uint32, errorStatus string) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, needle) + ret, err := s.Write(volumeId, n) needToReplicate := !s.HasVolume(volumeId) if err != nil { errorStatus = "Failed to write to local disk (" + err.Error() + ")" @@ -47,30 +48,30 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } q := url.Values{ "type": {"replicate"}, - "ttl": {needle.Ttl.String()}, + "ttl": {n.Ttl.String()}, } - if needle.LastModified > 0 { - q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) } - if needle.IsChunkedManifest() { + if n.IsChunkedManifest() { q.Set("cm", "true") } u.RawQuery = q.Encode() pairMap := make(map[string]string) - if needle.HasPairs() { + if n.HasPairs() { tmpMap := make(map[string]string) - err := json.Unmarshal(needle.Pairs, &tmpMap) + err := json.Unmarshal(n.Pairs, &tmpMap) if err != nil { glog.V(0).Infoln("Unmarshal pairs error:", err) } for k, v := range tmpMap { - pairMap[storage.PairNamePrefix+k] = v + pairMap[needle.PairNamePrefix+k] = v } } _, err := operation.Upload(u.String(), - string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), pairMap, jwt) return err }); err != nil { @@ -84,7 +85,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } func ReplicatedDelete(masterNode string, store *storage.Store, - volumeId storage.VolumeId, n *storage.Needle, + volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (uint32, error) { //check JWT @@ -132,7 +133,7 @@ type RemoteResult struct { Error error } -func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { +func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error { if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { length := 0 selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 208c9b5b7..5426ec7e3 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -72,7 +73,7 @@ func (t *Topology) Leader() (string, error) { return l, nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { @@ -88,7 +89,7 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return nil } -func (t *Topology) NextVolumeId() (storage.VolumeId, error) { +func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { @@ -108,10 +109,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { return NewCollection(collectionName, t.volumeSizeLimit) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index a8bdec902..ec745ee93 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -4,6 +4,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "testing" ) @@ -96,16 +98,16 @@ func TestAddRemoveVolume(t *testing.T) { dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) v := storage.VolumeInfo{ - Id: storage.VolumeId(1), + Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, ReadOnly: false, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, ReplicaPlacement: &storage.ReplicaPlacement{}, - Ttl: storage.EMPTY_TTL, + Ttl: needle.EMPTY_TTL, } dn.UpdateVolumes([]storage.VolumeInfo{v}) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ea65b2ff9..351ff842f 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,19 +2,20 @@ package topology import ( "context" - "google.golang.org/grpc" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" ) -func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { +func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), @@ -44,11 +45,11 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi } return isCheckSuccess } -func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { +func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ @@ -77,7 +78,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, } return isVacuumSuccess } -func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) @@ -99,7 +100,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v } return isCommitSuccess } -func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) { +func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -133,7 +134,7 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[storage.VolumeId]*VolumeLocationList) + tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { tmpMap[vid] = locationList } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 514033ca1..ff02044a1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -2,10 +2,12 @@ package topology import ( "fmt" - "google.golang.org/grpc" "math/rand" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -21,7 +23,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement - Ttl *storage.TTL + Ttl *needle.TTL Prealloacte int64 DataCenter string Rack string @@ -193,7 +195,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return } -func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { +func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { vi := storage.VolumeInfo{ @@ -202,7 +204,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 1963cb928..3573365fd 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) var topologyLayout = ` @@ -96,9 +97,9 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), + Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 71a071e2f..b3aa7d251 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -9,16 +9,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + ttl *needle.TTL + vid2location map[needle.VolumeId]*VolumeLocationList + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 accessLock sync.RWMutex } @@ -29,14 +30,14 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), + vid2location: make(map[needle.VolumeId]*VolumeLocationList), + writables: *new([]needle.VolumeId), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, } } @@ -95,7 +96,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.vid2location, v.Id) } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { +func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { for _, id := range vl.writables { if vid == id { return @@ -110,7 +111,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && + v.Version == needle.CurrentVersion && !v.ReadOnly } @@ -121,7 +122,7 @@ func (vl *VolumeLayout) isEmpty() bool { return len(vl.vid2location) == 0 } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -141,7 +142,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -158,7 +159,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid storage.VolumeId + var vid needle.VolumeId var locationList *VolumeLocationList counter := 0 for _, v := range vl.writables { @@ -205,7 +206,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { return counter } -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { toDeleteIndex := -1 for k, id := range vl.writables { if id == vid { @@ -220,7 +221,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { } return false } -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { for _, v := range vl.writables { if v == vid { return false @@ -231,7 +232,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return true } -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -245,7 +246,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -256,7 +257,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b return false } -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8d5881333..8905c54b5 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -3,7 +3,7 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type VolumeLocationList struct { @@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { } } -func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { +func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { for _, dnl := range dnll.list { if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) |
