path: root/weed/topology
Diffstat (limited to 'weed/topology')
-rw-r--r--  weed/topology/allocate_volume.go          |  6
-rw-r--r--  weed/topology/cluster_commands.go         | 20
-rw-r--r--  weed/topology/data_node.go                | 19
-rw-r--r--  weed/topology/data_node_ec.go             |  4
-rw-r--r--  weed/topology/node.go                     |  9
-rw-r--r--  weed/topology/rack.go                     |  7
-rw-r--r--  weed/topology/store_replicate.go          | 24
-rw-r--r--  weed/topology/topology.go                 | 56
-rw-r--r--  weed/topology/topology_ec.go              |  7
-rw-r--r--  weed/topology/topology_event_handling.go  |  6
-rw-r--r--  weed/topology/topology_test.go            |  4
-rw-r--r--  weed/topology/topology_vacuum.go          | 51
-rw-r--r--  weed/topology/volume_growth.go            | 41
-rw-r--r--  weed/topology/volume_layout.go            | 66
-rw-r--r--  weed/topology/volume_location_list.go     |  5
15 files changed, 244 insertions(+), 81 deletions(-)
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 7c7fae683..f21d0fc0a 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -15,9 +15,9 @@ type AllocateVolumeResult struct {
func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
+ _, allocateErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
VolumeId: uint32(vid),
Collection: option.Collection,
Replication: option.ReplicaPlacement.String(),
@@ -26,7 +26,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb,
DiskType: string(option.DiskType),
})
- return deleteErr
+ return allocateErr
})
}
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 152691ccb..1bcc6b449 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -1,9 +1,12 @@
package topology
import (
+ "encoding/json"
+ "fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ hashicorpRaft "github.com/hashicorp/raft"
)
type MaxVolumeIdCommand struct {
@@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId"
}
+// deprecatedCommandApply represents the old interface to apply a command to the server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId()
@@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
return nil, nil
}
+
+func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error {
+ b, err := json.Marshal(s)
+ if err != nil {
+ return fmt.Errorf("marshal: %v", err)
+ }
+ _, err = sink.Write(b)
+ if err != nil {
+ sink.Cancel()
+ return fmt.Errorf("sink.Write(): %v", err)
+ }
+ return sink.Close()
+}
+
+func (s *MaxVolumeIdCommand) Release() {
+}
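
Persist and Release make MaxVolumeIdCommand satisfy hashicorp/raft's FSMSnapshot interface, so the same command type can double as the FSM snapshot. A minimal sketch of how that might be wired, assuming a hypothetical StateMachine type with a topo field (neither appears in this diff):

func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
	// Assumed wiring: return the current max volume id as the snapshot;
	// hashicorp/raft will call Persist(sink) on it and then Release().
	return NewMaxVolumeIdCommand(s.topo.GetMaxVolumeId()), nil
}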
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 2813f7b45..6bdbd965f 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,22 +2,23 @@ package topology
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
- "strconv"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
type DataNode struct {
NodeImpl
Ip string
Port int
+ GrpcPort int
PublicUrl string
LastSeen int64 // unix time in seconds
+ Counter int // in race condition, the previous dataNode was not dead
}
func NewDataNode(id string) *DataNode {
@@ -109,6 +110,9 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
for _, v := range deletedVolumes {
disk := dn.getOrCreateDisk(v.DiskType)
+ if _, found := disk.volumes[v.Id]; !found {
+ continue
+ }
delete(disk.volumes, v.Id)
deltaDiskUsages := newDiskUsages()
@@ -206,7 +210,11 @@ func (dn *DataNode) MatchLocation(ip string, port int) bool {
}
func (dn *DataNode) Url() string {
- return dn.Ip + ":" + strconv.Itoa(dn.Port)
+ return util.JoinHostPort(dn.Ip, dn.Port)
+}
+
+func (dn *DataNode) ServerAddress() pb.ServerAddress {
+ return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
}
func (dn *DataNode) ToMap() interface{} {
@@ -240,6 +248,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
m := &master_pb.DataNodeInfo{
Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
+ GrpcPort: uint32(dn.GrpcPort),
}
for _, c := range dn.Children() {
disk := c.(*Disk)
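
With GrpcPort now tracked per node, Url() still returns the plain HTTP host:port while ServerAddress() builds a pb.ServerAddress that also carries the gRPC port, which is what operation.WithVolumeServerClient now takes. An illustrative fragment with made-up values:

dn := NewDataNode(util.JoinHostPort("127.0.0.1", 8080))
dn.Ip, dn.Port, dn.GrpcPort = "127.0.0.1", 8080, 18080
httpAddr := dn.Url()           // "127.0.0.1:8080", used in HTTP replication URLs
grpcAddr := dn.ServerAddress() // built via pb.NewServerAddress(ip, port, grpcPort)
_, _ = httpAddr, grpcAddr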
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 330b16b24..bf72fa9af 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
for _, ecShards := range actualShards {
- if dn.hasEcShards(ecShards.VolumeId) {
+ if dn.HasEcShards(ecShards.VolumeId) {
continue
}
@@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
return
}
-func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
+func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4772cb411..c5956177a 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -3,6 +3,7 @@ package topology
import (
"errors"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -246,6 +247,14 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
n.GetTopology().chanCrowdedVolumes <- v
}
+ copyCount := v.ReplicaPlacement.GetCopyCount()
+ if copyCount > 1 {
+ if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
+ stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
+ } else {
+ stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
+ }
+ }
}
}
} else {
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 8eb2a717c..cd09746b2 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -3,7 +3,7 @@ package topology
import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "strconv"
+ "github.com/chrislusf/seaweedfs/weed/util"
"time"
)
@@ -30,7 +30,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@@ -38,9 +38,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
return dn
}
}
- dn := NewDataNode(ip + ":" + strconv.Itoa(port))
+ dn := NewDataNode(util.JoinHostPort(ip, port))
dn.Ip = ip
dn.Port = port
+ dn.GrpcPort = grpcPort
dn.PublicUrl = publicUrl
dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn)
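
GetOrCreateDataNode threads the new grpcPort through to the DataNode it creates. An illustrative registration call (all values made up):

dn := rack.GetOrCreateDataNode("10.0.0.5", 8080, 18080, "10.0.0.5:8080", map[string]uint32{"": 25})
// dn.GrpcPort is 18080, so dn.ServerAddress() can reach this volume server over gRPC.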
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index b114b468d..7bb10f1da 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -28,7 +29,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" {
// this is the initial request
- remoteLocations, err = getWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
+ remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
if err != nil {
glog.V(0).Infoln(err)
return
@@ -44,6 +45,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
if s.GetVolume(volumeId) != nil {
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
if err != nil {
+ stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
err = fmt.Errorf("failed to write to local disk: %v", err)
glog.V(0).Infoln(err)
return
@@ -74,6 +76,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
+ stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range tmpMap {
@@ -83,11 +86,22 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
// volume server do not know about encryption
// TODO optimize here to compress data only once
- _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: u.String(),
+ Filename: string(n.Name),
+ Cipher: false,
+ IsInputCompressed: n.IsCompressed(),
+ MimeType: string(n.Mime),
+ PairMap: pairMap,
+ Jwt: jwt,
+ }
+ _, err := operation.UploadData(n.Data, uploadOption)
return err
}); err != nil {
+ stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err)
+ return false, err
}
}
return
@@ -100,7 +114,7 @@ func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOp
var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" {
- remoteLocations, err = getWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
+ remoteLocations, err = GetWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
if err != nil {
glog.V(0).Infoln(err)
return
@@ -160,7 +174,7 @@ func DistributedOperation(locations []operation.Location, op func(location opera
return ret.Error()
}
-func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
+func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
v := s.GetVolume(volumeId)
if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {
@@ -170,7 +184,7 @@ func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOpt
// not on local store, or has replications
lookupResult, lookupErr := operation.LookupVolumeId(masterFn, grpcDialOption, volumeId.String())
if lookupErr == nil {
- selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
+ selfUrl := util.JoinHostPort(s.Ip, s.Port)
for _, location := range lookupResult.Locations {
if location.Url != selfUrl {
remoteLocations = append(remoteLocations, location)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4cbe22a42..631c1fa29 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,14 +1,18 @@
package topology
import (
+ "encoding/json"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -39,7 +43,10 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
+ RaftServer raft.Server
+ HashicorpRaft *hashicorpRaft.Raft
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -71,19 +78,25 @@ func (t *Topology) IsLeader() bool {
return true
}
if leader, err := t.Leader(); err == nil {
- if t.RaftServer.Name() == leader {
+ if pb.ServerAddress(t.RaftServer.Name()) == leader {
return true
}
}
+ } else if t.HashicorpRaft != nil {
+ if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+ return true
+ }
}
return false
}
-func (t *Topology) Leader() (string, error) {
- l := ""
+func (t *Topology) Leader() (pb.ServerAddress, error) {
+ var l pb.ServerAddress
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
- l = t.RaftServer.Leader()
+ l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
@@ -123,8 +136,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
- return 0, err
+ if t.RaftServer != nil {
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ } else if t.HashicorpRaft != nil {
+ b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+ if err != nil {
+ return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+ }
+ if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+ return 0, future.Error()
+ }
}
return next, nil
}
@@ -136,7 +159,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
return active > 0
}
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
+func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
@@ -145,7 +168,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
fileId := t.Sequence.NextFileId(count)
- return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+ return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
@@ -205,7 +228,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- glog.Infof("removing volume info: %+v", v)
+ glog.Infof("removing volume info: %+v from %v", v, dn.id)
diskType := types.ToDiskType(v.DiskType)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
volumeLayout.UnRegisterVolume(&v, dn)
@@ -282,3 +305,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
return
}
+
+func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) {
+ if dn.Parent() != nil {
+ return
+ }
+ // registration to topo
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ rack.LinkChildNode(dn)
+ glog.Infof("[%s] reLink To topo ", dn.Id())
+}
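
NextVolumeId now routes through whichever raft implementation is configured: goraft via RaftServer.Do, hashicorp/raft by JSON-encoding the same command and submitting it with Apply. A hedged caller sketch (error handling trimmed, topo assumed in scope):

if topo.IsLeader() {
	// Only the leader should advance the cluster-wide max volume id.
	vid, err := topo.NextVolumeId()
	if err != nil {
		glog.V(0).Infof("allocate next volume id: %v", err)
		return
	}
	glog.V(1).Infof("next volume id: %d", vid)
}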
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 022eeb578..fdc4f274e 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -2,6 +2,7 @@ package topology
import (
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -135,16 +136,16 @@ func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocati
return
}
-func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
+func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
- dateNodeMap := make(map[string]bool)
+ dateNodeMap := make(map[pb.ServerAddress]bool)
for _, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
for _, locations := range ecVolumeLocation.Locations {
for _, loc := range locations {
- dateNodeMap[string(loc.Id())] = true
+ dateNodeMap[loc.ServerAddress()] = true
}
}
}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 0f1db74df..fe3717233 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
package topology
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
"math/rand"
@@ -24,7 +25,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
}
}
}(garbageThreshold)
@@ -84,7 +85,8 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
negativeUsages := dn.GetDiskUsages().negative()
dn.UpAdjustDiskUsageDelta(negativeUsages)
-
+ dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
+ dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id())
}
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index ecfe9d8d1..2ece48a95 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -31,7 +31,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
{
volumeCount := 7
@@ -177,7 +177,7 @@ func TestAddRemoveVolume(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 9feb55b73..e53aa2853 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,6 +2,8 @@ package topology
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "io"
"sync/atomic"
"time"
@@ -19,8 +21,8 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
ch := make(chan int, locationlist.Length())
errCount := int32(0)
for index, dn := range locationlist.list {
- go func(index int, url string, vid needle.VolumeId) {
- err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
+ err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
})
@@ -39,7 +41,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
if err != nil {
glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
}
- }(index, dn.Url(), vid)
+ }(index, dn.ServerAddress(), vid)
}
vacuumLocationList := NewVolumeLocationList()
@@ -58,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
+
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
@@ -66,14 +69,29 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid needle.VolumeId) {
+ go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
+ err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: uint32(vid),
Preallocate: preallocate,
})
- return err
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes)
+ }
+ return nil
})
if err != nil {
glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
@@ -82,7 +100,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
ch <- true
}
- }(index, dn.Url(), vid)
+ }(index, dn.ServerAddress(), vid)
}
isVacuumSuccess := true
@@ -99,12 +117,13 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
}
return isVacuumSuccess
}
+
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
@@ -127,10 +146,11 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
}
return isCommitSuccess
}
+
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})
@@ -144,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -155,12 +175,19 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
// now only one vacuum process going on
- glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
+ glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
+ garbageThreshold, collection, volumeId)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
+ if collection != "" && collection != c.Name {
+ continue
+ }
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
+ if volumeId > 0 && volumeLayout.Lookup(needle.VolumeId(volumeId)) == nil {
+ continue
+ }
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
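
Vacuum's new volumeId and collection parameters are optional filters: zero and empty mean vacuum everything, otherwise only the matching collection or the layout holding that volume is scanned. Illustrative calls (the 0.3 threshold is arbitrary):

t.Vacuum(grpcDialOption, 0.3, 0, "", preallocate)         // every collection, every volume
t.Vacuum(grpcDialOption, 0.3, 0, "pictures", preallocate) // only the "pictures" collection
t.Vacuum(grpcDialOption, 0.3, 42, "", preallocate)        // only the layout containing volume id 42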
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index e2949848d..238ca99f4 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -3,6 +3,7 @@ package topology
import (
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"sync"
@@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) {
if targetCount == 0 {
targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
}
- count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
- if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
- return count, nil
+ result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
+ if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return result, nil
}
- return count, err
+ return result, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
- counter += c
+ if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
+ result = append(result, res...)
} else {
- glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e)
- return counter, e
+ glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
+ return result, e
}
}
return
}
-func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
- return 0, e
+ return nil, e
}
vid, raftErr := topo.NextVolumeId()
if raftErr != nil {
- return 0, raftErr
+ return nil, raftErr
}
- err := vg.grow(grpcDialOption, topo, vid, option, servers...)
- return len(servers), err
+ if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
+ for _, server := range servers {
+ result = append(result, &master_pb.VolumeLocation{
+ Url: server.Url(),
+ PublicUrl: server.PublicUrl,
+ NewVids: []uint32{uint32(vid)},
+ })
+ }
+ }
+ return
}
// 1. find the main data node
@@ -181,7 +190,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return nil, rackErr
}
- //find main rack and other racks
+ //find main server and other servers
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
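
Growth calls now report where the new volumes landed instead of a bare count, so callers can fan the locations out. A hedged sketch of consuming the result (vg, option, grpcDialOption and topo assumed in scope):

locations, err := vg.AutomaticGrowByType(option, grpcDialOption, topo, 0)
if err != nil {
	glog.V(0).Infof("grow volumes: %v", err)
}
for _, loc := range locations {
	// Each entry is one replica: its Url/PublicUrl plus the newly created volume ids.
	glog.V(1).Infof("new volumes %v on %s (public %s)", loc.NewVids, loc.Url, loc.PublicUrl)
}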
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index f315cb7e4..167aee8ea 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -114,6 +114,8 @@ type VolumeLayout struct {
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
+ growRequestCount int
+ growRequestTime time.Time
}
type VolumeLayoutStats struct {
@@ -138,9 +140,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
}
func (vl *VolumeLayout) String() string {
- vl.accessLock.RLock()
- defer vl.accessLock.RUnlock()
- return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
+ return fmt.Sprintf("rp:%v, ttl:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.writables, vl.volumeSizeLimit)
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
@@ -218,6 +218,13 @@ func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
vl.setVolumeWritable(vid)
}
} else {
+ if !vl.enoughCopies(vid) {
+ glog.V(0).Infof("volume %d does not have enough copies", vid)
+ }
+ if !vl.isAllWritable(vid) {
+ glog.V(0).Infof("volume %d are not all writable", vid)
+ }
+ glog.V(0).Infof("volume %d remove from writable", vid)
vl.removeFromWritable(vid)
}
}
@@ -279,7 +286,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
//glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- if option.DataCenter == "" {
+ if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.Intn(lenWriters)]
locationList := vl.vid2location[vid]
if locationList != nil {
@@ -293,23 +300,45 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
- if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
- continue
- }
- if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
- continue
- }
- counter++
- if rand.Intn(counter) < 1 {
- vid, locationList = v, volumeLocationList
- }
+ if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
+ continue
+ }
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
+ counter++
+ if rand.Intn(counter) < 1 {
+ vid, locationList = v, volumeLocationList.Copy()
}
}
}
return &vid, count, locationList, nil
}
+func (vl *VolumeLayout) HasGrowRequest() bool {
+ if vl.growRequestCount > 0 && vl.growRequestTime.Add(time.Minute).After(time.Now()) {
+ return true
+ }
+ return false
+}
+func (vl *VolumeLayout) AddGrowRequest() {
+ vl.growRequestTime = time.Now()
+ vl.growRequestCount++
+}
+func (vl *VolumeLayout) DoneGrowRequest() {
+ vl.growRequestTime = time.Unix(0, 0)
+ vl.growRequestCount = 0
+}
+
+func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
+ active, crowded := vl.GetActiveVolumeCount(option)
+ //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
+ return active <= crowded
+}
+
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -411,8 +440,11 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- // glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
- return vl.removeFromWritable(vid)
+ wasWritable := vl.removeFromWritable(vid)
+ if wasWritable {
+ glog.V(0).Infof("Volume %d reaches full capacity.", vid)
+ }
+ return wasWritable
}
func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
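
The growRequestCount / growRequestTime pair gives callers a one-minute debounce on automatic growth. A hypothetical caller-side pattern (vg, option, grpcDialOption and topo are assumptions, not part of this diff):

if vl.ShouldGrowVolumes(option) && !vl.HasGrowRequest() {
	vl.AddGrowRequest()
	go func() {
		defer vl.DoneGrowRequest()
		if _, err := vg.GrowByCountAndType(grpcDialOption, option.ReplicaPlacement.GetCopyCount(), option, topo); err != nil {
			glog.V(0).Infof("automatic volume grow: %v", err)
		}
	}()
}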
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 548c4cd25..03580ae5b 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -31,6 +31,11 @@ func (dnll *VolumeLocationList) Head() *DataNode {
return dnll.list[0]
}
+func (dnll *VolumeLocationList) Rest() []*DataNode {
+ //mark first node as master volume
+ return dnll.list[1:]
+}
+
func (dnll *VolumeLocationList) Length() int {
if dnll == nil {
return 0