Diffstat (limited to 'weed/topology')
-rw-r--r--  weed/topology/allocate_volume.go            23
-rw-r--r--  weed/topology/cluster_commands.go            6
-rw-r--r--  weed/topology/collection.go                 18
-rw-r--r--  weed/topology/data_center.go                18
-rw-r--r--  weed/topology/data_node.go                 107
-rw-r--r--  weed/topology/data_node_ec.go              135
-rw-r--r--  weed/topology/node.go                      185
-rw-r--r--  weed/topology/rack.go                       19
-rw-r--r--  weed/topology/store_replicate.go           210
-rw-r--r--  weed/topology/topology.go                  123
-rw-r--r--  weed/topology/topology_ec.go               173
-rw-r--r--  weed/topology/topology_event_handling.go     6
-rw-r--r--  weed/topology/topology_map.go               21
-rw-r--r--  weed/topology/topology_test.go              78
-rw-r--r--  weed/topology/topology_vacuum.go           121
-rw-r--r--  weed/topology/volume_growth.go              75
-rw-r--r--  weed/topology/volume_growth_test.go        221
-rw-r--r--  weed/topology/volume_layout.go             108
-rw-r--r--  weed/topology/volume_location_list.go        4
19 files changed, 1309 insertions(+), 342 deletions(-)
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 55796ab43..e5dc48652 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -2,29 +2,28 @@ package topology
import (
"context"
- "time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
)
type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
+func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.AssignVolume(ctx, &volume_server_pb.AssignVolumeRequest{
- VolumdId: uint32(vid),
- Collection: option.Collection,
- Replication: option.ReplicaPlacement.String(),
- Ttl: option.Ttl.String(),
- Preallocate: option.Prealloacte,
+ _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
+ VolumeId: uint32(vid),
+ Collection: option.Collection,
+ Replication: option.ReplicaPlacement.String(),
+ Ttl: option.Ttl.String(),
+ Preallocate: option.Prealloacte,
+ MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb,
})
return deleteErr
})
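
The hunk above threads a caller-supplied grpc.DialOption down into the volume server client instead of building a per-call timeout inside AllocateVolume. A minimal sketch of the same pattern, assuming a hypothetical withClient helper and address (not SeaweedFS APIs):

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
)

// withClient dials the given address with the caller-supplied dial option,
// runs fn against the connection, and closes it afterwards.
func withClient(address string, dialOption grpc.DialOption, fn func(conn *grpc.ClientConn) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, address, dialOption)
	if err != nil {
		return err
	}
	defer conn.Close()
	return fn(conn)
}

func main() {
	// grpc.WithInsecure() stands in for whatever dial option the master passes down.
	_ = withClient("localhost:18080", grpc.WithInsecure(), func(conn *grpc.ClientConn) error {
		// a real caller would build volume_server_pb.NewVolumeServerClient(conn) here
		return nil
	})
}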
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 7a36c25ec..152691ccb 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -3,14 +3,14 @@ package topology
import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type MaxVolumeIdCommand struct {
- MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+ MaxVolumeId needle.VolumeId `json:"maxVolumeId"`
}
-func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand {
return &MaxVolumeIdCommand{
MaxVolumeId: value,
}
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
index a17f0c961..5b410d1eb 100644
--- a/weed/topology/collection.go
+++ b/weed/topology/collection.go
@@ -3,18 +3,24 @@ package topology
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Collection struct {
Name string
volumeSizeLimit uint64
+ replicationAsMin bool
storageType2VolumeLayout *util.ConcurrentReadMap
}
-func NewCollection(name string, volumeSizeLimit uint64) *Collection {
- c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
+ c := &Collection{
+ Name: name,
+ volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
+ }
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
@@ -23,18 +29,18 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
- return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin)
})
return vl.(*VolumeLayout)
}
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
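
GetOrCreateVolumeLayout keys each layout by replica placement plus TTL and lazily creates one per key. A rough sketch of that keyed lazy creation using a plain mutex-protected map; layout and the string keys are hypothetical stand-ins for the SeaweedFS types:

package main

import (
	"fmt"
	"sync"
)

// layout stands in for *VolumeLayout in this sketch.
type layout struct{ key string }

type collection struct {
	mu      sync.RWMutex
	layouts map[string]*layout
}

// getOrCreate mirrors the rp.String()+ttl.String() keying used in the hunk above.
func (c *collection) getOrCreate(rp, ttl string) *layout {
	key := rp + ttl
	c.mu.RLock()
	l, ok := c.layouts[key]
	c.mu.RUnlock()
	if ok {
		return l
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if l, ok = c.layouts[key]; ok { // re-check under the write lock
		return l
	}
	l = &layout{key: key}
	c.layouts[key] = l
	return l
}

func main() {
	c := &collection{layouts: make(map[string]*layout)}
	fmt.Println(c.getOrCreate("001", "3d") == c.getOrCreate("001", "3d")) // true: same layout reused
}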
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
index bcf2dfd31..dc3accb71 100644
--- a/weed/topology/data_center.go
+++ b/weed/topology/data_center.go
@@ -1,5 +1,7 @@
package topology
+import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+
type DataCenter struct {
NodeImpl
}
@@ -38,3 +40,19 @@ func (dc *DataCenter) ToMap() interface{} {
m["Racks"] = racks
return m
}
+
+func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
+ m := &master_pb.DataCenterInfo{
+ Id: string(dc.Id()),
+ VolumeCount: uint64(dc.GetVolumeCount()),
+ MaxVolumeCount: uint64(dc.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(dc.FreeSpace()),
+ ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()),
+ }
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ m.RackInfos = append(m.RackInfos, rack.ToRackInfo())
+ }
+ return m
+}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 6ea6d3938..efdf5285b 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,7 +2,13 @@ package topology
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"strconv"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -10,18 +16,21 @@ import (
type DataNode struct {
NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
+ volumes map[needle.VolumeId]storage.VolumeInfo
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
+ ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
+ ecShardsLock sync.RWMutex
}
func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
+ s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
s.NodeImpl.value = s
return s
}
@@ -32,25 +41,37 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock()
defer dn.Unlock()
- if _, ok := dn.volumes[v.Id]; !ok {
+ if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
isNew = true
} else {
+ if oldV.IsRemote() != v.IsRemote() {
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
+ if oldV.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ }
+ isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly
dn.volumes[v.Id] = v
}
return
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
- actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+ actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
@@ -61,15 +82,42 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
}
}
dn.Unlock()
for _, v := range actualVolumes {
- isNew := dn.AddOrUpdateVolume(v)
+ isNew, isChangedRO := dn.AddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
+ if isChangedRO {
+ changeRO = append(changeRO, v)
+ }
+ }
+ return
+}
+
+func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+ dn.Lock()
+ for _, v := range deletedVolumes {
+ delete(dn.volumes, v.Id)
+ dn.UpAdjustVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ }
+ dn.Unlock()
+ for _, v := range newlVolumes {
+ dn.AddOrUpdateVolume(v)
}
return
}
@@ -83,7 +131,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
return ret
}
-func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
vInfo, ok := dn.volumes[id]
@@ -123,8 +171,41 @@ func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount()
+ ret["VolumeIds"] = dn.GetVolumeIds()
+ ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
return ret
}
+
+func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
+ m := &master_pb.DataNodeInfo{
+ Id: string(dn.Id()),
+ VolumeCount: uint64(dn.GetVolumeCount()),
+ MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(dn.FreeSpace()),
+ ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
+ }
+ for _, v := range dn.GetVolumes() {
+ m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
+ }
+ for _, ecv := range dn.GetEcShards() {
+ m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
+ }
+ return m
+}
+
+// GetVolumeIds returns the human readable volume ids limited to count of max 100.
+func (dn *DataNode) GetVolumeIds() string {
+ dn.RLock()
+ defer dn.RUnlock()
+ ids := make([]int, 0, len(dn.volumes))
+
+ for k := range dn.volumes {
+ ids = append(ids, int(k))
+ }
+
+ return util.HumanReadableIntsMax(100, ids...)
+}
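
UpdateVolumes above now reports three deltas between the registered map and a heartbeat: new volumes, deleted volumes, and volumes whose read-only flag flipped. A self-contained sketch of that diff, with volumeInfo as a stripped-down stand-in for storage.VolumeInfo:

package main

import "fmt"

// volumeInfo is a stripped-down stand-in for storage.VolumeInfo.
type volumeInfo struct {
	Id       uint32
	ReadOnly bool
}

// diffVolumes mirrors the delta logic in UpdateVolumes: anything registered but no
// longer reported is deleted, anything reported but not registered is new, and a
// volume whose read-only flag changed is recorded separately.
func diffVolumes(registered map[uint32]volumeInfo, reported []volumeInfo) (newV, deleted, changedRO []volumeInfo) {
	reportedMap := make(map[uint32]volumeInfo)
	for _, v := range reported {
		reportedMap[v.Id] = v
	}
	for id, old := range registered {
		if _, ok := reportedMap[id]; !ok {
			deleted = append(deleted, old)
		}
	}
	for _, v := range reported {
		old, ok := registered[v.Id]
		if !ok {
			newV = append(newV, v)
		} else if old.ReadOnly != v.ReadOnly {
			changedRO = append(changedRO, v)
		}
	}
	return
}

func main() {
	registered := map[uint32]volumeInfo{1: {1, false}, 2: {2, false}}
	reported := []volumeInfo{{2, true}, {3, false}}
	n, d, c := diffVolumes(registered, reported)
	fmt.Println(len(n), len(d), len(c)) // 1 new, 1 deleted, 1 read-only change
}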
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
new file mode 100644
index 000000000..75c8784fe
--- /dev/null
+++ b/weed/topology/data_node_ec.go
@@ -0,0 +1,135 @@
+package topology
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
+ dn.RLock()
+ for _, ecVolumeInfo := range dn.ecShards {
+ ret = append(ret, ecVolumeInfo)
+ }
+ dn.RUnlock()
+ return ret
+}
+
+func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ // prepare the new ec shard map
+ actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
+ for _, ecShards := range actualShards {
+ actualEcShardMap[ecShards.VolumeId] = ecShards
+ }
+
+ // found out the newShards and deletedShards
+ var newShardCount, deletedShardCount int
+ dn.ecShardsLock.RLock()
+ for vid, ecShards := range dn.ecShards {
+ if actualEcShards, ok := actualEcShardMap[vid]; !ok {
+ // dn registered ec shards not found in the new set of ec shards
+ deletedShards = append(deletedShards, ecShards)
+ deletedShardCount += ecShards.ShardIdCount()
+ } else {
+ // found, but maybe the actual shard could be missing
+ a := actualEcShards.Minus(ecShards)
+ if a.ShardIdCount() > 0 {
+ newShards = append(newShards, a)
+ newShardCount += a.ShardIdCount()
+ }
+ d := ecShards.Minus(actualEcShards)
+ if d.ShardIdCount() > 0 {
+ deletedShards = append(deletedShards, d)
+ deletedShardCount += d.ShardIdCount()
+ }
+ }
+ }
+ for _, ecShards := range actualShards {
+ if _, found := dn.ecShards[ecShards.VolumeId]; !found {
+ newShards = append(newShards, ecShards)
+ newShardCount += ecShards.ShardIdCount()
+ }
+ }
+ dn.ecShardsLock.RUnlock()
+
+ if len(newShards) > 0 || len(deletedShards) > 0 {
+ // if changed, set to the new ec shard map
+ dn.ecShardsLock.Lock()
+ dn.ecShards = actualEcShardMap
+ dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount))
+ dn.ecShardsLock.Unlock()
+ }
+
+ return
+}
+
+func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+
+ for _, newShard := range newShards {
+ dn.AddOrUpdateEcShard(newShard)
+ }
+
+ for _, deletedShard := range deletedShards {
+ dn.DeleteEcShard(deletedShard)
+ }
+
+}
+
+func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ delta := 0
+ if existing, ok := dn.ecShards[s.VolumeId]; !ok {
+ dn.ecShards[s.VolumeId] = s
+ delta = s.ShardBits.ShardIdCount()
+ } else {
+ oldCount := existing.ShardBits.ShardIdCount()
+ existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
+ delta = existing.ShardBits.ShardIdCount() - oldCount
+ }
+
+ dn.UpAdjustEcShardCountDelta(int64(delta))
+
+}
+
+func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ if existing, ok := dn.ecShards[s.VolumeId]; ok {
+ oldCount := existing.ShardBits.ShardIdCount()
+ existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
+ delta := existing.ShardBits.ShardIdCount() - oldCount
+ dn.UpAdjustEcShardCountDelta(int64(delta))
+ if existing.ShardBits.ShardIdCount() == 0 {
+ delete(dn.ecShards, s.VolumeId)
+ }
+ }
+
+}
+
+func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
+
+ // check whether normal volumes has this volume id
+ dn.RLock()
+ _, ok := dn.volumes[id]
+ if ok {
+ hasVolumeId = true
+ }
+ dn.RUnlock()
+
+ if hasVolumeId {
+ return
+ }
+
+ // check whether ec shards has this volume id
+ dn.ecShardsLock.RLock()
+ _, ok = dn.ecShards[id]
+ if ok {
+ hasVolumeId = true
+ }
+ dn.ecShardsLock.RUnlock()
+
+ return
+
+}
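
The shard accounting above leans on ShardBits set algebra (Plus, Minus, ShardIdCount) to compute how many shards were added or removed. A minimal sketch of that idea with a plain uint32 bitmask; shardBits here is an assumption-level stand-in, not the erasure_coding type:

package main

import (
	"fmt"
	"math/bits"
)

// shardBits stands in for erasure_coding.ShardBits: one bit per EC shard id.
type shardBits uint32

func (b shardBits) plus(o shardBits) shardBits  { return b | o }
func (b shardBits) minus(o shardBits) shardBits { return b &^ o }
func (b shardBits) count() int                  { return bits.OnesCount32(uint32(b)) }

func main() {
	existing := shardBits(0b0000_1111) // shards 0-3 currently registered on this node
	reported := shardBits(0b0011_1100) // shards 2-5 now reported by the volume server

	added := reported.minus(existing)   // shards 4,5
	removed := existing.minus(reported) // shards 0,1
	fmt.Println(added.count(), removed.count(), existing.plus(added).minus(removed).count())
	// the ec shard counter delta would be added.count()-removed.count() = 0
}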
diff --git a/weed/topology/node.go b/weed/topology/node.go
index b7d2f79ec..114417edf 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -5,26 +5,32 @@ import (
"math/rand"
"strings"
"sync"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type NodeId string
type Node interface {
Id() NodeId
String() string
- FreeSpace() int
- ReserveOneVolume(r int) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
+ FreeSpace() int64
+ ReserveOneVolume(r int64) (*DataNode, error)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
+ UpAdjustVolumeCountDelta(volumeCountDelta int64)
+ UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
+ UpAdjustEcShardCountDelta(ecShardCountDelta int64)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
+ UpAdjustMaxVolumeId(vid needle.VolumeId)
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
- GetMaxVolumeId() storage.VolumeId
+ GetVolumeCount() int64
+ GetEcShardCount() int64
+ GetActiveVolumeCount() int64
+ GetRemoteVolumeCount() int64
+ GetMaxVolumeCount() int64
+ GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
@@ -39,14 +45,16 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
+ volumeCount int64
+ remoteVolumeCount int64
+ activeVolumeCount int64
+ ecShardCount int64
+ maxVolumeCount int64
id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
parent Node
sync.RWMutex // lock children
children map[NodeId]Node
- maxVolumeId storage.VolumeId
+ maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
@@ -54,56 +62,64 @@ type NodeImpl struct {
}
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
-func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
- candidates := make([]Node, 0, len(n.children))
+func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
+ var totalWeights int64
var errs []string
n.RLock()
+ candidates := make([]Node, 0, len(n.children))
+ candidatesWeights := make([]int64, 0, len(n.children))
+ //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
for _, node := range n.children {
- if err := filterFirstNodeFn(node); err == nil {
- candidates = append(candidates, node)
- } else {
- errs = append(errs, string(node.Id())+":"+err.Error())
+ if node.FreeSpace() <= 0 {
+ continue
}
+ totalWeights += node.FreeSpace()
+ candidates = append(candidates, node)
+ candidatesWeights = append(candidatesWeights, node.FreeSpace())
}
n.RUnlock()
- if len(candidates) == 0 {
- return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
+ if len(candidates) < numberOfNodes {
+ glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
+ return nil, nil, errors.New("No enough data node found!")
}
- firstNode = candidates[rand.Intn(len(candidates))]
- glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
- restNodes = make([]Node, numberOfNodes-1)
- candidates = candidates[:0]
- n.RLock()
- for _, node := range n.children {
- if node.Id() == firstNode.Id() {
- continue
- }
- if node.FreeSpace() <= 0 {
- continue
+ //pick nodes randomly by weights, the node picked earlier has higher final weights
+ sortedCandidates := make([]Node, 0, len(candidates))
+ for i := 0; i < len(candidates); i++ {
+ weightsInterval := rand.Int63n(totalWeights)
+ lastWeights := int64(0)
+ for k, weights := range candidatesWeights {
+ if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
+ sortedCandidates = append(sortedCandidates, candidates[k])
+ candidatesWeights[k] = 0
+ totalWeights -= weights
+ break
+ }
+ lastWeights += weights
}
- glog.V(2).Infoln("select rest node candidate:", node.Id())
- candidates = append(candidates, node)
}
- n.RUnlock()
- glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- ret := len(restNodes) == 0
- for k, node := range candidates {
- if k < len(restNodes) {
- restNodes[k] = node
- if k == len(restNodes)-1 {
- ret = true
+
+ restNodes = make([]Node, 0, numberOfNodes-1)
+ ret := false
+ n.RLock()
+ for k, node := range sortedCandidates {
+ if err := filterFirstNodeFn(node); err == nil {
+ firstNode = node
+ if k >= numberOfNodes-1 {
+ restNodes = sortedCandidates[:numberOfNodes-1]
+ } else {
+ restNodes = append(restNodes, sortedCandidates[:k]...)
+ restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
}
+ ret = true
+ break
} else {
- r := rand.Intn(k + 1)
- if r < len(restNodes) {
- restNodes[r] = node
- }
+ errs = append(errs, string(node.Id())+":"+err.Error())
}
}
+ n.RUnlock()
if !ret {
- glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- err = errors.New("No enough data node found!")
+ return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
return
}
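
PickNodesByWeight replaces uniform random picking with weighted sampling without replacement, where a child's weight is its free volume slot count. A short sketch of that sampling loop over plain integers, assuming hypothetical names rather than the topology types:

package main

import (
	"fmt"
	"math/rand"
)

// pickByWeight returns up to count indices chosen without replacement, where an
// index's chance of being drawn is proportional to its remaining weight. This
// mirrors the selection loop in PickNodesByWeight above.
func pickByWeight(weights []int64, count int) []int {
	total := int64(0)
	for _, w := range weights {
		total += w
	}
	remaining := append([]int64(nil), weights...)
	var picked []int
	for len(picked) < count && total > 0 {
		r := rand.Int63n(total)
		acc := int64(0)
		for i, w := range remaining {
			if w == 0 {
				continue
			}
			if r >= acc && r < acc+w {
				picked = append(picked, i)
				total -= w
				remaining[i] = 0 // remove from the pool once picked
				break
			}
			acc += w
		}
	}
	return picked
}

func main() {
	// nodes with 10, 1 and 5 free volume slots; the first node is picked most often
	fmt.Println(pickByWeight([]int64{10, 1, 5}, 2))
}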
@@ -126,8 +142,12 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
-func (n *NodeImpl) FreeSpace() int {
- return n.maxVolumeCount - n.volumeCount
+func (n *NodeImpl) FreeSpace() int64 {
+ freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
+ if n.ecShardCount > 0 {
+ freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
+ }
+ return freeVolumeSlotCount
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
@@ -146,7 +166,7 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
@@ -171,25 +191,52 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
+ if maxVolumeCountDelta == 0 {
+ return
+ }
+ atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
+ if volumeCountDelta == 0 {
+ return
+ }
+ atomic.AddInt64(&n.volumeCount, volumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
+func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
+ if remoteVolumeCountDelta == 0 {
+ return
+ }
+ atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
+ if n.parent != nil {
+ n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
+ if ecShardCountDelta == 0 {
+ return
+ }
+ atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
+ if n.parent != nil {
+ n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
+ if activeVolumeCountDelta == 0 {
+ return
+ }
+ atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
@@ -197,16 +244,22 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
}
}
}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
return n.maxVolumeId
}
-func (n *NodeImpl) GetVolumeCount() int {
+func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount
}
-func (n *NodeImpl) GetActiveVolumeCount() int {
+func (n *NodeImpl) GetEcShardCount() int64 {
+ return n.ecShardCount
+}
+func (n *NodeImpl) GetRemoteVolumeCount() int64 {
+ return n.remoteVolumeCount
+}
+func (n *NodeImpl) GetActiveVolumeCount() int64 {
return n.activeVolumeCount
}
-func (n *NodeImpl) GetMaxVolumeCount() int {
+func (n *NodeImpl) GetMaxVolumeCount() int64 {
return n.maxVolumeCount
}
@@ -218,6 +271,8 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
+ n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
@@ -232,6 +287,8 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+ n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
+ n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id())
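
FreeSpace in this file now treats remote volumes as not consuming local slots and reserves slots for EC shards: freeSlots = maxVolumeCount + remoteVolumeCount - volumeCount, minus ecShardCount/DataShardsCount + 1 when any EC shards are present. A worked sketch of that arithmetic, assuming DataShardsCount is 10 as in SeaweedFS's 10+4 erasure coding layout:

package main

import "fmt"

const dataShardsCount = 10 // assumed to match erasure_coding.DataShardsCount

// freeSpace reproduces the arithmetic in NodeImpl.FreeSpace above.
func freeSpace(maxVolumes, remoteVolumes, volumes, ecShards int64) int64 {
	free := maxVolumes + remoteVolumes - volumes
	if ecShards > 0 {
		free = free - ecShards/dataShardsCount - 1
	}
	return free
}

func main() {
	// 100 slots, 60 volumes of which 5 are remote-backed, 25 EC shards:
	// 100 + 5 - 60 - 25/10 - 1 = 42 free volume slots
	fmt.Println(freeSpace(100, 5, 60, 25))
}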
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index a48d64323..1921c0c05 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -1,6 +1,7 @@
package topology
import (
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"strconv"
"time"
)
@@ -27,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@@ -58,3 +59,19 @@ func (r *Rack) ToMap() interface{} {
m["DataNodes"] = dns
return m
}
+
+func (r *Rack) ToRackInfo() *master_pb.RackInfo {
+ m := &master_pb.RackInfo{
+ Id: string(r.Id()),
+ VolumeCount: uint64(r.GetVolumeCount()),
+ MaxVolumeCount: uint64(r.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(r.FreeSpace()),
+ ActiveVolumeCount: uint64(r.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()),
+ }
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ m.DataNodeInfos = append(m.DataNodeInfos, dn.ToDataNodeInfo())
+ }
+ return m
+}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index c73fb706a..481e72fe0 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -1,7 +1,6 @@
package topology
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
@@ -14,101 +13,113 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func ReplicatedWrite(masterNode string, s *storage.Store,
- volumeId storage.VolumeId, needle *storage.Needle,
- r *http.Request) (size uint32, errorStatus string) {
+func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := s.Write(volumeId, needle)
- needToReplicate := !s.HasVolume(volumeId)
- if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- size = ret
- return
+ // check whether this is a replicated write request
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ // this is the initial request
+ remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
}
- needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
- if !needToReplicate {
- needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
+ // read fsync value
+ fsync := false
+ if r.FormValue("fsync") == "true" {
+ fsync = true
}
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
-
- if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
- u := url.URL{
- Scheme: "http",
- Host: location.Url,
- Path: r.URL.Path,
- }
- q := url.Values{
- "type": {"replicate"},
- }
- if needle.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
- }
- if needle.IsChunkedManifest() {
- q.Set("cm", "true")
+
+ if s.GetVolume(volumeId) != nil {
+ isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync)
+ if err != nil {
+ err = fmt.Errorf("failed to write to local disk: %v", err)
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: location.Url,
+ Path: r.URL.Path,
+ }
+ q := url.Values{
+ "type": {"replicate"},
+ "ttl": {n.Ttl.String()},
+ }
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
+ }
+ if n.IsChunkedManifest() {
+ q.Set("cm", "true")
+ }
+ u.RawQuery = q.Encode()
+
+ pairMap := make(map[string]string)
+ if n.HasPairs() {
+ tmpMap := make(map[string]string)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
}
- u.RawQuery = q.Encode()
-
- pairMap := make(map[string]string)
- if needle.HasPairs() {
- tmpMap := make(map[string]string)
- err := json.Unmarshal(needle.Pairs, &tmpMap)
- if err != nil {
- glog.V(0).Infoln("Unmarshal pairs error:", err)
- }
- for k, v := range tmpMap {
- pairMap[storage.PairNamePrefix+k] = v
- }
+ for k, v := range tmpMap {
+ pairMap[needle.PairNamePrefix+k] = v
}
-
- _, err := operation.Upload(u.String(),
- string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
- pairMap, jwt)
- return err
- }); err != nil {
- ret = 0
- errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
}
+
+ // volume server do not know about encryption
+ _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt)
+ return err
+ }); err != nil {
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
+ glog.V(0).Infoln(err)
}
}
- size = ret
return
}
func ReplicatedDelete(masterNode string, store *storage.Store,
- volumeId storage.VolumeId, n *storage.Needle,
- r *http.Request) (uint32, error) {
+ volumeId needle.VolumeId, n *needle.Needle,
+ r *http.Request) (size uint32, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := store.Delete(volumeId, n)
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ size, err = store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
- return ret, err
+ return
}
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
- }); err != nil {
- ret = 0
- }
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
+ return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
+ }); err != nil {
+ size = 0
}
}
- return ret, err
+ return
}
type DistributedOperationResult map[string]error
@@ -131,32 +142,53 @@ type RemoteResult struct {
Error error
}
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
- length := 0
- selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
- results := make(chan RemoteResult)
+func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error {
+ length := len(locations)
+ results := make(chan RemoteResult)
+ for _, location := range locations {
+ go func(location operation.Location, results chan RemoteResult) {
+ results <- RemoteResult{location.Url, op(location)}
+ }(location, results)
+ }
+ ret := DistributedOperationResult(make(map[string]error))
+ for i := 0; i < length; i++ {
+ result := <-results
+ ret[result.Host] = result.Error
+ }
+
+ return ret.Error()
+}
+
+func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) (
+ remoteLocations []operation.Location, err error) {
+
+ v := s.GetVolume(volumeId)
+ if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {
+ return
+ }
+
+ // not on local store, or has replications
+ lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String())
+ if lookupErr == nil {
+ selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
for _, location := range lookupResult.Locations {
if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan RemoteResult) {
- results <- RemoteResult{location.Url, op(location)}
- }(location, results)
+ remoteLocations = append(remoteLocations, location)
}
}
- ret := DistributedOperationResult(make(map[string]error))
- for i := 0; i < length; i++ {
- result := <-results
- ret[result.Host] = result.Error
- }
- if volume := store.GetVolume(volumeId); volume != nil {
- if length+1 < volume.ReplicaPlacement.GetCopyCount() {
- return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
- }
- }
- return ret.Error()
} else {
- glog.V(0).Infoln()
- return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
+ err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
+ return
}
+
+ if v != nil {
+ // has one local and has remote replications
+ copyCount := v.ReplicaPlacement.GetCopyCount()
+ if len(lookupResult.Locations) < copyCount {
+ err = fmt.Errorf("replicating opetations [%d] is less than volume %d replication copy count [%d]",
+ len(lookupResult.Locations), volumeId, copyCount)
+ }
+ }
+
+ return
}
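
distributedOperation now receives the replica locations up front and fans the operation out concurrently, collecting one error per host. A self-contained sketch of that fan-out/aggregate pattern using plain strings as targets; the names and addresses are illustrative only:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// fanOut runs op against every target concurrently and aggregates the failures,
// mirroring distributedOperation in the hunk above.
func fanOut(targets []string, op func(target string) error) error {
	type result struct {
		target string
		err    error
	}
	results := make(chan result, len(targets))
	for _, t := range targets {
		go func(t string) {
			results <- result{t, op(t)}
		}(t)
	}
	var failures []string
	for range targets {
		r := <-results
		if r.err != nil {
			failures = append(failures, r.target+": "+r.err.Error())
		}
	}
	if len(failures) > 0 {
		return errors.New(strings.Join(failures, "\n"))
	}
	return nil
}

func main() {
	err := fanOut([]string{"replica-a:8080", "replica-b:8080"}, func(target string) error {
		if strings.HasPrefix(target, "replica-b") {
			return errors.New("connection refused")
		}
		return nil
	})
	fmt.Println(err)
}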
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4242bfa05..993f444a7 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -2,24 +2,33 @@ package topology
import (
"errors"
+ "fmt"
"math/rand"
+ "sync"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Topology struct {
+ vacuumLockCounter int64
NodeImpl
- collectionMap *util.ConcurrentReadMap
+ collectionMap *util.ConcurrentReadMap
+ ecShardMap map[needle.VolumeId]*EcShardLocations
+ ecShardMapLock sync.RWMutex
pulse int64
- volumeSizeLimit uint64
+ volumeSizeLimit uint64
+ replicationAsMin bool
Sequence sequence.Sequencer
@@ -30,15 +39,17 @@ type Topology struct {
RaftServer raft.Server
}
-func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
+func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()
+ t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
+ t.replicationAsMin = replicationAsMin
t.Sequence = seq
@@ -50,8 +61,13 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
}
func (t *Topology) IsLeader() bool {
- if leader, e := t.Leader(); e == nil {
- return leader == t.RaftServer.Name()
+ if t.RaftServer != nil {
+ if t.RaftServer.State() == raft.Leader {
+ return true
+ }
+ if t.RaftServer.Leader() == "" {
+ return true
+ }
}
return false
}
@@ -66,13 +82,13 @@ func (t *Topology) Leader() (string, error) {
if l == "" {
// We are a single node cluster, we are the leader
- return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
+ return t.RaftServer.Name(), nil
}
return l, nil
}
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
@@ -85,14 +101,24 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
return c.(*Collection).Lookup(vid)
}
}
+
+ if locations, found := t.LookupEcShards(vid); found {
+ for _, loc := range locations.Locations {
+ dataNodes = append(dataNodes, loc...)
+ }
+ return dataNodes
+ }
+
return nil
}
-func (t *Topology) NextVolumeId() storage.VolumeId {
+func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
- return next
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ return next, nil
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
@@ -102,19 +128,43 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
- if err != nil || datanodes.Length() == 0 {
- return "", 0, nil, errors.New("No writable volumes available!")
+ if err != nil {
+ return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
+ }
+ if datanodes.Length() == 0 {
+ return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
- fileId, count := t.Sequence.NextFileId(count)
- return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+ fileId := t.Sequence.NextFileId(count)
+ return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
- return NewCollection(collectionName, t.volumeSizeLimit)
+ return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
+func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
+
+ mapOfCollections := make(map[string]bool)
+ for _, c := range t.collectionMap.Items() {
+ mapOfCollections[c.(*Collection).Name] = true
+ }
+
+ if includeEcVolumes {
+ t.ecShardMapLock.RLock()
+ for _, ecVolumeLocation := range t.ecShardMap {
+ mapOfCollections[ecVolumeLocation.Collection] = true
+ }
+ t.ecShardMapLock.RUnlock()
+ }
+
+ for k := range mapOfCollections {
+ ret = append(ret, k)
+ }
+ return ret
+}
+
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Find(collectionName)
if !hasCollection {
@@ -152,6 +202,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
}
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
+ // convert into in memory struct storage.VolumeInfo
var volumeInfos []storage.VolumeInfo
for _, v := range volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
@@ -160,12 +211,48 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
}
}
- newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
+ // find out the delta volumes
+ var changedVolumes []storage.VolumeInfo
+ newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos)
+ for _, v := range newVolumes {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
+ for _, v := range changedVolumes {
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
+ vl.ensureCorrectWritables(&v)
+ }
+ return
+}
+
+func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) {
+ var newVis, oldVis []storage.VolumeInfo
+ for _, v := range newVolumes {
+ vi, err := storage.NewVolumeInfoFromShort(v)
+ if err != nil {
+ glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+ continue
+ }
+ newVis = append(newVis, vi)
+ }
+ for _, v := range deletedVolumes {
+ vi, err := storage.NewVolumeInfoFromShort(v)
+ if err != nil {
+ glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+ continue
+ }
+ oldVis = append(oldVis, vi)
+ }
+ dn.DeltaUpdateVolumes(newVis, oldVis)
+
+ for _, vi := range newVis {
+ t.RegisterVolumeLayout(vi, dn)
+ }
+ for _, vi := range oldVis {
+ t.UnRegisterVolumeLayout(vi, dn)
+ }
+
return
}
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
new file mode 100644
index 000000000..93b39bb5d
--- /dev/null
+++ b/weed/topology/topology_ec.go
@@ -0,0 +1,173 @@
+package topology
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+type EcShardLocations struct {
+ Collection string
+ Locations [erasure_coding.TotalShardsCount][]*DataNode
+}
+
+func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ // convert into in memory struct storage.VolumeInfo
+ var shards []*erasure_coding.EcVolumeInfo
+ for _, shardInfo := range shardInfos {
+ shards = append(shards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+ // find out the delta volumes
+ newShards, deletedShards = dn.UpdateEcShards(shards)
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
+func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
+ // convert into in memory struct storage.VolumeInfo
+ var newShards, deletedShards []*erasure_coding.EcVolumeInfo
+ for _, shardInfo := range newEcShards {
+ newShards = append(newShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+ for _, shardInfo := range deletedEcShards {
+ deletedShards = append(deletedShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+
+ dn.DeltaUpdateEcShards(newShards, deletedShards)
+
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
+func NewEcShardLocations(collection string) *EcShardLocations {
+ return &EcShardLocations{
+ Collection: collection,
+ }
+}
+
+func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
+ dataNodes := loc.Locations[shardId]
+ for _, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ return false
+ }
+ }
+ loc.Locations[shardId] = append(dataNodes, dn)
+ return true
+}
+
+func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
+ dataNodes := loc.Locations[shardId]
+ foundIndex := -1
+ for index, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ foundIndex = index
+ }
+ }
+ if foundIndex < 0 {
+ return false
+ }
+ loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
+ return true
+}
+
+func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ locations = NewEcShardLocations(ecShardInfos.Collection)
+ t.ecShardMap[ecShardInfos.VolumeId] = locations
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.AddShard(shardId, dn)
+ }
+}
+
+func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+ glog.Infof("removing ec shard info:%+v", ecShardInfos)
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ return
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.DeleteShard(shardId, dn)
+ }
+}
+
+func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
+ t.ecShardMapLock.RLock()
+ defer t.ecShardMapLock.RUnlock()
+
+ locations, found = t.ecShardMap[vid]
+
+ return
+}
+
+func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
+ t.ecShardMapLock.RLock()
+ defer t.ecShardMapLock.RUnlock()
+
+ dateNodeMap := make(map[string]bool)
+ for _, ecVolumeLocation := range t.ecShardMap {
+ if ecVolumeLocation.Collection == collection {
+ for _, locations := range ecVolumeLocation.Locations {
+ for _, loc := range locations {
+ dateNodeMap[string(loc.Id())] = true
+ }
+ }
+ }
+ }
+
+ for k, _ := range dateNodeMap {
+ dataNodes = append(dataNodes, k)
+ }
+
+ return
+}
+
+func (t *Topology) DeleteEcCollection(collection string) {
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ var vids []needle.VolumeId
+ for vid, ecVolumeLocation := range t.ecShardMap {
+ if ecVolumeLocation.Collection == collection {
+ vids = append(vids, vid)
+ }
+ }
+
+ for _, vid := range vids {
+ delete(t.ecShardMap, vid)
+ }
+
+ return
+}
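
EcShardLocations keeps, per shard id, the list of data nodes holding that shard, and AddShard/DeleteShard keep each list deduplicated. A simplified sketch of that structure with server-id strings instead of *DataNode, assuming a 14-shard (10 data + 4 parity) layout:

package main

import "fmt"

const totalShardsCount = 14 // assumed to match erasure_coding.TotalShardsCount

// ecShardLocations is a simplified stand-in for EcShardLocations: for each shard id,
// the set of servers currently holding that shard.
type ecShardLocations struct {
	locations [totalShardsCount][]string
}

// addShard appends the server only if it is not already registered for that shard.
func (l *ecShardLocations) addShard(shardId int, server string) bool {
	for _, s := range l.locations[shardId] {
		if s == server {
			return false
		}
	}
	l.locations[shardId] = append(l.locations[shardId], server)
	return true
}

// deleteShard removes the server from the shard's location list if present.
func (l *ecShardLocations) deleteShard(shardId int, server string) bool {
	servers := l.locations[shardId]
	for i, s := range servers {
		if s == server {
			l.locations[shardId] = append(servers[:i], servers[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	var loc ecShardLocations
	fmt.Println(loc.addShard(3, "dn1"), loc.addShard(3, "dn1")) // true false: duplicates ignored
	fmt.Println(loc.deleteShard(3, "dn1"))                      // true
}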
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index a301103eb..068bd401e 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
package topology
import (
+ "google.golang.org/grpc"
"math/rand"
"time"
@@ -8,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) {
+func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@@ -22,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
}
}
}(garbageThreshold)
@@ -58,6 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+ dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
if dn.Parent() != nil {
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index 769ba0e2a..73c55d77d 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} {
}
}
}
- m["layouts"] = layouts
+ m["Layouts"] = layouts
return m
}
@@ -68,9 +68,28 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat
for _, v := range dn.GetVolumes() {
volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id))
}
+ for _, s := range dn.GetEcShards() {
+ volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(s.VolumeId))
+ }
volumeLocations = append(volumeLocations, volumeLocation)
}
}
}
return
}
+
+func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo {
+ m := &master_pb.TopologyInfo{
+ Id: string(t.Id()),
+ VolumeCount: uint64(t.GetVolumeCount()),
+ MaxVolumeCount: uint64(t.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(t.FreeSpace()),
+ ActiveVolumeCount: uint64(t.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()),
+ }
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ m.DataCenterInfos = append(m.DataCenterInfos, dc.ToDataCenterInfo())
+ }
+ return m
+}
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 07dc9c67b..2fe381ca2 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -4,6 +4,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+
"testing"
)
@@ -20,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) {
}
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
@@ -39,7 +42,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
DeletedByteCount: 34524,
ReadOnly: false,
ReplicaPlacement: uint32(0),
- Version: uint32(1),
+ Version: uint32(needle.CurrentVersion),
Ttl: 0,
}
volumeMessages = append(volumeMessages, volumeMessage)
@@ -47,8 +50,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
}
{
@@ -64,20 +67,68 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
DeletedByteCount: 345240,
ReadOnly: false,
ReplicaPlacement: uint32(0),
- Version: uint32(1),
+ Version: uint32(needle.CurrentVersion),
Ttl: 0,
}
volumeMessages = append(volumeMessages, volumeMessage)
}
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ //rp, _ := storage.NewReplicaPlacementFromString("000")
+ //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
+ //assert(t, "writables", len(layout.writables), volumeCount)
+
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
+ }
+
+ {
+ volumeCount := 6
+ newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
+ Id: uint32(3),
+ Collection: "",
+ ReplicaPlacement: uint32(0),
+ Version: uint32(needle.CurrentVersion),
+ Ttl: 0,
+ }
+ topo.IncrementalSyncDataNodeRegistration(
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ nil,
+ dn)
+ rp, _ := super_block.NewReplicaPlacementFromString("000")
+ layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
+ assert(t, "writables after repeated add", len(layout.writables), volumeCount)
+
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
+
+ topo.IncrementalSyncDataNodeRegistration(
+ nil,
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ dn)
+ assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
+
+ topo.IncrementalSyncDataNodeRegistration(
+ []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
+ nil,
+ dn)
+
+ for vid, _ := range layout.vid2location {
+ println("after add volume id", vid)
+ }
+ for _, vid := range layout.writables {
+ println("after add writable volume id", vid)
+ }
+
+ assert(t, "writables after add back", len(layout.writables), volumeCount)
+
}
topo.UnRegisterDataNode(dn)
- assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0)
+ assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
}
@@ -89,27 +140,28 @@ func assert(t *testing.T, message string, actual, expected int) {
func TestAddRemoveVolume(t *testing.T) {
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
v := storage.VolumeInfo{
- Id: storage.VolumeId(1),
+ Id: needle.VolumeId(1),
Size: 100,
Collection: "xcollection",
FileCount: 123,
DeleteCount: 23,
DeletedByteCount: 45,
ReadOnly: false,
- Version: storage.CurrentVersion,
- ReplicaPlacement: &storage.ReplicaPlacement{},
- Ttl: storage.EMPTY_TTL,
+ Version: needle.CurrentVersion,
+ ReplicaPlacement: &super_block.ReplicaPlacement{},
+ Ttl: needle.EMPTY_TTL,
}
dn.UpdateVolumes([]storage.VolumeInfo{v})
topo.RegisterVolumeLayout(v, dn)
+ topo.RegisterVolumeLayout(v, dn)
if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
t.Errorf("collection %v should exist", v.Collection)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index d6b09314b..789a01330 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,31 +2,38 @@ package topology
import (
"context"
+ "sync/atomic"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
- ch := make(chan bool, locationlist.Length())
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
+ ch := make(chan int, locationlist.Length())
+ errCount := int32(0)
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
-
- resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
- VolumdId: uint32(vid),
+ go func(index int, url string, vid needle.VolumeId) {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
+ VolumeId: uint32(vid),
})
if err != nil {
- ch <- false
+ atomic.AddInt32(&errCount, 1)
+ ch <- -1
return err
}
- isNeeded := resp.GarbageRatio > garbageThreshold
- ch <- isNeeded
+ if resp.GarbageRatio >= garbageThreshold {
+ ch <- index
+ } else {
+ ch <- -1
+ }
return nil
})
if err != nil {
@@ -34,27 +41,33 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
}
}(index, dn.Url(), vid)
}
- isCheckSuccess := true
- for _ = range locationlist.list {
+ vacuumLocationList := NewVolumeLocationList()
+ for range locationlist.list {
select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
+ case index := <-ch:
+ if index != -1 {
+ vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
+ }
case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
+ return vacuumLocationList, false
}
}
- return isCheckSuccess
+ return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
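
The reworked batchVacuumVolumeCheck is a fan-out/collect pattern: one goroutine per replica, a channel buffered to the replica count, the index of a vacuum-worthy replica (or -1) as the message, an atomic error counter, and a timeout on the collecting side. A stripped-down sketch of the same shape, with checkReplica standing in for the VacuumVolumeCheck RPC:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // checkReplica stands in for the per-replica VacuumVolumeCheck RPC.
    func checkReplica(url string) (needsVacuum bool, err error) {
        return true, nil // a real implementation would ask the volume server
    }

    // collectVacuumCandidates fans out one goroutine per replica and collects the
    // indexes of replicas that want vacuuming; -1 is the sentinel for "skip".
    func collectVacuumCandidates(urls []string, timeout time.Duration) ([]int, bool) {
        ch := make(chan int, len(urls)) // buffered so stragglers can still send after a timeout
        var errCount int32
        for i, u := range urls {
            go func(i int, u string) {
                needed, err := checkReplica(u)
                if err != nil {
                    atomic.AddInt32(&errCount, 1)
                    ch <- -1
                    return
                }
                if needed {
                    ch <- i
                } else {
                    ch <- -1
                }
            }(i, u)
        }
        var candidates []int
        for range urls {
            select {
            case idx := <-ch:
                if idx != -1 {
                    candidates = append(candidates, idx)
                }
            case <-time.After(timeout):
                return candidates, false // give up on stragglers, as the diff does
            }
        }
        return candidates, atomic.LoadInt32(&errCount) == 0 && len(candidates) > 0
    }

    func main() {
        candidates, ok := collectVacuumCandidates([]string{"replica-a", "replica-b"}, time.Second)
        fmt.Println(candidates, ok)
    }
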
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, preallocate int64) bool {
+ vl.accessLock.Lock()
vl.removeFromWritable(vid)
+ vl.accessLock.Unlock()
+
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
+ go func(index int, url string, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
- VolumdId: uint32(vid),
+ VolumeId: uint32(vid),
+ Preallocate: preallocate,
})
return err
})
@@ -68,45 +81,50 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
- for _ = range locationlist.list {
+ for range locationlist.list {
select {
case canCommit := <-ch:
isVacuumSuccess = isVacuumSuccess && canCommit
case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
+ return false
}
}
return isVacuumSuccess
}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
+ isReadOnly := false
for _, dn := range locationlist.list {
- glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
- VolumdId: uint32(vid),
+ glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
+ VolumeId: uint32(vid),
})
+			if err == nil && resp.IsReadOnly {
+ isReadOnly = true
+ }
return err
})
if err != nil {
glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
isCommitSuccess = false
} else {
- glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url())
+ glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
}
- if isCommitSuccess {
- vl.SetVolumeAvailable(dn, vid)
+ }
+ if isCommitSuccess {
+ for _, dn := range locationlist.list {
+ vl.SetVolumeAvailable(dn, vid, isReadOnly)
}
}
return isCommitSuccess
}
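
Note the design choice in the commit step: results are gathered from every replica first, and only when all commits succeed is each location marked available again, with the read-only flag from any replica propagated into SetVolumeAvailable.
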
-func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
+func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
- VolumdId: uint32(vid),
+ VolumeId: uint32(vid),
})
return err
})
@@ -118,24 +136,34 @@ func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationli
}
}
-func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int {
+
+	// if a vacuum is already going on, return immediately
+ swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
+ if !swapped {
+ return 0
+ }
+ defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
+
+	// from here on, only one vacuum process is running
+
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
- vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate)
+ vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
}
return 0
}
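
The new vacuumLockCounter guard is a compare-and-swap single-flight check: only the caller that flips the counter from 0 to 1 proceeds, and it resets it on the way out via defer. A self-contained sketch of the idea, with doWork standing in for the per-layout vacuum loop:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type vacuumGuard struct {
        counter int64 // 0 = idle, 1 = a vacuum run is in progress
    }

    // runOnce performs the work only if no other run is in progress; concurrent
    // callers return false immediately, mirroring the guard in Topology.Vacuum.
    func (g *vacuumGuard) runOnce(doWork func()) bool {
        if !atomic.CompareAndSwapInt64(&g.counter, 0, 1) {
            return false // another vacuum is already running
        }
        defer atomic.StoreInt64(&g.counter, 0)
        doWork()
        return true
    }

    func main() {
        var g vacuumGuard
        fmt.Println(g.runOnce(func() { fmt.Println("vacuuming...") })) // prints: vacuuming... then true
    }
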
-func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
volumeLayout.accessLock.RLock()
- tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
+ tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
tmpMap[vid] = locationList
}
@@ -152,11 +180,12 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
- if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) {
- batchVacuumVolumeCommit(volumeLayout, vid, locationList)
+ if vacuumLocationList, needVacuum := batchVacuumVolumeCheck(
+ grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum {
+ if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
+ batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else {
- batchVacuumVolumeCleanup(volumeLayout, vid, locationList)
+ batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
}
}
}
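
vacuumOneVolumeLayout copies vid2location into a temporary map while holding only the read lock, then releases it before issuing the slow per-volume RPCs. A minimal sketch of that snapshot-then-release pattern over a map guarded by sync.RWMutex:

    package main

    import (
        "fmt"
        "sync"
    )

    type layout struct {
        mu   sync.RWMutex
        vols map[uint32][]string // volume id -> replica locations (stand-in types)
    }

    // snapshot copies the map under the read lock so that slow follow-up work
    // (the vacuum RPCs) never runs while the lock is held.
    func (l *layout) snapshot() map[uint32][]string {
        l.mu.RLock()
        defer l.mu.RUnlock()
        tmp := make(map[uint32][]string, len(l.vols))
        for vid, locs := range l.vols {
            tmp[vid] = locs
        }
        return tmp
    }

    func main() {
        l := &layout{vols: map[uint32][]string{1: {"dn1", "dn2"}}}
        for vid, locs := range l.snapshot() {
            fmt.Println(vid, locs) // safe to do slow work here; the lock is already released
        }
    }
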
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 9bf013ca6..58b5702bf 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -5,6 +5,12 @@ import (
"math/rand"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -18,13 +24,14 @@ This package is created to resolve these replica placement issues:
*/
type VolumeGrowOption struct {
- Collection string
- ReplicaPlacement *storage.ReplicaPlacement
- Ttl *storage.TTL
- Prealloacte int64
- DataCenter string
- Rack string
- DataNode string
+ Collection string
+ ReplicaPlacement *super_block.ReplicaPlacement
+ Ttl *needle.TTL
+ Prealloacte int64
+ DataCenter string
+ Rack string
+ DataNode string
+ MemoryMapMaxSizeMb uint32
}
type VolumeGrowth struct {
@@ -42,47 +49,59 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
// one replication type may need rp.GetCopyCount() actual volumes
// given copyCount, how many logical volumes to create
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+ v := util.GetViper()
+ v.SetDefault("master.volume_growth.copy_1", 7)
+ v.SetDefault("master.volume_growth.copy_2", 6)
+ v.SetDefault("master.volume_growth.copy_3", 3)
+ v.SetDefault("master.volume_growth.copy_other", 1)
switch copyCount {
case 1:
- count = 7
+ count = v.GetInt("master.volume_growth.copy_1")
case 2:
- count = 6
+ count = v.GetInt("master.volume_growth.copy_2")
case 3:
- count = 3
+ count = v.GetInt("master.volume_growth.copy_3")
default:
- count = 1
+ count = v.GetInt("master.volume_growth.copy_other")
}
return
}
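
findVolumeCount now reads the per-copy-count growth factors from configuration, with the old hard-coded values supplied through SetDefault. Assuming util.GetViper hands back a standard spf13/viper instance, the lookup reduces to the following sketch:

    package main

    import (
        "fmt"

        "github.com/spf13/viper"
    )

    // growthCountFor returns how many volumes to grow for a given copy count,
    // with the old hard-coded values as defaults for the config keys.
    func growthCountFor(v *viper.Viper, copyCount int) int {
        v.SetDefault("master.volume_growth.copy_1", 7)
        v.SetDefault("master.volume_growth.copy_2", 6)
        v.SetDefault("master.volume_growth.copy_3", 3)
        v.SetDefault("master.volume_growth.copy_other", 1)
        switch copyCount {
        case 1:
            return v.GetInt("master.volume_growth.copy_1")
        case 2:
            return v.GetInt("master.volume_growth.copy_2")
        case 3:
            return v.GetInt("master.volume_growth.copy_3")
        default:
            return v.GetInt("master.volume_growth.copy_other")
        }
    }

    func main() {
        v := viper.New()
        v.Set("master.volume_growth.copy_2", 4) // e.g. loaded from master.toml
        fmt.Println(growthCountFor(v, 1), growthCountFor(v, 2)) // 7 4
    }
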
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+ if targetCount == 0 {
+ targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
+ }
+ count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, option); e == nil {
+ if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
counter += c
} else {
+			glog.V(0).Infof("failed to create %d volumes, created %d: %v", targetCount, counter, e)
return counter, e
}
}
return
}
-func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
return 0, e
}
- vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, option, servers...)
+ vid, raftErr := topo.NextVolumeId()
+ if raftErr != nil {
+ return 0, raftErr
+ }
+ err := vg.grow(grpcDialOption, topo, vid, option, servers...)
return len(servers), err
}
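
GrowByCountAndType keeps a running counter of volumes created and returns it together with the first error, so a caller can tell how far the grow got. A generic sketch of that accumulate-until-first-error loop, with growOnce standing in for findAndGrow:

    package main

    import (
        "errors"
        "fmt"
    )

    // growN mirrors the GrowByCountAndType loop: attempt targetCount grows,
    // accumulate how many volumes were actually created, and stop at the first
    // error while still reporting the partial count.
    func growN(targetCount int, growOnce func() (int, error)) (int, error) {
        counter := 0
        for i := 0; i < targetCount; i++ {
            c, err := growOnce()
            if err != nil {
                return counter, fmt.Errorf("grow %d/%d failed after creating %d volumes: %w", i+1, targetCount, counter, err)
            }
            counter += c
        }
        return counter, nil
    }

    func main() {
        attempts := 0
        created, err := growN(3, func() (int, error) {
            attempts++
            if attempts == 3 {
                return 0, errors.New("no free slots")
            }
            return 2, nil // e.g. replica placement "001" creates two copies per grow
        })
        fmt.Println(created, err)
    }
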
@@ -94,14 +113,14 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
- mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+ mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
- if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
@@ -126,11 +145,11 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
//find main rack and other racks
- mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+ mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
- if node.FreeSpace() < rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
@@ -153,7 +172,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
 	//find main server and other servers
- mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+ mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
@@ -171,7 +190,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
- r := rand.Intn(rack.FreeSpace())
+ r := rand.Int63n(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
@@ -179,7 +198,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
}
for _, datacenter := range otherDataCenters {
- r := rand.Intn(datacenter.FreeSpace())
+ r := rand.Int63n(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
@@ -189,16 +208,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return
}
-func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option); err == nil {
+ if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
- Version: storage.CurrentVersion,
+ Version: needle.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
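
The switch from RandomlyPickNodes to PickNodesByWeight, combined with rand.Int63n(FreeSpace()) plus ReserveOneVolume(r), amounts to weighted random selection: a node's chance of being chosen is proportional to its free volume slots. A standalone sketch of the cumulative-weight walk this presumably performs (the candidate type and fields are illustrative, not the real Node interface):

    package main

    import (
        "fmt"
        "math/rand"
    )

    type candidate struct {
        name      string
        freeSlots int64
    }

    // pickByWeight chooses a candidate with probability proportional to its free
    // slots: draw r in [0, totalFree) and walk the list, subtracting each weight.
    func pickByWeight(candidates []candidate) (candidate, bool) {
        var total int64
        for _, c := range candidates {
            total += c.freeSlots
        }
        if total <= 0 {
            return candidate{}, false
        }
        r := rand.Int63n(total)
        for _, c := range candidates {
            if r < c.freeSlots {
                return c, true
            }
            r -= c.freeSlots
        }
        return candidates[len(candidates)-1], true // not reached when weights sum to total
    }

    func main() {
        picked, _ := pickByWeight([]candidate{
            {"server111", 2000}, {"server333", 1000}, {"server555", 500},
        })
        fmt.Println("picked", picked.name)
    }
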
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index f983df1ec..bc9083fd2 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -7,6 +7,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
var topologyLayout = `
@@ -79,7 +81,7 @@ func setup(topologyLayout string) *Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
@@ -96,12 +98,12 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
+ Id: needle.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
+ Version: needle.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64)))
}
}
}
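
setup() walks the JSON layout through bare interface{} assertions, which is why every number arrives as float64 before being converted (needle.VolumeId(int64(m["id"].(float64))), int64(serverMap["limit"].(float64))). A stripped-down, standalone sketch of that decode-and-convert step:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    const layout = `{"dc1":{"rack1":{"server111":{"volumes":[{"id":1,"size":12312}],"limit":300}}}}`

    func main() {
        var data interface{}
        if err := json.Unmarshal([]byte(layout), &data); err != nil {
            panic(err)
        }
        for dcKey, dcValue := range data.(map[string]interface{}) {
            for rackKey, rackValue := range dcValue.(map[string]interface{}) {
                for serverKey, serverValue := range rackValue.(map[string]interface{}) {
                    serverMap := serverValue.(map[string]interface{})
                    // encoding/json decodes every JSON number into float64,
                    // hence the float64 -> int64 conversions in setup().
                    limit := int64(serverMap["limit"].(float64))
                    for _, v := range serverMap["volumes"].([]interface{}) {
                        m := v.(map[string]interface{})
                        fmt.Println(dcKey, rackKey, serverKey,
                            "volume", int64(m["id"].(float64)), "limit", limit)
                    }
                }
            }
        }
    }
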
@@ -112,7 +114,7 @@ func setup(topologyLayout string) *Topology {
func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
vg := NewDefaultVolumeGrowth()
- rp, _ := storage.NewReplicaPlacementFromString("002")
+ rp, _ := super_block.NewReplicaPlacementFromString("002")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
@@ -129,3 +131,212 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) {
fmt.Println("assigned node :", server.Id())
}
}
+
+var topologyLayout2 = `
+{
+ "dc1":{
+ "rack1":{
+ "server111":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":300
+ },
+ "server112":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":300
+ },
+ "server113":{
+ "volumes":[],
+ "limit":300
+ },
+ "server114":{
+ "volumes":[],
+ "limit":300
+ },
+ "server115":{
+ "volumes":[],
+ "limit":300
+ },
+ "server116":{
+ "volumes":[],
+ "limit":300
+ }
+ },
+ "rack2":{
+ "server121":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":300
+ },
+ "server122":{
+ "volumes":[],
+ "limit":300
+ },
+ "server123":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":300
+ },
+ "server124":{
+ "volumes":[],
+ "limit":300
+ },
+ "server125":{
+ "volumes":[],
+ "limit":300
+ },
+ "server126":{
+ "volumes":[],
+ "limit":300
+ }
+ },
+ "rack3":{
+ "server131":{
+ "volumes":[],
+ "limit":300
+ },
+ "server132":{
+ "volumes":[],
+ "limit":300
+ },
+ "server133":{
+ "volumes":[],
+ "limit":300
+ },
+ "server134":{
+ "volumes":[],
+ "limit":300
+ },
+ "server135":{
+ "volumes":[],
+ "limit":300
+ },
+ "server136":{
+ "volumes":[],
+ "limit":300
+ }
+ }
+ }
+}
+`
+
+func TestReplication011(t *testing.T) {
+ topo := setup(topologyLayout2)
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("011")
+ volumeGrowOption := &VolumeGrowOption{
+ Collection: "MAIL",
+ ReplicaPlacement: rp,
+ DataCenter: "dc1",
+ Rack: "",
+ DataNode: "",
+ }
+ servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
+ t.Fail()
+ }
+ for _, server := range servers {
+ fmt.Println("assigned node :", server.Id())
+ }
+}
+
+var topologyLayout3 = `
+{
+ "dc1":{
+ "rack1":{
+ "server111":{
+ "volumes":[],
+ "limit":2000
+ }
+ }
+ },
+ "dc2":{
+ "rack2":{
+ "server222":{
+ "volumes":[],
+ "limit":2000
+ }
+ }
+ },
+ "dc3":{
+ "rack3":{
+ "server333":{
+ "volumes":[],
+ "limit":1000
+ }
+ }
+ },
+ "dc4":{
+ "rack4":{
+ "server444":{
+ "volumes":[],
+ "limit":1000
+ }
+ }
+ },
+ "dc5":{
+ "rack5":{
+ "server555":{
+ "volumes":[],
+ "limit":500
+ }
+ }
+ },
+ "dc6":{
+ "rack6":{
+ "server666":{
+ "volumes":[],
+ "limit":500
+ }
+ }
+ }
+}
+`
+
+func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
+ topo := setup(topologyLayout3)
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("100")
+ volumeGrowOption := &VolumeGrowOption{
+ Collection: "Weight",
+ ReplicaPlacement: rp,
+ DataCenter: "",
+ Rack: "",
+ DataNode: "",
+ }
+
+ distribution := map[NodeId]int{}
+ // assign 1000 volumes
+ for i := 0; i < 1000; i++ {
+ servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
+ t.Fail()
+ }
+ for _, server := range servers {
+ // fmt.Println("assigned node :", server.Id())
+ if _, ok := distribution[server.id]; !ok {
+ distribution[server.id] = 0
+ }
+ distribution[server.id] += 1
+ }
+ }
+
+ for k, v := range distribution {
+ fmt.Printf("%s : %d\n", k, v)
+ }
+}
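
TestFindEmptySlotsForOneVolumeScheduleByWeight only prints the resulting distribution. If an assertion were wanted, a rough proportionality check against each server's limit would do; the helper, limits and tolerance below are made up for illustration and are not part of the commit:

    package main

    import (
        "fmt"
        "math"
    )

    // roughlyProportional reports whether each observed share is within tol of the
    // share implied by the per-server limits. Purely illustrative.
    func roughlyProportional(counts, limits map[string]int, tol float64) bool {
        totalCount, totalLimit := 0, 0
        for k := range limits {
            totalCount += counts[k]
            totalLimit += limits[k]
        }
        if totalCount == 0 || totalLimit == 0 {
            return false
        }
        for k, limit := range limits {
            want := float64(limit) / float64(totalLimit)
            got := float64(counts[k]) / float64(totalCount)
            if math.Abs(want-got) > tol {
                return false
            }
        }
        return true
    }

    func main() {
        limits := map[string]int{"server111": 2000, "server333": 1000, "server555": 500}
        counts := map[string]int{"server111": 580, "server333": 280, "server555": 140}
        fmt.Println(roughlyProportional(counts, limits, 0.05)) // true
    }
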
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 71a071e2f..9e84fd2da 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -9,17 +9,20 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
- rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
+ rp *super_block.ReplicaPlacement
+ ttl *needle.TTL
+ vid2location map[needle.VolumeId]*VolumeLocationList
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
+ oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
+ replicationAsMin bool
accessLock sync.RWMutex
}
@@ -29,15 +32,16 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- readonlyVolumes: make(map[storage.VolumeId]bool),
- oversizedVolumes: make(map[storage.VolumeId]bool),
+ vid2location: make(map[needle.VolumeId]*VolumeLocationList),
+ writables: *new([]needle.VolumeId),
+ readonlyVolumes: make(map[needle.VolumeId]bool),
+ oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
}
}
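
NewVolumeLayout now takes the replicationAsMin flag alongside the placement, TTL and size limit. A hypothetical call site, reusing types that appear elsewhere in this diff (super_block.NewReplicaPlacementFromString, needle.EMPTY_TTL); this is a sketch, not code from the commit:

    package topology

    import (
        "github.com/chrislusf/seaweedfs/weed/storage/needle"
        "github.com/chrislusf/seaweedfs/weed/storage/super_block"
    )

    // newExampleLayout sketches one possible call site: replication "002"
    // (two extra copies on the same rack), no TTL, a 30GB size limit, and
    // replication treated as an exact count rather than a minimum.
    func newExampleLayout() (*VolumeLayout, error) {
        rp, err := super_block.NewReplicaPlacementFromString("002")
        if err != nil {
            return nil, err
        }
        return NewVolumeLayout(rp, needle.EMPTY_TTL, 30*1000*1000*1000, false), nil
    }
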
@@ -49,6 +53,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ defer vl.ensureCorrectWritables(v)
+ defer vl.rememberOversizedVolume(v)
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -57,7 +64,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
for _, dn := range vl.vid2location[v.Id].list {
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
if vInfo.ReadOnly {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
vl.readonlyVolumes[v.Id] = true
return
@@ -65,23 +72,16 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.readonlyVolumes, v.Id)
}
} else {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
delete(vl.readonlyVolumes, v.Id)
return
}
}
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
- }
- } else {
- vl.rememberOversizedVolumne(v)
- vl.removeFromWritable(v.Id)
- }
+
}
-func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true
}
@@ -91,17 +91,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- vl.removeFromWritable(v.Id)
- delete(vl.vid2location, v.Id)
+ // remove from vid2location map
+ location, ok := vl.vid2location[v.Id]
+ if !ok {
+ return
+ }
+
+ if location.Remove(dn) {
+
+ vl.ensureCorrectWritables(v)
+
+ if location.Length() == 0 {
+ delete(vl.vid2location, v.Id)
+ }
+
+ }
}
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
- for _, id := range vl.writables {
- if vid == id {
- return
+func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
+ if vl.enoughCopies(v.Id) && vl.isWritable(v) {
+ if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+ vl.setVolumeWritable(v.Id)
}
+ } else {
+ vl.removeFromWritable(v.Id)
}
- vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
@@ -110,7 +124,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return !vl.isOversized(v) &&
- v.Version == storage.CurrentVersion &&
+ v.Version == needle.CurrentVersion &&
!v.ReadOnly
}
@@ -121,7 +135,7 @@ func (vl *VolumeLayout) isEmpty() bool {
return len(vl.vid2location) == 0
}
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -141,7 +155,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -158,7 +172,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
- var vid storage.VolumeId
+ var vid needle.VolumeId
var locationList *VolumeLocationList
counter := 0
for _, v := range vl.writables {
@@ -205,7 +219,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
return counter
}
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
@@ -220,7 +234,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
}
return false
}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
@@ -231,7 +245,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return true
}
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -245,18 +259,34 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ vInfo, err := dn.GetVolumesById(vid)
+ if err != nil {
+ return false
+ }
+
vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+
+ if vInfo.ReadOnly || isReadOnly {
+ return false
+ }
+
+ if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+ locations := vl.vid2location[vid].Length()
+ desired := vl.rp.GetCopyCount()
+ return locations == desired || (vl.replicationAsMin && locations > desired)
+}
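
enoughCopies is where the new replicationAsMin flag bites: by default a volume is writable only when the replica count exactly matches the placement's copy count, while with the flag set extra replicas are tolerated. A tiny standalone sketch of the predicate with concrete values:

    package main

    import "fmt"

    // enoughCopies mirrors the predicate added in this diff: exact match by
    // default, or "at least desired" when replication is treated as a minimum.
    func enoughCopies(locations, desired int, replicationAsMin bool) bool {
        return locations == desired || (replicationAsMin && locations > desired)
    }

    func main() {
        fmt.Println(enoughCopies(3, 3, false)) // true: exact replica count
        fmt.Println(enoughCopies(4, 3, false)) // false: over-replicated in strict mode
        fmt.Println(enoughCopies(4, 3, true))  // true: extra copies tolerated
        fmt.Println(enoughCopies(2, 3, true))  // false: still under-replicated
    }
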
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 8d5881333..8905c54b5 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -3,7 +3,7 @@ package topology
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type VolumeLocationList struct {
@@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
}
}
-func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
+func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) {
for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold {
vinfo, err := dnl.GetVolumesById(vid)