author     bingoohuang <bingoo.huang@gmail.com>    2019-12-30 13:05:50 +0800
committer  GitHub <noreply@github.com>             2019-12-30 13:05:50 +0800
commit     70da715d8d917527291b35fb069fac077d17b868 (patch)
tree       b89bad02094cc7131bc2c9f64df13e15f9de9914 /weed/topology
parent     93a7df500ffeed766e395907e860b1733040ff23 (diff)
parent     09043c8e5a3b43add589344d28d4f57e90c83f70 (diff)
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'weed/topology')
-rw-r--r--   weed/topology/allocate_volume.go            11
-rw-r--r--   weed/topology/collection.go                   4
-rw-r--r--   weed/topology/data_center.go                  1
-rw-r--r--   weed/topology/data_node.go                   31
-rw-r--r--   weed/topology/node.go                        19
-rw-r--r--   weed/topology/rack.go                         1
-rw-r--r--   weed/topology/store_replicate.go            182
-rw-r--r--   weed/topology/topology.go                    11
-rw-r--r--   weed/topology/topology_event_handling.go      1
-rw-r--r--   weed/topology/topology_map.go                 3
-rw-r--r--   weed/topology/topology_test.go                5
-rw-r--r--   weed/topology/topology_vacuum.go             62
-rw-r--r--   weed/topology/volume_growth.go               24
-rw-r--r--   weed/topology/volume_growth_test.go           3
-rw-r--r--   weed/topology/volume_layout.go                5
15 files changed, 228 insertions(+), 135 deletions(-)
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 48336092f..e5dc48652 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -18,11 +18,12 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
- VolumeId: uint32(vid),
- Collection: option.Collection,
- Replication: option.ReplicaPlacement.String(),
- Ttl: option.Ttl.String(),
- Preallocate: option.Prealloacte,
+ VolumeId: uint32(vid),
+ Collection: option.Collection,
+ Replication: option.ReplicaPlacement.String(),
+ Ttl: option.Ttl.String(),
+ Preallocate: option.Prealloacte,
+ MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb,
})
return deleteErr
})
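
The allocate_volume.go change threads a new MemoryMapMaxSizeMb option through to the volume server's AllocateVolume RPC. A minimal sketch of a caller, written as if inside package topology; the AllocateVolume signature and the option values are assumptions based on what is visible in this diff:

// growOneVolume is a hypothetical caller (not part of this diff); it only
// shows the new field riding along with the existing grow options.
func growOneVolume(dn *DataNode, dial grpc.DialOption, vid needle.VolumeId) error {
	rp, _ := super_block.NewReplicaPlacementFromString("001")
	option := &VolumeGrowOption{
		Collection:         "pictures", // illustrative values throughout
		ReplicaPlacement:   rp,
		Ttl:                needle.EMPTY_TTL,
		MemoryMapMaxSizeMb: 1024, // new field: memory-map cap in MB
	}
	return AllocateVolume(dn, dial, vid, option)
}
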
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
index f6b728ec9..7a611d904 100644
--- a/weed/topology/collection.go
+++ b/weed/topology/collection.go
@@ -3,8 +3,8 @@ package topology
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -24,7 +24,7 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
+func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
index 640cb1937..dc3accb71 100644
--- a/weed/topology/data_center.go
+++ b/weed/topology/data_center.go
@@ -48,6 +48,7 @@ func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
MaxVolumeCount: uint64(dc.GetMaxVolumeCount()),
FreeVolumeCount: uint64(dc.FreeSpace()),
ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()),
}
for _, c := range dc.Children() {
rack := c.(*Rack)
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 3e72ccdbf..617341e54 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,14 +2,13 @@ package topology
import (
"fmt"
+ "strconv"
"sync"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "strconv"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -44,15 +43,26 @@ func (dn *DataNode) String() string {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
dn.Lock()
defer dn.Unlock()
- if _, ok := dn.volumes[v.Id]; !ok {
+ if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
isNew = true
} else {
+ if oldV.IsRemote() != v.IsRemote() {
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
+ if oldV.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ }
dn.volumes[v.Id] = v
}
return
@@ -70,7 +80,12 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
}
}
dn.Unlock()
@@ -88,7 +103,12 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol
for _, v := range deletedVolumes {
delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
}
dn.Unlock()
for _, v := range newlVolumes {
@@ -160,6 +180,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
FreeVolumeCount: uint64(dn.FreeSpace()),
ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
}
for _, v := range dn.GetVolumes() {
m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
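
The data_node.go hunks keep the new remoteVolumeCount consistent as volumes are added, removed, or flip between local and remote (tiered) storage. The transition rule in AddOrUpdateVolume distills to a small helper; this standalone restatement is an illustration, not part of the change:

// remoteCountDelta restates the bookkeeping above: +1 when a volume becomes
// remote, -1 when it returns to local storage, 0 when nothing changed.
func remoteCountDelta(oldIsRemote, newIsRemote bool) int64 {
	if oldIsRemote == newIsRemote {
		return 0
	}
	if newIsRemote {
		return 1
	}
	return -1
}
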
diff --git a/weed/topology/node.go b/weed/topology/node.go
index b2808f589..572a89d4d 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -20,6 +20,7 @@ type Node interface {
ReserveOneVolume(r int64) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64)
+ UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
UpAdjustEcShardCountDelta(ecShardCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
UpAdjustMaxVolumeId(vid needle.VolumeId)
@@ -27,6 +28,7 @@ type Node interface {
GetVolumeCount() int64
GetEcShardCount() int64
GetActiveVolumeCount() int64
+ GetRemoteVolumeCount() int64
GetMaxVolumeCount() int64
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
@@ -44,6 +46,7 @@ type Node interface {
}
type NodeImpl struct {
volumeCount int64
+ remoteVolumeCount int64
activeVolumeCount int64
ecShardCount int64
maxVolumeCount int64
@@ -132,10 +135,11 @@ func (n *NodeImpl) Id() NodeId {
return n.id
}
func (n *NodeImpl) FreeSpace() int64 {
+ freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
if n.ecShardCount > 0 {
- return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
+ freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
}
- return n.maxVolumeCount - n.volumeCount
+ return freeVolumeSlotCount
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
@@ -191,6 +195,12 @@ func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be n
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
+func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
+ if n.parent != nil {
+ n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
+ }
+}
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
if n.parent != nil {
@@ -220,6 +230,9 @@ func (n *NodeImpl) GetVolumeCount() int64 {
func (n *NodeImpl) GetEcShardCount() int64 {
return n.ecShardCount
}
+func (n *NodeImpl) GetRemoteVolumeCount() int64 {
+ return n.remoteVolumeCount
+}
func (n *NodeImpl) GetActiveVolumeCount() int64 {
return n.activeVolumeCount
}
@@ -235,6 +248,7 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
@@ -250,6 +264,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+ n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
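
The FreeSpace rewrite in node.go is the core of the remote-volume accounting: a remote volume still counts in volumeCount but stores no data locally, so its slot is added back before subtracting, while EC shards keep reserving roughly one slot per DataShardsCount shards. A worked sketch with made-up counts, assuming DataShardsCount is 10:

package main

import "fmt"

func main() {
	// 100 slots, 90 volumes of which 10 are remote, 30 EC shards.
	maxVolumeCount, volumeCount := int64(100), int64(90)
	remoteVolumeCount, ecShardCount := int64(10), int64(30)

	free := maxVolumeCount + remoteVolumeCount - volumeCount // 100 + 10 - 90 = 20
	if ecShardCount > 0 {
		free = free - ecShardCount/10 - 1 // 20 - 3 - 1 = 16
	}
	// Before this change the same node reported 100 - 90 - 3 - 1 = 6.
	fmt.Println(free) // 16
}
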
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 932c1a804..1921c0c05 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -67,6 +67,7 @@ func (r *Rack) ToRackInfo() *master_pb.RackInfo {
MaxVolumeCount: uint64(r.GetMaxVolumeCount()),
FreeVolumeCount: uint64(r.FreeSpace()),
ActiveVolumeCount: uint64(r.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()),
}
for _, c := range r.Children() {
dn := c.(*DataNode)
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index d21c4d210..b195b48ed 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -25,58 +25,61 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
//check JWT
jwt := security.GetJwt(r)
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
if err != nil {
err = fmt.Errorf("failed to write to local disk: %v", err)
+ glog.V(0).Infoln(err)
return
}
- needToReplicate := !s.HasVolume(volumeId)
- needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
- if !needToReplicate {
- needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
-
- if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
- u := url.URL{
- Scheme: "http",
- Host: location.Url,
- Path: r.URL.Path,
- }
- q := url.Values{
- "type": {"replicate"},
- "ttl": {n.Ttl.String()},
- }
- if n.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(n.LastModified, 10))
- }
- if n.IsChunkedManifest() {
- q.Set("cm", "true")
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: location.Url,
+ Path: r.URL.Path,
+ }
+ q := url.Values{
+ "type": {"replicate"},
+ "ttl": {n.Ttl.String()},
+ }
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
+ }
+ if n.IsChunkedManifest() {
+ q.Set("cm", "true")
+ }
+ u.RawQuery = q.Encode()
+
+ pairMap := make(map[string]string)
+ if n.HasPairs() {
+ tmpMap := make(map[string]string)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
}
- u.RawQuery = q.Encode()
-
- pairMap := make(map[string]string)
- if n.HasPairs() {
- tmpMap := make(map[string]string)
- err := json.Unmarshal(n.Pairs, &tmpMap)
- if err != nil {
- glog.V(0).Infoln("Unmarshal pairs error:", err)
- }
- for k, v := range tmpMap {
- pairMap[needle.PairNamePrefix+k] = v
- }
+ for k, v := range tmpMap {
+ pairMap[needle.PairNamePrefix+k] = v
}
-
- _, err := operation.Upload(u.String(),
- string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
- pairMap, jwt)
- return err
- }); err != nil {
- size = 0
- err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
}
+
+ _, err := operation.Upload(u.String(),
+ string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
+ pairMap, jwt)
+ return err
+ }); err != nil {
+ size = 0
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
+ glog.V(0).Infoln(err)
}
}
return
@@ -84,31 +87,34 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (uint32, error) {
+ r *http.Request) (size uint32, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := store.DeleteVolumeNeedle(volumeId, n)
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ size, err = store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
- return ret, err
+ return
}
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
- }); err != nil {
- ret = 0
- }
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
+ return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
+ }); err != nil {
+ size = 0
}
}
- return ret, err
+ return
}
type DistributedOperationResult map[string]error
@@ -131,32 +137,44 @@ type RemoteResult struct {
Error error
}
-func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
- length := 0
- selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
- results := make(chan RemoteResult)
- for _, location := range lookupResult.Locations {
- if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan RemoteResult) {
- results <- RemoteResult{location.Url, op(location)}
- }(location, results)
+func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error {
+ length := len(locations)
+ results := make(chan RemoteResult)
+ for _, location := range locations {
+ go func(location operation.Location, results chan RemoteResult) {
+ results <- RemoteResult{location.Url, op(location)}
+ }(location, results)
+ }
+ ret := DistributedOperationResult(make(map[string]error))
+ for i := 0; i < length; i++ {
+ result := <-results
+ ret[result.Host] = result.Error
+ }
+
+ return ret.Error()
+}
+
+func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) (
+ remoteLocations []operation.Location, err error) {
+ copyCount := s.GetVolume(volumeId).ReplicaPlacement.GetCopyCount()
+ if copyCount > 1 {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
+ if len(lookupResult.Locations) < copyCount {
+ err = fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]",
+ len(lookupResult.Locations), copyCount)
+ return
}
- }
- ret := DistributedOperationResult(make(map[string]error))
- for i := 0; i < length; i++ {
- result := <-results
- ret[result.Host] = result.Error
- }
- if volume := store.GetVolume(volumeId); volume != nil {
- if length+1 < volume.ReplicaPlacement.GetCopyCount() {
- return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
+ selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ remoteLocations = append(remoteLocations, location)
+ }
}
+ } else {
+ err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
+ return
}
- return ret.Error()
- } else {
- glog.V(0).Infoln()
- return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
}
+
+ return
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index aa01190c9..e6cb44727 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -7,15 +7,18 @@ import (
"sync"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Topology struct {
+ vacuumLockCounter int64
NodeImpl
collectionMap *util.ConcurrentReadMap
@@ -119,16 +122,16 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil {
- return "", 0, nil, fmt.Errorf("failed to find writable volumes for collectio:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
+ return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
}
if datanodes.Length() == 0 {
- return "", 0, nil, fmt.Errorf("no writable volumes available for for collectio:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
+ return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
- fileId, count := t.Sequence.NextFileId(count)
+ fileId := t.Sequence.NextFileId(count)
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 041351492..068bd401e 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -59,6 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+ dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
if dn.Parent() != nil {
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index 37a88c9ed..73c55d77d 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} {
}
}
}
- m["layouts"] = layouts
+ m["Layouts"] = layouts
return m
}
@@ -85,6 +85,7 @@ func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo {
MaxVolumeCount: uint64(t.GetMaxVolumeCount()),
FreeVolumeCount: uint64(t.FreeSpace()),
ActiveVolumeCount: uint64(t.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()),
}
for _, c := range t.Children() {
dc := c.(*DataCenter)
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 8f79ad684..e7676ccf7 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"testing"
)
@@ -94,7 +95,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
nil,
dn)
- rp, _ := storage.NewReplicaPlacementFromString("000")
+ rp, _ := super_block.NewReplicaPlacementFromString("000")
layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
assert(t, "writables after repeated add", len(layout.writables), volumeCount)
@@ -154,7 +155,7 @@ func TestAddRemoveVolume(t *testing.T) {
DeletedByteCount: 45,
ReadOnly: false,
Version: needle.CurrentVersion,
- ReplicaPlacement: &storage.ReplicaPlacement{},
+ ReplicaPlacement: &super_block.ReplicaPlacement{},
Ttl: needle.EMPTY_TTL,
}
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 351ff842f..ca626e973 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,6 +2,7 @@ package topology
import (
"context"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -12,8 +13,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
- ch := make(chan bool, locationlist.Length())
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
+ ch := make(chan int, locationlist.Length())
+ errCount := int32(0)
for index, dn := range locationlist.list {
go func(index int, url string, vid needle.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -21,11 +24,15 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
VolumeId: uint32(vid),
})
if err != nil {
- ch <- false
+ atomic.AddInt32(&errCount, 1)
+ ch <- -1
return err
}
- isNeeded := resp.GarbageRatio > garbageThreshold
- ch <- isNeeded
+ if resp.GarbageRatio >= garbageThreshold {
+ ch <- index
+ } else {
+ ch <- -1
+ }
return nil
})
if err != nil {
@@ -33,20 +40,25 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}
}(index, dn.Url(), vid)
}
- isCheckSuccess := true
- for _ = range locationlist.list {
+ vacuumLocationList := NewVolumeLocationList()
+ for range locationlist.list {
select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
+ case index := <-ch:
+ if index != -1 {
+ vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
+ }
case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
+ return vacuumLocationList, false
}
}
- return isCheckSuccess
+ return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
-func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, preallocate int64) bool {
+ vl.accessLock.Lock()
vl.removeFromWritable(vid)
+ vl.accessLock.Unlock()
+
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid needle.VolumeId) {
@@ -67,13 +79,12 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
- for _ = range locationlist.list {
+ for range locationlist.list {
select {
case canCommit := <-ch:
isVacuumSuccess = isVacuumSuccess && canCommit
case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
+ return false
}
}
return isVacuumSuccess
@@ -118,6 +129,16 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int {
+
+ // if there is vacuum going on, return immediately
+ swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
+ if !swapped {
+ return 0
+ }
+ defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
+
+ // now only one vacuum process going on
+
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
@@ -151,11 +172,12 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) {
- if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) {
- batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList)
+ if vacuumLocationList, needVacuum := batchVacuumVolumeCheck(
+ grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum {
+ if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
+ batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else {
- batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList)
+ batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
}
}
}
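
topology_vacuum.go makes two behavioral changes: batchVacuumVolumeCheck now returns only the locations whose garbage ratio crosses the threshold (and compaction runs on that subset), and Topology.Vacuum guards itself with the new vacuumLockCounter so overlapping vacuum triggers return immediately instead of piling up. The guard is a non-blocking try-lock built from compare-and-swap; a minimal standalone sketch of the pattern:

package main

import (
	"fmt"
	"sync/atomic"
)

var vacuumLockCounter int64 // 0 = idle, 1 = a vacuum pass is running

// tryVacuum mirrors the guard added to Topology.Vacuum: CompareAndSwap takes
// the "lock" without blocking, and the deferred store releases it on return.
func tryVacuum(run func()) bool {
	if !atomic.CompareAndSwapInt64(&vacuumLockCounter, 0, 1) {
		return false // another pass is in flight; return immediately
	}
	defer atomic.StoreInt64(&vacuumLockCounter, 0)
	run()
	return true
}

func main() {
	fmt.Println(tryVacuum(func() { fmt.Println("vacuuming...") })) // true
}
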
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index ff02044a1..80fbc86cd 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -6,6 +6,8 @@ import (
"sync"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -21,13 +23,14 @@ This package is created to resolve these replica placement issues:
*/
type VolumeGrowOption struct {
- Collection string
- ReplicaPlacement *storage.ReplicaPlacement
- Ttl *needle.TTL
- Prealloacte int64
- DataCenter string
- Rack string
- DataNode string
+ Collection string
+ ReplicaPlacement *super_block.ReplicaPlacement
+ Ttl *needle.TTL
+ Prealloacte int64
+ DataCenter string
+ Rack string
+ DataNode string
+ MemoryMapMaxSizeMb uint32
}
type VolumeGrowth struct {
@@ -58,8 +61,11 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(grpcDialOption, vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+ if targetCount == 0 {
+ targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
+ }
+ count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
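
volume_growth.go extends AutomaticGrowByType with an explicit targetCount; zero preserves the old behavior of deriving the count from the replica placement's copy count. A hedged call-site sketch, where volumeGrowOption, grpcDialOption, and topo are assumed to exist as in the tests below:

vg := NewDefaultVolumeGrowth()
// targetCount == 0 falls back to vg.findVolumeCount(rp.GetCopyCount()),
// so existing callers can pass 0 to keep the pre-change growth count.
count, err := vg.AutomaticGrowByType(volumeGrowOption, grpcDialOption, topo, 0)
if err != nil {
	glog.V(0).Infof("grew %d volumes before failing: %v", count, err)
}
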
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index 3573365fd..e3c5cc580 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
var topologyLayout = `
@@ -113,7 +114,7 @@ func setup(topologyLayout string) *Topology {
func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
vg := NewDefaultVolumeGrowth()
- rp, _ := storage.NewReplicaPlacementFromString("002")
+ rp, _ := super_block.NewReplicaPlacementFromString("002")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 799cbca62..7633b28be 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -10,11 +10,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
- rp *storage.ReplicaPlacement
+ rp *super_block.ReplicaPlacement
ttl *needle.TTL
vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id
@@ -30,7 +31,7 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,