aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/data_node.go20
-rw-r--r--weed/topology/store_replicate.go3
-rw-r--r--weed/topology/topology.go27
3 files changed, 29 insertions, 21 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index efdf5285b..0a4df63d0 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -44,6 +44,10 @@ func (dn *DataNode) String() string {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock()
defer dn.Unlock()
+ return dn.doAddOrUpdateVolume(v)
+}
+
+func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
@@ -71,11 +75,15 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
+
dn.Lock()
+ defer dn.Unlock()
+
for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
@@ -90,9 +98,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
}
}
}
- dn.Unlock()
for _, v := range actualVolumes {
- isNew, isChangedRO := dn.AddOrUpdateVolume(v)
+ isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
@@ -103,8 +110,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
return
}
-func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
dn.Lock()
+ defer dn.Unlock()
+
for _, v := range deletedVolumes {
delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
@@ -115,9 +124,8 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol
dn.UpAdjustActiveVolumeCountDelta(-1)
}
}
- dn.Unlock()
- for _, v := range newlVolumes {
- dn.AddOrUpdateVolume(v)
+ for _, v := range newVolumes {
+ dn.doAddOrUpdateVolume(v)
}
return
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 481e72fe0..faa16e2f6 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -92,7 +93,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (size uint32, err error) {
+ r *http.Request) (size types.Size, err error) {
//check JWT
jwt := security.GetJwt(r)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 993f444a7..a11a1bac6 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"sync"
+ "time"
"github.com/chrislusf/raft"
@@ -65,31 +66,29 @@ func (t *Topology) IsLeader() bool {
if t.RaftServer.State() == raft.Leader {
return true
}
- if t.RaftServer.Leader() == "" {
- return true
- }
}
return false
}
func (t *Topology) Leader() (string, error) {
l := ""
- if t.RaftServer != nil {
- l = t.RaftServer.Leader()
- } else {
- return "", errors.New("Raft Server not ready yet!")
- }
-
- if l == "" {
- // We are a single node cluster, we are the leader
- return t.RaftServer.Name(), nil
+ for count := 0; count < 3; count++ {
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ } else {
+ return "", errors.New("Raft Server not ready yet!")
+ }
+ if l != "" {
+ break
+ } else {
+ time.Sleep(time.Duration(5+count) * time.Second)
+ }
}
-
return l, nil
}
func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
- //maybe an issue if lots of collections?
+ // maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
if list := c.(*Collection).Lookup(vid); list != nil {