aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/allocate_volume.go5
-rw-r--r--weed/topology/cluster_commands.go6
-rw-r--r--weed/topology/collection.go5
-rw-r--r--weed/topology/data_node.go11
-rw-r--r--weed/topology/node.go14
-rw-r--r--weed/topology/store_replicate.go25
-rw-r--r--weed/topology/topology.go9
-rw-r--r--weed/topology/topology_test.go8
-rw-r--r--weed/topology/topology_vacuum.go19
-rw-r--r--weed/topology/volume_growth.go10
-rw-r--r--weed/topology/volume_growth_test.go5
-rw-r--r--weed/topology/volume_layout.go41
-rw-r--r--weed/topology/volume_location_list.go4
13 files changed, 88 insertions, 74 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index f08736f64..48336092f 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -2,9 +2,10 @@ package topology
import (
"context"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
)
@@ -12,7 +13,7 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.VolumeId, option *VolumeGrowOption) error {
+func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 7a36c25ec..152691ccb 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -3,14 +3,14 @@ package topology
import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type MaxVolumeIdCommand struct {
- MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+ MaxVolumeId needle.VolumeId `json:"maxVolumeId"`
}
-func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand {
return &MaxVolumeIdCommand{
MaxVolumeId: value,
}
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
index a17f0c961..f6b728ec9 100644
--- a/weed/topology/collection.go
+++ b/weed/topology/collection.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -23,7 +24,7 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
@@ -34,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
return vl.(*VolumeLayout)
}
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 84304512f..a89aa65d8 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,7 +2,10 @@ package topology
import (
"fmt"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -11,7 +14,7 @@ import (
type DataNode struct {
NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
+ volumes map[needle.VolumeId]storage.VolumeInfo
Ip string
Port int
PublicUrl string
@@ -22,7 +25,7 @@ func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
s.NodeImpl.value = s
return s
}
@@ -51,7 +54,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
- actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+ actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
@@ -84,7 +87,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
return ret
}
-func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
vInfo, ok := dn.volumes[id]
diff --git a/weed/topology/node.go b/weed/topology/node.go
index db70c9734..a115c8480 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -8,7 +8,7 @@ import (
"sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type NodeId string
@@ -20,12 +20,12 @@ type Node interface {
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
+ UpAdjustMaxVolumeId(vid needle.VolumeId)
GetVolumeCount() int64
GetActiveVolumeCount() int64
GetMaxVolumeCount() int64
- GetMaxVolumeId() storage.VolumeId
+ GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
@@ -46,8 +46,8 @@ type NodeImpl struct {
maxVolumeCount int64
parent Node
sync.RWMutex // lock children
- children map[NodeId]Node
- maxVolumeId storage.VolumeId
+ children map[NodeId]Node
+ maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
@@ -190,7 +190,7 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
@@ -198,7 +198,7 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
}
}
}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
return n.maxVolumeId
}
func (n *NodeImpl) GetVolumeCount() int64 {
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 4273e6d68..fd19cbfba 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -14,17 +14,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func ReplicatedWrite(masterNode string, s *storage.Store,
- volumeId storage.VolumeId, needle *storage.Needle,
+ volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, errorStatus string) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := s.Write(volumeId, needle)
+ ret, err := s.Write(volumeId, n)
needToReplicate := !s.HasVolume(volumeId)
if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
@@ -47,30 +48,30 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
q := url.Values{
"type": {"replicate"},
- "ttl": {needle.Ttl.String()},
+ "ttl": {n.Ttl.String()},
}
- if needle.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
- if needle.IsChunkedManifest() {
+ if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
- if needle.HasPairs() {
+ if n.HasPairs() {
tmpMap := make(map[string]string)
- err := json.Unmarshal(needle.Pairs, &tmpMap)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range tmpMap {
- pairMap[storage.PairNamePrefix+k] = v
+ pairMap[needle.PairNamePrefix+k] = v
}
}
_, err := operation.Upload(u.String(),
- string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
+ string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
pairMap, jwt)
return err
}); err != nil {
@@ -84,7 +85,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
func ReplicatedDelete(masterNode string, store *storage.Store,
- volumeId storage.VolumeId, n *storage.Needle,
+ volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (uint32, error) {
//check JWT
@@ -132,7 +133,7 @@ type RemoteResult struct {
Error error
}
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
+func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 208c9b5b7..5426ec7e3 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -72,7 +73,7 @@ func (t *Topology) Leader() (string, error) {
return l, nil
}
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
@@ -88,7 +89,7 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
return nil
}
-func (t *Topology) NextVolumeId() (storage.VolumeId, error) {
+func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
@@ -108,10 +109,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
return "", 0, nil, errors.New("No writable volumes available!")
}
fileId, count := t.Sequence.NextFileId(count)
- return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+ return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index a8bdec902..ec745ee93 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -4,6 +4,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"testing"
)
@@ -96,16 +98,16 @@ func TestAddRemoveVolume(t *testing.T) {
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
v := storage.VolumeInfo{
- Id: storage.VolumeId(1),
+ Id: needle.VolumeId(1),
Size: 100,
Collection: "xcollection",
FileCount: 123,
DeleteCount: 23,
DeletedByteCount: 45,
ReadOnly: false,
- Version: storage.CurrentVersion,
+ Version: needle.CurrentVersion,
ReplicaPlacement: &storage.ReplicaPlacement{},
- Ttl: storage.EMPTY_TTL,
+ Ttl: needle.EMPTY_TTL,
}
dn.UpdateVolumes([]storage.VolumeInfo{v})
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index ea65b2ff9..351ff842f 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,19 +2,20 @@ package topology
import (
"context"
- "google.golang.org/grpc"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
-func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
+ go func(index int, url string, vid needle.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
@@ -44,11 +45,11 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}
return isCheckSuccess
}
-func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
+ go func(index int, url string, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
@@ -77,7 +78,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}
return isVacuumSuccess
}
-func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
@@ -99,7 +100,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
}
return isCommitSuccess
}
-func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
+func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -133,7 +134,7 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
volumeLayout.accessLock.RLock()
- tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
+ tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
tmpMap[vid] = locationList
}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 514033ca1..ff02044a1 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -2,10 +2,12 @@ package topology
import (
"fmt"
- "google.golang.org/grpc"
"math/rand"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -21,7 +23,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
- Ttl *storage.TTL
+ Ttl *needle.TTL
Prealloacte int64
DataCenter string
Rack string
@@ -193,7 +195,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return
}
-func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
vi := storage.VolumeInfo{
@@ -202,7 +204,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
- Version: storage.CurrentVersion,
+ Version: needle.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index 1963cb928..3573365fd 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
var topologyLayout = `
@@ -96,9 +97,9 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
+ Id: needle.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
+ Version: needle.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64)))
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 71a071e2f..b3aa7d251 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -9,16 +9,17 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
+ ttl *needle.TTL
+ vid2location map[needle.VolumeId]*VolumeLocationList
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
+ oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
accessLock sync.RWMutex
}
@@ -29,14 +30,14 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- readonlyVolumes: make(map[storage.VolumeId]bool),
- oversizedVolumes: make(map[storage.VolumeId]bool),
+ vid2location: make(map[needle.VolumeId]*VolumeLocationList),
+ writables: *new([]needle.VolumeId),
+ readonlyVolumes: make(map[needle.VolumeId]bool),
+ oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
}
}
@@ -95,7 +96,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.vid2location, v.Id)
}
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
+func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
@@ -110,7 +111,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return !vl.isOversized(v) &&
- v.Version == storage.CurrentVersion &&
+ v.Version == needle.CurrentVersion &&
!v.ReadOnly
}
@@ -121,7 +122,7 @@ func (vl *VolumeLayout) isEmpty() bool {
return len(vl.vid2location) == 0
}
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -141,7 +142,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -158,7 +159,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
- var vid storage.VolumeId
+ var vid needle.VolumeId
var locationList *VolumeLocationList
counter := 0
for _, v := range vl.writables {
@@ -205,7 +206,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
return counter
}
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
@@ -220,7 +221,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
}
return false
}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
@@ -231,7 +232,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return true
}
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -245,7 +246,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -256,7 +257,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
return false
}
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 8d5881333..8905c54b5 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -3,7 +3,7 @@ package topology
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type VolumeLocationList struct {
@@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
}
}
-func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
+func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold {
vinfo, err := dnl.GetVolumesById(vid)