diff options
| author | guol-fnst <goul-fnst@fujitsu.com> | 2022-05-16 10:41:18 +0800 |
|---|---|---|
| committer | guol-fnst <goul-fnst@fujitsu.com> | 2022-05-16 19:33:51 +0800 |
| commit | de6aa9cce81cb2214e7b7f3dd1a3c9383d054779 (patch) | |
| tree | 0451da338c468cc8565c5ec433070a6879e99efe /weed/server | |
| parent | 8f103ae613fb4752e7ec41394e155e8ad4c06826 (diff) | |
| download | seaweedfs-de6aa9cce81cb2214e7b7f3dd1a3c9383d054779.tar.xz seaweedfs-de6aa9cce81cb2214e7b7f3dd1a3c9383d054779.zip | |
avoid duplicated volume directory
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server.go | 53 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 11 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 8 | ||||
| -rw-r--r-- | weed/server/volume_server_tcp_handlers_write.go | 7 |
4 files changed, 70 insertions, 9 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 981f663e4..b2ac4e700 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -2,12 +2,16 @@ package weed_server import ( "context" + "errors" + "fmt" + "net" + "sort" + "time" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" - "net" - "time" "github.com/chrislusf/raft" "google.golang.org/grpc/peer" @@ -18,6 +22,37 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +func (ms *MasterServer) RegisterUUIDs(heartbeat *master_pb.Heartbeat) error { + ms.Topo.UUIDAccessLock.Lock() + defer ms.Topo.UUIDAccessLock.Unlock() + key := fmt.Sprintf("%s:%d", heartbeat.Ip, heartbeat.Port) + if ms.Topo.UUIDMap == nil { + ms.Topo.UUIDMap = make(map[string][]string) + } + // find whether new UUID exists + for k, v := range ms.Topo.UUIDMap { + for _, id := range heartbeat.LocationUUIDs { + sort.Strings(v) + index := sort.SearchStrings(v, id) + if index < len(v) && v[index] == id { + glog.Error("directory of ", id, " on ", k, " has been loaded") + return errors.New("volume: Duplicated volume directory was been loaded") + } + } + } + ms.Topo.UUIDMap[key] = heartbeat.LocationUUIDs + glog.V(0).Infof("found new UUID:%v %v , %v", key, heartbeat.LocationUUIDs, ms.Topo.UUIDMap) + return nil +} + +func (ms *MasterServer) UnRegisterUUIDs(ip string, port int) { + ms.Topo.UUIDAccessLock.Lock() + defer ms.Topo.UUIDAccessLock.Unlock() + key := fmt.Sprintf("%s:%d", ip, port) + delete(ms.Topo.UUIDMap, key) + glog.V(0).Infof("remove volume server %v, online volume server: %v", key, ms.Topo.UUIDMap) +} + func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { var dn *topology.DataNode @@ -32,6 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // the unregister and register can race with each other ms.Topo.UnRegisterDataNode(dn) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) + ms.UnRegisterUUIDs(dn.Ip, dn.Port) message := &master_pb.VolumeLocation{ Url: dn.Url(), @@ -69,7 +105,18 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) - glog.V(0).Infof("added volume server %d: %v:%d", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort()) + glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUUIDs) + err := ms.RegisterUUIDs(heartbeat) + if err != nil { + if stream_err := stream.Send(&master_pb.HeartbeatResponse{ + HasDuplicatedDirectory: true, + }); stream_err != nil { + glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err) + return stream_err + } + return err + } + if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index f3f99ee7b..5e341f07c 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,9 +2,12 @@ package weed_server import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/operation" + "os" + "syscall" "time" + "github.com/chrislusf/seaweedfs/weed/operation" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb" @@ -116,6 +119,12 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti doneChan <- err return } + if in.HasDuplicatedDirectory { + glog.Error("Shut Down Volume Server due to duplicated volume directory") + glog.V(0).Infof("send SIGINT to Volume Server") + p, _ := os.FindProcess(vs.pid) + p.Signal(syscall.SIGINT) + } if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() { vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) if vs.store.MaybeAdjustVolumeMax() { diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index dcd27673c..f927dbdb8 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,11 +1,13 @@ package weed_server import ( + "net/http" + "os" + "sync" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" - "net/http" - "sync" "google.golang.org/grpc" @@ -43,6 +45,7 @@ type VolumeServer struct { fileSizeLimitBytes int64 isHeartbeating bool stopChan chan bool + pid int } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -86,6 +89,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)), concurrentUploadLimit: concurrentUploadLimit, concurrentDownloadLimit: concurrentDownloadLimit, + pid: os.Getpid(), } vs.SeedMasterNodes = masterNodes diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go index a009611da..24ad916e6 100644 --- a/weed/server/volume_server_tcp_handlers_write.go +++ b/weed/server/volume_server_tcp_handlers_write.go @@ -3,12 +3,13 @@ package weed_server import ( "bufio" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" "io" "net" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { |
