author     bingoohuang <bingoo.huang@gmail.com>       2019-07-16 11:13:23 +0800
committer  GitHub <noreply@github.com>                2019-07-16 11:13:23 +0800
commit     d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch)
tree       8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/topology
parent     01060c992591f412b0d5e180bde29991747a9462 (diff)
parent     5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff)
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/topology')
-rw-r--r--  weed/topology/allocate_volume.go          |  14
-rw-r--r--  weed/topology/cluster_commands.go         |   6
-rw-r--r--  weed/topology/collection.go               |   5
-rw-r--r--  weed/topology/data_center.go              |  17
-rw-r--r--  weed/topology/data_node.go                |  57
-rw-r--r--  weed/topology/data_node_ec.go             | 135
-rw-r--r--  weed/topology/node.go                     |  75
-rw-r--r--  weed/topology/rack.go                     |  18
-rw-r--r--  weed/topology/store_replicate.go          |  38
-rw-r--r--  weed/topology/topology.go                 |  96
-rw-r--r--  weed/topology/topology_ec.go              | 173
-rw-r--r--  weed/topology/topology_event_handling.go  |   5
-rw-r--r--  weed/topology/topology_map.go             |  18
-rw-r--r--  weed/topology/topology_test.go            |  71
-rw-r--r--  weed/topology/topology_vacuum.go          |  57
-rw-r--r--  weed/topology/volume_growth.go            |  36
-rw-r--r--  weed/topology/volume_growth_test.go       |   7
-rw-r--r--  weed/topology/volume_layout.go            |  88
-rw-r--r--  weed/topology/volume_location_list.go     |   4
19 files changed, 744 insertions, 176 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 55796ab43..48336092f 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -2,25 +2,23 @@ package topology
import (
"context"
- "time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
)
type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
+func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.AssignVolume(ctx, &volume_server_pb.AssignVolumeRequest{
- VolumdId: uint32(vid),
+ _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
+ VolumeId: uint32(vid),
Collection: option.Collection,
Replication: option.ReplicaPlacement.String(),
Ttl: option.Ttl.String(),
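
The allocate_volume.go change drops the per-call 5-second timeout, threads a caller-supplied grpc.DialOption into the volume-server RPC helper, and renames the RPC and field to AllocateVolume/VolumeId. The general pattern — one dial option created by the caller and passed down to every RPC helper — can be sketched independently of the SeaweedFS packages; withVolumeServerConn, the address, and grpc.WithInsecure() below are illustrative assumptions, not the project's actual wiring:

    package main

    import (
        "log"

        "google.golang.org/grpc"
    )

    // withVolumeServerConn mirrors the shape of operation.WithVolumeServerClient
    // in the diff above: the caller supplies the grpc.DialOption, the helper only
    // manages the connection lifetime. All names here are illustrative.
    func withVolumeServerConn(addr string, dialOption grpc.DialOption, fn func(conn *grpc.ClientConn) error) error {
        conn, err := grpc.Dial(addr, dialOption)
        if err != nil {
            return err
        }
        defer conn.Close()
        return fn(conn)
    }

    func main() {
        // grpc.WithInsecure() stands in for whatever security option the master is
        // configured with; a TLS deployment would pass transport credentials instead.
        err := withVolumeServerConn("127.0.0.1:18080", grpc.WithInsecure(), func(conn *grpc.ClientConn) error {
            // A generated client such as volume_server_pb.NewVolumeServerClient(conn)
            // would be built here and its AllocateVolume RPC invoked with the
            // VolumeId, Collection, Replication and Ttl fields shown above.
            return nil
        })
        if err != nil {
            log.Fatal(err)
        }
    }
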
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 7a36c25ec..152691ccb 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -3,14 +3,14 @@ package topology
import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type MaxVolumeIdCommand struct {
- MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+ MaxVolumeId needle.VolumeId `json:"maxVolumeId"`
}
-func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand {
return &MaxVolumeIdCommand{
MaxVolumeId: value,
}
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
index a17f0c961..f6b728ec9 100644
--- a/weed/topology/collection.go
+++ b/weed/topology/collection.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -23,7 +24,7 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
@@ -34,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
return vl.(*VolumeLayout)
}
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
index bcf2dfd31..640cb1937 100644
--- a/weed/topology/data_center.go
+++ b/weed/topology/data_center.go
@@ -1,5 +1,7 @@
package topology
+import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+
type DataCenter struct {
NodeImpl
}
@@ -38,3 +40,18 @@ func (dc *DataCenter) ToMap() interface{} {
m["Racks"] = racks
return m
}
+
+func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
+ m := &master_pb.DataCenterInfo{
+ Id: string(dc.Id()),
+ VolumeCount: uint64(dc.GetVolumeCount()),
+ MaxVolumeCount: uint64(dc.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(dc.FreeSpace()),
+ ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()),
+ }
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ m.RackInfos = append(m.RackInfos, rack.ToRackInfo())
+ }
+ return m
+}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 6ea6d3938..3e72ccdbf 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,6 +2,12 @@ package topology
import (
"fmt"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,18 +16,21 @@ import (
type DataNode struct {
NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
+ volumes map[needle.VolumeId]storage.VolumeInfo
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
+ ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
+ ecShardsLock sync.RWMutex
}
func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
+ s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
s.NodeImpl.value = s
return s
}
@@ -50,7 +59,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
- actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+ actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
@@ -74,6 +83,20 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
return
}
+func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+ dn.Lock()
+ for _, v := range deletedVolumes {
+ delete(dn.volumes, v.Id)
+ dn.UpAdjustVolumeCountDelta(-1)
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ dn.Unlock()
+ for _, v := range newlVolumes {
+ dn.AddOrUpdateVolume(v)
+ }
+ return
+}
+
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
dn.RLock()
for _, v := range dn.volumes {
@@ -83,7 +106,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
return ret
}
-func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
vInfo, ok := dn.volumes[id]
@@ -123,8 +146,26 @@ func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount()
+ ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
return ret
}
+
+func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
+ m := &master_pb.DataNodeInfo{
+ Id: string(dn.Id()),
+ VolumeCount: uint64(dn.GetVolumeCount()),
+ MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(dn.FreeSpace()),
+ ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
+ }
+ for _, v := range dn.GetVolumes() {
+ m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
+ }
+ for _, ecv := range dn.GetEcShards() {
+ m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
+ }
+ return m
+}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
new file mode 100644
index 000000000..75c8784fe
--- /dev/null
+++ b/weed/topology/data_node_ec.go
@@ -0,0 +1,135 @@
+package topology
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
+ dn.RLock()
+ for _, ecVolumeInfo := range dn.ecShards {
+ ret = append(ret, ecVolumeInfo)
+ }
+ dn.RUnlock()
+ return ret
+}
+
+func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ // prepare the new ec shard map
+ actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
+ for _, ecShards := range actualShards {
+ actualEcShardMap[ecShards.VolumeId] = ecShards
+ }
+
+ // found out the newShards and deletedShards
+ var newShardCount, deletedShardCount int
+ dn.ecShardsLock.RLock()
+ for vid, ecShards := range dn.ecShards {
+ if actualEcShards, ok := actualEcShardMap[vid]; !ok {
+ // dn registered ec shards not found in the new set of ec shards
+ deletedShards = append(deletedShards, ecShards)
+ deletedShardCount += ecShards.ShardIdCount()
+ } else {
+ // found, but maybe the actual shard could be missing
+ a := actualEcShards.Minus(ecShards)
+ if a.ShardIdCount() > 0 {
+ newShards = append(newShards, a)
+ newShardCount += a.ShardIdCount()
+ }
+ d := ecShards.Minus(actualEcShards)
+ if d.ShardIdCount() > 0 {
+ deletedShards = append(deletedShards, d)
+ deletedShardCount += d.ShardIdCount()
+ }
+ }
+ }
+ for _, ecShards := range actualShards {
+ if _, found := dn.ecShards[ecShards.VolumeId]; !found {
+ newShards = append(newShards, ecShards)
+ newShardCount += ecShards.ShardIdCount()
+ }
+ }
+ dn.ecShardsLock.RUnlock()
+
+ if len(newShards) > 0 || len(deletedShards) > 0 {
+ // if changed, set to the new ec shard map
+ dn.ecShardsLock.Lock()
+ dn.ecShards = actualEcShardMap
+ dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount))
+ dn.ecShardsLock.Unlock()
+ }
+
+ return
+}
+
+func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+
+ for _, newShard := range newShards {
+ dn.AddOrUpdateEcShard(newShard)
+ }
+
+ for _, deletedShard := range deletedShards {
+ dn.DeleteEcShard(deletedShard)
+ }
+
+}
+
+func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ delta := 0
+ if existing, ok := dn.ecShards[s.VolumeId]; !ok {
+ dn.ecShards[s.VolumeId] = s
+ delta = s.ShardBits.ShardIdCount()
+ } else {
+ oldCount := existing.ShardBits.ShardIdCount()
+ existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
+ delta = existing.ShardBits.ShardIdCount() - oldCount
+ }
+
+ dn.UpAdjustEcShardCountDelta(int64(delta))
+
+}
+
+func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ if existing, ok := dn.ecShards[s.VolumeId]; ok {
+ oldCount := existing.ShardBits.ShardIdCount()
+ existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
+ delta := existing.ShardBits.ShardIdCount() - oldCount
+ dn.UpAdjustEcShardCountDelta(int64(delta))
+ if existing.ShardBits.ShardIdCount() == 0 {
+ delete(dn.ecShards, s.VolumeId)
+ }
+ }
+
+}
+
+func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
+
+ // check whether normal volumes has this volume id
+ dn.RLock()
+ _, ok := dn.volumes[id]
+ if ok {
+ hasVolumeId = true
+ }
+ dn.RUnlock()
+
+ if hasVolumeId {
+ return
+ }
+
+ // check whether ec shards has this volume id
+ dn.ecShardsLock.RLock()
+ _, ok = dn.ecShards[id]
+ if ok {
+ hasVolumeId = true
+ }
+ dn.ecShardsLock.RUnlock()
+
+ return
+
+}
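
UpdateEcShards above diffs the shard-id bitmaps reported by a heartbeat against what the data node already has registered: the two Minus calls yield shards that appeared and shards that vanished, and only the signed count difference is pushed up the tree via UpAdjustEcShardCountDelta. A minimal standalone sketch of that bitmap arithmetic, with shardBits as a simplified stand-in for erasure_coding.ShardBits:

    package main

    import (
        "fmt"
        "math/bits"
    )

    // shardBits is a simplified stand-in for erasure_coding.ShardBits: one bit per
    // EC shard id that a data node holds for a given volume.
    type shardBits uint32

    func (b shardBits) plus(o shardBits) shardBits  { return b | o }
    func (b shardBits) minus(o shardBits) shardBits { return b &^ o }
    func (b shardBits) count() int                  { return bits.OnesCount32(uint32(b)) }

    func main() {
        registered := shardBits(0x0F) // master knows shards 0-3 for this volume
        actual := shardBits(0x3C)     // the heartbeat now reports shards 2-5

        newShards := actual.minus(registered)     // shards 4 and 5 appeared
        deletedShards := registered.minus(actual) // shards 0 and 1 disappeared

        fmt.Println("new:", newShards.count(), "deleted:", deletedShards.count())
        // Only the signed difference is propagated, mirroring
        // UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)).
    }
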
diff --git a/weed/topology/node.go b/weed/topology/node.go
index b7d2f79ec..b2808f589 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -5,26 +5,30 @@ import (
"math/rand"
"strings"
"sync"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type NodeId string
type Node interface {
Id() NodeId
String() string
- FreeSpace() int
- ReserveOneVolume(r int) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
+ FreeSpace() int64
+ ReserveOneVolume(r int64) (*DataNode, error)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
+ UpAdjustVolumeCountDelta(volumeCountDelta int64)
+ UpAdjustEcShardCountDelta(ecShardCountDelta int64)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
+ UpAdjustMaxVolumeId(vid needle.VolumeId)
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
- GetMaxVolumeId() storage.VolumeId
+ GetVolumeCount() int64
+ GetEcShardCount() int64
+ GetActiveVolumeCount() int64
+ GetMaxVolumeCount() int64
+ GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
@@ -39,14 +43,15 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
+ volumeCount int64
+ activeVolumeCount int64
+ ecShardCount int64
+ maxVolumeCount int64
id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
parent Node
sync.RWMutex // lock children
children map[NodeId]Node
- maxVolumeId storage.VolumeId
+ maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
@@ -126,7 +131,10 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
-func (n *NodeImpl) FreeSpace() int {
+func (n *NodeImpl) FreeSpace() int64 {
+ if n.ecShardCount > 0 {
+ return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
+ }
return n.maxVolumeCount - n.volumeCount
}
func (n *NodeImpl) SetParent(node Node) {
@@ -146,7 +154,7 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
@@ -171,25 +179,31 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.volumeCount, volumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
+func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
+ if n.parent != nil {
+ n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
@@ -197,16 +211,19 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
}
}
}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
return n.maxVolumeId
}
-func (n *NodeImpl) GetVolumeCount() int {
+func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount
}
-func (n *NodeImpl) GetActiveVolumeCount() int {
+func (n *NodeImpl) GetEcShardCount() int64 {
+ return n.ecShardCount
+}
+func (n *NodeImpl) GetActiveVolumeCount() int64 {
return n.activeVolumeCount
}
-func (n *NodeImpl) GetMaxVolumeCount() int {
+func (n *NodeImpl) GetMaxVolumeCount() int64 {
return n.maxVolumeCount
}
@@ -218,6 +235,7 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
@@ -232,6 +250,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+ n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id())
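
node.go widens every counter to int64, adds an ecShardCount, and updates the counters with sync/atomic while still forwarding each delta to the parent, so rack, data-center and topology totals stay consistent without a tree-wide lock. A pared-down sketch of that propagation, including the adjusted FreeSpace rule (the figure of 10 data shards per volume matches SeaweedFS's DataShardsCount):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // node is a pared-down NodeImpl: each level keeps its own counters and
    // forwards every delta to its parent, so the topology root always holds the
    // cluster-wide totals.
    type node struct {
        volumeCount  int64
        ecShardCount int64
        parent       *node
    }

    func (n *node) upAdjustVolumeCountDelta(d int64) {
        atomic.AddInt64(&n.volumeCount, d)
        if n.parent != nil {
            n.parent.upAdjustVolumeCountDelta(d)
        }
    }

    func (n *node) upAdjustEcShardCountDelta(d int64) {
        atomic.AddInt64(&n.ecShardCount, d)
        if n.parent != nil {
            n.parent.upAdjustEcShardCountDelta(d)
        }
    }

    // freeSpace mirrors the new NodeImpl.FreeSpace: when ec shards are present,
    // every group of 10 data shards is budgeted as one volume slot, plus one
    // extra slot of headroom.
    func (n *node) freeSpace(maxVolumeCount int64) int64 {
        if n.ecShardCount > 0 {
            return maxVolumeCount - n.volumeCount - n.ecShardCount/10 - 1
        }
        return maxVolumeCount - n.volumeCount
    }

    func main() {
        topo := &node{}
        dc := &node{parent: topo}
        dn := &node{parent: dc}

        dn.upAdjustVolumeCountDelta(3)   // three volumes reported by a heartbeat
        dn.upAdjustEcShardCountDelta(14) // one fully ec-encoded volume's shards

        fmt.Println(dn.volumeCount, dc.volumeCount, topo.volumeCount) // 3 3 3
        fmt.Println(dn.freeSpace(25))                                 // 25 - 3 - 1 - 1 = 20
    }
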
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index a48d64323..932c1a804 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -1,6 +1,7 @@
package topology
import (
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"strconv"
"time"
)
@@ -27,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@@ -58,3 +59,18 @@ func (r *Rack) ToMap() interface{} {
m["DataNodes"] = dns
return m
}
+
+func (r *Rack) ToRackInfo() *master_pb.RackInfo {
+ m := &master_pb.RackInfo{
+ Id: string(r.Id()),
+ VolumeCount: uint64(r.GetVolumeCount()),
+ MaxVolumeCount: uint64(r.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(r.FreeSpace()),
+ ActiveVolumeCount: uint64(r.GetActiveVolumeCount()),
+ }
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ m.DataNodeInfos = append(m.DataNodeInfos, dn.ToDataNodeInfo())
+ }
+ return m
+}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index c73fb706a..d4076d548 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -14,24 +14,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func ReplicatedWrite(masterNode string, s *storage.Store,
- volumeId storage.VolumeId, needle *storage.Needle,
- r *http.Request) (size uint32, errorStatus string) {
+ volumeId needle.VolumeId, n *needle.Needle,
+ r *http.Request) (size uint32, isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := s.Write(volumeId, needle)
- needToReplicate := !s.HasVolume(volumeId)
+ size, isUnchanged, err = s.Write(volumeId, n)
if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- size = ret
+ err = fmt.Errorf("failed to write to local disk: %v", err)
return
}
+ needToReplicate := !s.HasVolume(volumeId)
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
if !needToReplicate {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
@@ -47,43 +47,43 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
q := url.Values{
"type": {"replicate"},
+ "ttl": {n.Ttl.String()},
}
- if needle.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
- if needle.IsChunkedManifest() {
+ if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
- if needle.HasPairs() {
+ if n.HasPairs() {
tmpMap := make(map[string]string)
- err := json.Unmarshal(needle.Pairs, &tmpMap)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range tmpMap {
- pairMap[storage.PairNamePrefix+k] = v
+ pairMap[needle.PairNamePrefix+k] = v
}
}
_, err := operation.Upload(u.String(),
- string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
+ string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
pairMap, jwt)
return err
}); err != nil {
- ret = 0
- errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
+ size = 0
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
}
}
}
- size = ret
return
}
func ReplicatedDelete(masterNode string, store *storage.Store,
- volumeId storage.VolumeId, n *storage.Needle,
+ volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (uint32, error) {
//check JWT
@@ -102,7 +102,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
+ return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
}); err != nil {
ret = 0
}
@@ -131,7 +131,7 @@ type RemoteResult struct {
Error error
}
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
+func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
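
ReplicatedWrite now writes to the local store first, returns a typed error instead of an error string, and forwards the needle's TTL when fanning out to replicas. The overall control flow — write locally, then replicate only when the request did not itself arrive with type=replicate — is sketched below with illustrative stand-ins (writeLocal, replicaUrls, pushToReplica) rather than the real store and operation helpers:

    package main

    import (
        "errors"
        "fmt"
    )

    // replicatedWrite sketches the control flow of ReplicatedWrite above: write to
    // the local store first, and only fan the write out when the request did not
    // itself arrive as a replication request, which prevents replication loops.
    func replicatedWrite(isReplicateRequest bool, writeLocal func() error,
        replicaUrls []string, pushToReplica func(url string) error) error {

        if err := writeLocal(); err != nil {
            return fmt.Errorf("failed to write to local disk: %v", err)
        }
        if isReplicateRequest {
            return nil
        }
        for _, u := range replicaUrls {
            if err := pushToReplica(u); err != nil {
                return fmt.Errorf("failed to write to replicas: %v", err)
            }
        }
        return nil
    }

    func main() {
        err := replicatedWrite(false,
            func() error { return nil },
            []string{"10.0.0.2:8080"},
            func(url string) error { return errors.New("connection refused: " + url) })
        fmt.Println(err)
    }
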
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4242bfa05..aa01190c9 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -2,20 +2,25 @@ package topology
import (
"errors"
+ "fmt"
"math/rand"
+ "sync"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Topology struct {
NodeImpl
- collectionMap *util.ConcurrentReadMap
+ collectionMap *util.ConcurrentReadMap
+ ecShardMap map[needle.VolumeId]*EcShardLocations
+ ecShardMapLock sync.RWMutex
pulse int64
@@ -37,6 +42,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()
+ t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
@@ -50,8 +56,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
}
func (t *Topology) IsLeader() bool {
- if leader, e := t.Leader(); e == nil {
- return leader == t.RaftServer.Name()
+ if t.RaftServer != nil {
+ return t.RaftServer.State() == raft.Leader
}
return false
}
@@ -72,7 +78,7 @@ func (t *Topology) Leader() (string, error) {
return l, nil
}
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
@@ -85,14 +91,24 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
return c.(*Collection).Lookup(vid)
}
}
+
+ if locations, found := t.LookupEcShards(vid); found {
+ for _, loc := range locations.Locations {
+ dataNodes = append(dataNodes, loc...)
+ }
+ return dataNodes
+ }
+
return nil
}
-func (t *Topology) NextVolumeId() storage.VolumeId {
+func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
- return next
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ return next, nil
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
@@ -102,19 +118,43 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
- if err != nil || datanodes.Length() == 0 {
- return "", 0, nil, errors.New("No writable volumes available!")
+ if err != nil {
+ return "", 0, nil, fmt.Errorf("failed to find writable volumes for collectio:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
+ }
+ if datanodes.Length() == 0 {
+ return "", 0, nil, fmt.Errorf("no writable volumes available for for collectio:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
fileId, count := t.Sequence.NextFileId(count)
- return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+ return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
+func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
+
+ mapOfCollections := make(map[string]bool)
+ for _, c := range t.collectionMap.Items() {
+ mapOfCollections[c.(*Collection).Name] = true
+ }
+
+ if includeEcVolumes {
+ t.ecShardMapLock.RLock()
+ for _, ecVolumeLocation := range t.ecShardMap {
+ mapOfCollections[ecVolumeLocation.Collection] = true
+ }
+ t.ecShardMapLock.RUnlock()
+ }
+
+ for k, _ := range mapOfCollections {
+ ret = append(ret, k)
+ }
+ return ret
+}
+
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Find(collectionName)
if !hasCollection {
@@ -152,6 +192,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
}
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
+ // convert into in memory struct storage.VolumeInfo
var volumeInfos []storage.VolumeInfo
for _, v := range volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
@@ -160,8 +201,9 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
}
}
+ // find out the delta volumes
newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
+ for _, v := range newVolumes {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
@@ -169,3 +211,33 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
}
return
}
+
+func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) {
+ var newVis, oldVis []storage.VolumeInfo
+ for _, v := range newVolumes {
+ vi, err := storage.NewVolumeInfoFromShort(v)
+ if err != nil {
+ glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+ continue
+ }
+ newVis = append(newVis, vi)
+ }
+ for _, v := range deletedVolumes {
+ vi, err := storage.NewVolumeInfoFromShort(v)
+ if err != nil {
+ glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+ continue
+ }
+ oldVis = append(oldVis, vi)
+ }
+ dn.DeltaUpdateVolumes(newVis, oldVis)
+
+ for _, vi := range newVis {
+ t.RegisterVolumeLayout(vi, dn)
+ }
+ for _, vi := range oldVis {
+ t.UnRegisterVolumeLayout(vi, dn)
+ }
+
+ return
+}
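
Two behavioral changes in topology.go stand out: NextVolumeId now waits for the raft MaxVolumeIdCommand and surfaces its error instead of firing it in a goroutine, and SyncDataNodeRegistration registers only the delta (newVolumes) rather than every reported volume. A small sketch of the first change, with raftDoer and fakeRaft as illustrative stand-ins for the chrislusf/raft server:

    package main

    import (
        "errors"
        "fmt"
    )

    // raftDoer models the single method the topology needs from the raft server.
    type raftDoer interface {
        Do(command interface{}) (interface{}, error)
    }

    type fakeRaft struct{ leader bool }

    func (r *fakeRaft) Do(command interface{}) (interface{}, error) {
        if !r.leader {
            return nil, errors.New("not leader")
        }
        return nil, nil
    }

    // nextVolumeId mirrors the new control flow: the max-volume-id command is
    // committed through raft before the id is handed out, and a failure is
    // returned to the caller instead of being dropped inside a goroutine.
    func nextVolumeId(r raftDoer, current uint32) (uint32, error) {
        next := current + 1
        if _, err := r.Do(fmt.Sprintf("MaxVolumeId=%d", next)); err != nil {
            return 0, err
        }
        return next, nil
    }

    func main() {
        id, err := nextVolumeId(&fakeRaft{leader: true}, 7)
        fmt.Println(id, err) // 8 <nil>

        _, err = nextVolumeId(&fakeRaft{leader: false}, 7)
        fmt.Println(err) // not leader
    }
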
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
new file mode 100644
index 000000000..93b39bb5d
--- /dev/null
+++ b/weed/topology/topology_ec.go
@@ -0,0 +1,173 @@
+package topology
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+type EcShardLocations struct {
+ Collection string
+ Locations [erasure_coding.TotalShardsCount][]*DataNode
+}
+
+func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ // convert into in memory struct storage.VolumeInfo
+ var shards []*erasure_coding.EcVolumeInfo
+ for _, shardInfo := range shardInfos {
+ shards = append(shards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+ // find out the delta volumes
+ newShards, deletedShards = dn.UpdateEcShards(shards)
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
+func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
+ // convert into in memory struct storage.VolumeInfo
+ var newShards, deletedShards []*erasure_coding.EcVolumeInfo
+ for _, shardInfo := range newEcShards {
+ newShards = append(newShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+ for _, shardInfo := range deletedEcShards {
+ deletedShards = append(deletedShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+
+ dn.DeltaUpdateEcShards(newShards, deletedShards)
+
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
+func NewEcShardLocations(collection string) *EcShardLocations {
+ return &EcShardLocations{
+ Collection: collection,
+ }
+}
+
+func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
+ dataNodes := loc.Locations[shardId]
+ for _, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ return false
+ }
+ }
+ loc.Locations[shardId] = append(dataNodes, dn)
+ return true
+}
+
+func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
+ dataNodes := loc.Locations[shardId]
+ foundIndex := -1
+ for index, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ foundIndex = index
+ }
+ }
+ if foundIndex < 0 {
+ return false
+ }
+ loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
+ return true
+}
+
+func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ locations = NewEcShardLocations(ecShardInfos.Collection)
+ t.ecShardMap[ecShardInfos.VolumeId] = locations
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.AddShard(shardId, dn)
+ }
+}
+
+func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+ glog.Infof("removing ec shard info:%+v", ecShardInfos)
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ return
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.DeleteShard(shardId, dn)
+ }
+}
+
+func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
+ t.ecShardMapLock.RLock()
+ defer t.ecShardMapLock.RUnlock()
+
+ locations, found = t.ecShardMap[vid]
+
+ return
+}
+
+func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
+ t.ecShardMapLock.RLock()
+ defer t.ecShardMapLock.RUnlock()
+
+ dateNodeMap := make(map[string]bool)
+ for _, ecVolumeLocation := range t.ecShardMap {
+ if ecVolumeLocation.Collection == collection {
+ for _, locations := range ecVolumeLocation.Locations {
+ for _, loc := range locations {
+ dateNodeMap[string(loc.Id())] = true
+ }
+ }
+ }
+ }
+
+ for k, _ := range dateNodeMap {
+ dataNodes = append(dataNodes, k)
+ }
+
+ return
+}
+
+func (t *Topology) DeleteEcCollection(collection string) {
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ var vids []needle.VolumeId
+ for vid, ecVolumeLocation := range t.ecShardMap {
+ if ecVolumeLocation.Collection == collection {
+ vids = append(vids, vid)
+ }
+ }
+
+ for _, vid := range vids {
+ delete(t.ecShardMap, vid)
+ }
+
+ return
+}
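
EcShardLocations keeps one slot per shard id, each holding the data nodes that currently serve that shard; AddShard deduplicates and DeleteShard removes a single node. A condensed, self-contained version (plain strings stand in for *DataNode; 14 total shards reflects SeaweedFS's 10+4 erasure coding layout):

    package main

    import "fmt"

    const totalShardsCount = 14 // 10 data + 4 parity shards

    // ecShardLocations mirrors EcShardLocations above: one slot per shard id,
    // each holding the servers that currently have that shard.
    type ecShardLocations struct {
        collection string
        locations  [totalShardsCount][]string
    }

    func (l *ecShardLocations) addShard(shardId int, server string) bool {
        for _, s := range l.locations[shardId] {
            if s == server {
                return false // already registered; keep the slot deduplicated
            }
        }
        l.locations[shardId] = append(l.locations[shardId], server)
        return true
    }

    func (l *ecShardLocations) deleteShard(shardId int, server string) bool {
        for i, s := range l.locations[shardId] {
            if s == server {
                l.locations[shardId] = append(l.locations[shardId][:i], l.locations[shardId][i+1:]...)
                return true
            }
        }
        return false
    }

    func main() {
        loc := &ecShardLocations{collection: "pictures"}
        fmt.Println(loc.addShard(3, "10.0.0.1:8080"))    // true
        fmt.Println(loc.addShard(3, "10.0.0.1:8080"))    // false: duplicate
        fmt.Println(loc.deleteShard(3, "10.0.0.1:8080")) // true
    }
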
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index a301103eb..041351492 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
package topology
import (
+ "google.golang.org/grpc"
"math/rand"
"time"
@@ -8,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) {
+func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@@ -22,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
}
}
}(garbageThreshold)
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index 769ba0e2a..37a88c9ed 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -68,9 +68,27 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat
for _, v := range dn.GetVolumes() {
volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id))
}
+ for _, s := range dn.GetEcShards() {
+ volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(s.VolumeId))
+ }
volumeLocations = append(volumeLocations, volumeLocation)
}
}
}
return
}
+
+func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo {
+ m := &master_pb.TopologyInfo{
+ Id: string(t.Id()),
+ VolumeCount: uint64(t.GetVolumeCount()),
+ MaxVolumeCount: uint64(t.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(t.FreeSpace()),
+ ActiveVolumeCount: uint64(t.GetActiveVolumeCount()),
+ }
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ m.DataCenterInfos = append(m.DataCenterInfos, dc.ToDataCenterInfo())
+ }
+ return m
+}
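
With ToDataNodeInfo, ToRackInfo, ToDataCenterInfo and now ToTopologyInfo, the commit lets the master serialize its whole in-memory tree into master_pb messages by a simple recursive walk. A stripped-down illustration of that mapping, using a generic nodeInfo in place of the generated protobuf types:

    package main

    import "fmt"

    // nodeInfo stands in for the master_pb.*Info messages; memNode stands in for
    // DataNode/Rack/DataCenter/Topology. Each level converts itself and then
    // appends its converted children, the same shape as the To...Info methods.
    type nodeInfo struct {
        Id          string
        VolumeCount uint64
        Children    []*nodeInfo
    }

    type memNode struct {
        id          string
        volumeCount uint64
        children    []*memNode
    }

    func (n *memNode) toInfo() *nodeInfo {
        info := &nodeInfo{Id: n.id, VolumeCount: n.volumeCount}
        for _, c := range n.children {
            info.Children = append(info.Children, c.toInfo())
        }
        return info
    }

    func main() {
        topo := &memNode{id: "topo", children: []*memNode{
            {id: "dc1", children: []*memNode{
                {id: "rack1", children: []*memNode{
                    {id: "127.0.0.1:8080", volumeCount: 7},
                }},
            }},
        }}
        fmt.Println(topo.toInfo().Children[0].Children[0].Children[0].Id) // 127.0.0.1:8080
    }
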
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 07dc9c67b..8f79ad684 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -4,6 +4,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"testing"
)
@@ -39,7 +41,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
DeletedByteCount: 34524,
ReadOnly: false,
ReplicaPlacement: uint32(0),
- Version: uint32(1),
+ Version: uint32(needle.CurrentVersion),
Ttl: 0,
}
volumeMessages = append(volumeMessages, volumeMessage)
@@ -47,8 +49,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
}
{
@@ -64,20 +66,68 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
DeletedByteCount: 345240,
ReadOnly: false,
ReplicaPlacement: uint32(0),
- Version: uint32(1),
+ Version: uint32(needle.CurrentVersion),
Ttl: 0,
}
volumeMessages = append(volumeMessages, volumeMessage)
}
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ //rp, _ := storage.NewReplicaPlacementFromString("000")
+ //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
+ //assert(t, "writables", len(layout.writables), volumeCount)
+
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
+ }
+
+ {
+ volumeCount := 6
+ newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
+ Id: uint32(3),
+ Collection: "",
+ ReplicaPlacement: uint32(0),
+ Version: uint32(needle.CurrentVersion),
+ Ttl: 0,
+ }
+ topo.IncrementalSyncDataNodeRegistration(
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ nil,
+ dn)
+ rp, _ := storage.NewReplicaPlacementFromString("000")
+ layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
+ assert(t, "writables after repeated add", len(layout.writables), volumeCount)
+
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
+
+ topo.IncrementalSyncDataNodeRegistration(
+ nil,
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ dn)
+ assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
+
+ topo.IncrementalSyncDataNodeRegistration(
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ nil,
+ dn)
+
+ for vid, _ := range layout.vid2location {
+ println("after add volume id", vid)
+ }
+ for _, vid := range layout.writables {
+ println("after add writable volume id", vid)
+ }
+
+ assert(t, "writables after add back", len(layout.writables), volumeCount)
+
}
topo.UnRegisterDataNode(dn)
- assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0)
+ assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
}
@@ -96,20 +146,21 @@ func TestAddRemoveVolume(t *testing.T) {
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
v := storage.VolumeInfo{
- Id: storage.VolumeId(1),
+ Id: needle.VolumeId(1),
Size: 100,
Collection: "xcollection",
FileCount: 123,
DeleteCount: 23,
DeletedByteCount: 45,
ReadOnly: false,
- Version: storage.CurrentVersion,
+ Version: needle.CurrentVersion,
ReplicaPlacement: &storage.ReplicaPlacement{},
- Ttl: storage.EMPTY_TTL,
+ Ttl: needle.EMPTY_TTL,
}
dn.UpdateVolumes([]storage.VolumeInfo{v})
topo.RegisterVolumeLayout(v, dn)
+ topo.RegisterVolumeLayout(v, dn)
if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
t.Errorf("collection %v should exist", v.Collection)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index d6b09314b..351ff842f 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -4,22 +4,21 @@ import (
"context"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
-
- resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
- VolumdId: uint32(vid),
+ go func(index int, url string, vid needle.VolumeId) {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
+ VolumeId: uint32(vid),
})
if err != nil {
ch <- false
@@ -46,15 +45,15 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
}
return isCheckSuccess
}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
+ go func(index int, url string, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
- VolumdId: uint32(vid),
+ VolumeId: uint32(vid),
})
return err
})
@@ -79,13 +78,13 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}
return isVacuumSuccess
}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
- glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
- VolumdId: uint32(vid),
+ VolumeId: uint32(vid),
})
return err
})
@@ -93,7 +92,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
isCommitSuccess = false
} else {
- glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url())
+ glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
}
if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid)
@@ -101,12 +100,12 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
return isCommitSuccess
}
-func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
+func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
- VolumdId: uint32(vid),
+ VolumeId: uint32(vid),
})
return err
})
@@ -118,24 +117,24 @@ func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationli
}
}
-func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int {
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
- vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate)
+ vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
}
return 0
}
-func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
volumeLayout.accessLock.RLock()
- tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
+ tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
tmpMap[vid] = locationList
}
@@ -152,11 +151,11 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
- if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) {
- batchVacuumVolumeCommit(volumeLayout, vid, locationList)
+ if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) {
+ if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) {
+ batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList)
} else {
- batchVacuumVolumeCleanup(volumeLayout, vid, locationList)
+ batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList)
}
}
}
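
The vacuum path keeps its shape — check garbage ratios on every replica, compact, then commit or clean up — but every step now receives the shared grpcDialOption and uses the corrected VolumeId field name. The per-volume decision logic, condensed into a standalone sketch with checkGarbage/compact/commit/cleanup as stand-ins for the batchVacuumVolume* helpers:

    package main

    import "fmt"

    // vacuumOneVolume condenses the decision flow above: check the garbage ratio
    // (in the real code every replica must agree), compact on success, then
    // either commit the compaction or clean up the temporary files.
    func vacuumOneVolume(garbageThreshold float64,
        checkGarbage func() (ratio float64, ok bool),
        compact, commit, cleanup func() bool) {

        ratio, ok := checkGarbage()
        if !ok || ratio < garbageThreshold {
            return // nothing to reclaim, or a replica could not be reached
        }
        if compact() {
            commit()
        } else {
            cleanup()
        }
    }

    func main() {
        vacuumOneVolume(0.3,
            func() (float64, bool) { return 0.45, true },
            func() bool { fmt.Println("compacting"); return true },
            func() bool { fmt.Println("committing"); return true },
            func() bool { fmt.Println("cleaning up"); return true },
        )
    }
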
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 9bf013ca6..ff02044a1 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -5,6 +5,9 @@ import (
"math/rand"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -20,7 +23,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
- Ttl *storage.TTL
+ Ttl *needle.TTL
Prealloacte int64
DataCenter string
Rack string
@@ -55,19 +58,19 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(grpcDialOption, vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, option); e == nil {
+ if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
counter += c
} else {
return counter, e
@@ -76,13 +79,16 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp
return
}
-func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
return 0, e
}
- vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, option, servers...)
+ vid, raftErr := topo.NextVolumeId()
+ if raftErr != nil {
+ return 0, raftErr
+ }
+ err := vg.grow(grpcDialOption, topo, vid, option, servers...)
return len(servers), err
}
@@ -101,7 +107,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
- if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
@@ -130,7 +136,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
- if node.FreeSpace() < rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
@@ -171,7 +177,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
- r := rand.Intn(rack.FreeSpace())
+ r := rand.Int63n(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
@@ -179,7 +185,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
}
for _, datacenter := range otherDataCenters {
- r := rand.Intn(datacenter.FreeSpace())
+ r := rand.Int63n(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
@@ -189,16 +195,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return
}
-func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option); err == nil {
+ if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
- Version: storage.CurrentVersion,
+ Version: needle.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
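
Because FreeSpace is now int64, the random pick inside findEmptySlotsForOneVolume moves from rand.Intn to rand.Int63n; the selection itself is still weighted by free slots, since ReserveOneVolume walks the children subtracting each child's free space from the random offset. A tiny standalone version of that weighted pick:

    package main

    import (
        "fmt"
        "math/rand"
    )

    type child struct {
        name string
        free int64
    }

    // reserveOne walks the children and subtracts each child's free slots from
    // the random offset until it lands inside one of them, so a child is chosen
    // with probability proportional to its free space.
    func reserveOne(children []child, r int64) string {
        for _, c := range children {
            if r >= c.free {
                r -= c.free
                continue
            }
            return c.name
        }
        return ""
    }

    func main() {
        children := []child{{"rack1", 3}, {"rack2", 7}}
        var total int64
        for _, c := range children {
            total += c.free
        }
        // FreeSpace is now int64, hence rand.Int63n instead of rand.Intn.
        fmt.Println(reserveOne(children, rand.Int63n(total)))
    }
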
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index f983df1ec..3573365fd 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
var topologyLayout = `
@@ -96,12 +97,12 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
+ Id: needle.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
+ Version: needle.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64)))
}
}
}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 71a071e2f..799cbca62 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -9,16 +9,17 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
+ ttl *needle.TTL
+ vid2location map[needle.VolumeId]*VolumeLocationList
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
+ oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
accessLock sync.RWMutex
}
@@ -29,14 +30,14 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- readonlyVolumes: make(map[storage.VolumeId]bool),
- oversizedVolumes: make(map[storage.VolumeId]bool),
+ vid2location: make(map[needle.VolumeId]*VolumeLocationList),
+ writables: *new([]needle.VolumeId),
+ readonlyVolumes: make(map[needle.VolumeId]bool),
+ oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
}
}
@@ -57,7 +58,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
for _, dn := range vl.vid2location[v.Id].list {
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
if vInfo.ReadOnly {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
vl.readonlyVolumes[v.Id] = true
return
@@ -65,23 +66,19 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.readonlyVolumes, v.Id)
}
} else {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
delete(vl.readonlyVolumes, v.Id)
return
}
}
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
- }
- } else {
- vl.rememberOversizedVolumne(v)
- vl.removeFromWritable(v.Id)
- }
+
+ vl.rememberOversizedVolume(v)
+ vl.ensureCorrectWritables(v)
+
}
-func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true
}
@@ -91,11 +88,34 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- vl.removeFromWritable(v.Id)
- delete(vl.vid2location, v.Id)
+ // remove from vid2location map
+ location, ok := vl.vid2location[v.Id]
+ if !ok {
+ return
+ }
+
+ if location.Remove(dn) {
+
+ vl.ensureCorrectWritables(v)
+
+ if location.Length() == 0 {
+ delete(vl.vid2location, v.Id)
+ }
+
+ }
+}
+
+func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
+ if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+ vl.addToWritable(v.Id)
+ }
+ } else {
+ vl.removeFromWritable(v.Id)
+ }
}
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
+func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
@@ -110,7 +130,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return !vl.isOversized(v) &&
- v.Version == storage.CurrentVersion &&
+ v.Version == needle.CurrentVersion &&
!v.ReadOnly
}
@@ -121,7 +141,7 @@ func (vl *VolumeLayout) isEmpty() bool {
return len(vl.vid2location) == 0
}
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -141,7 +161,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -158,7 +178,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
- var vid storage.VolumeId
+ var vid needle.VolumeId
var locationList *VolumeLocationList
counter := 0
for _, v := range vl.writables {
@@ -205,7 +225,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
return counter
}
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
@@ -220,7 +240,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
}
return false
}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
@@ -231,7 +251,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return true
}
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -245,18 +265,18 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+ if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid)
}
return false
}
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
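
volume_layout.go consolidates the writability rule into ensureCorrectWritables, which both RegisterVolume and the now replica-aware UnRegisterVolume call: a volume stays in the writable list only while all expected replicas are present and the volume is neither oversized nor read-only. A condensed version of that predicate (the needle-version check in the real isWritable is omitted here):

    package main

    import "fmt"

    type volumeState struct {
        replicasOnline int
        expectedCopies int
        oversized      bool
        readOnly       bool
    }

    // shouldBeWritable condenses the rule enforced by ensureCorrectWritables.
    func shouldBeWritable(v volumeState) bool {
        return v.replicasOnline == v.expectedCopies && !v.oversized && !v.readOnly
    }

    func main() {
        fmt.Println(shouldBeWritable(volumeState{replicasOnline: 2, expectedCopies: 2}))                  // true
        fmt.Println(shouldBeWritable(volumeState{replicasOnline: 1, expectedCopies: 2}))                  // false: a replica dropped out
        fmt.Println(shouldBeWritable(volumeState{replicasOnline: 2, expectedCopies: 2, oversized: true})) // false
    }
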
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 8d5881333..8905c54b5 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -3,7 +3,7 @@ package topology
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type VolumeLocationList struct {
@@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
}
}
-func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
+func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold {
vinfo, err := dnl.GetVolumesById(vid)