diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 55 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 33 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 1 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 17 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 13 | ||||
| -rw-r--r-- | weed/server/master_server.go | 5 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 133 | ||||
| -rw-r--r-- | weed/server/raft_server_handlers.go | 14 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 3 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
12 files changed, 159 insertions, 123 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index ecd23413f..943dbd2a2 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -32,12 +32,12 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L return &filer_pb.LookupDirectoryEntryResponse{ Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: entry.IsDirectory(), - Attributes: filer.EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - HardLinkId: entry.HardLinkId, + Name: req.Name, + IsDirectory: entry.IsDirectory(), + Attributes: filer.EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, HardLinkCounter: entry.HardLinkCounter, }, }, nil @@ -77,12 +77,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file if err := stream.Send(&filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer.EntryAttributeToPb(entry), - Extended: entry.Extended, - HardLinkId: entry.HardLinkId, + Name: entry.Name(), + IsDirectory: entry.IsDirectory(), + Chunks: entry.Chunks, + Attributes: filer.EntryAttributeToPb(entry), + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, HardLinkCounter: entry.HardLinkCounter, }, }); err != nil { @@ -135,16 +135,19 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } -func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) { +func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) { fid, err := needle.ParseFileIdFromString(fileId) if err != nil { - return "", err + return nil, err } locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) if !found || len(locations) == 0 { - return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + } + for _, loc := range locations { + targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId)) } - return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil + return } func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { @@ -159,11 +162,11 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr } createErr := fs.filer.CreateEntry(ctx, &filer.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: filer.PbToEntryAttribute(req.Entry.Attributes), - Chunks: chunks, - Extended: req.Entry.Extended, - HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), + FullPath: util.JoinPath(req.Directory, req.Entry.Name), + Attr: filer.PbToEntryAttribute(req.Entry.Attributes), + Chunks: chunks, + Extended: req.Entry.Extended, + HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), HardLinkCounter: req.Entry.HardLinkCounter, }, req.OExcl, req.IsFromOtherCluster, req.Signatures) @@ -193,11 +196,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } newEntry := &filer.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: entry.Attr, - Extended: req.Entry.Extended, - Chunks: chunks, - HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), + FullPath: util.JoinPath(req.Directory, req.Entry.Name), + Attr: entry.Attr, + Extended: req.Entry.Extended, + Chunks: chunks, + HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), HardLinkCounter: req.Entry.HardLinkCounter, } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index ec0a4fb3e..59c149cef 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "net/http" "os" "sync" @@ -15,7 +16,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer" @@ -92,8 +92,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) }) fs.filer.Cipher = option.Cipher - fs.maybeStartMetrics() + fs.checkWithMaster() + go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepConnectedToMaster() v := util.GetViper() @@ -135,7 +136,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) return fs, nil } -func (fs *FilerServer) maybeStartMetrics() { +func (fs *FilerServer) checkWithMaster() { for _, master := range fs.option.Masters { _, err := pb.ParseFilerGrpcAddress(master) @@ -145,10 +146,19 @@ func (fs *FilerServer) maybeStartMetrics() { } isConnected := false - var readErr error for !isConnected { for _, master := range fs.option.Masters { - fs.metricsAddress, fs.metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) + readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", master, err) + } + fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + if fs.option.DefaultReplication == "" { + fs.option.DefaultReplication = resp.DefaultReplication + } + return nil + }) if readErr == nil { isConnected = true } else { @@ -157,17 +167,4 @@ func (fs *FilerServer) maybeStartMetrics() { } } - go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) -} - -func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { - err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get master %s configuration: %v", masterAddress, err) - } - metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) - return nil - }) - return } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 61011fc20..2b37e3c5d 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -167,6 +167,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa TtlSec: ttlSec, Mime: contentType, Md5: md5bytes, + FileSize: uint64(chunkOffset), }, Chunks: fileChunks, } diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 692909a29..e8fa3995d 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "net" "strings" "time" @@ -302,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li } return resp, nil } + +func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { + + // tell the volume servers about the leader + leader, _ := ms.Topo.Leader() + + resp := &master_pb.GetMasterConfigurationResponse{ + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), + DefaultReplication: ms.option.DefaultReplicaPlacement, + Leader: leader, + } + + return resp, nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 168975fb6..03b718291 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -3,8 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -179,14 +177,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku return resp, nil } - -func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { - - resp := &master_pb.GetMasterConfigurationResponse{ - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), - StorageBackends: backend.ToPbStorageBackends(), - } - - return resp, nil -} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 657b170c2..cc1c4b2ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -138,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("event: %+v", e) + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) - ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { - glog.V(0).Infof("state change: %+v", e) - }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 958680d2b..073c1ff16 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,8 @@ package weed_server import ( "encoding/json" - "io/ioutil" "os" "path" - "reflect" "sort" "time" @@ -28,7 +26,31 @@ type RaftServer struct { *raft.GrpcServer } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +type StateMachine struct { + raft.StateMachine + topo *topology.Topology +} + +func (s StateMachine) Save() ([]byte, error) { + state := topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + } + glog.V(1).Infof("Save raft state %+v", state) + return json.Marshal(state) +} + +func (s StateMachine) Recovery(data []byte) error { + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(data, &state) + if err != nil { + return err + } + glog.V(1).Infof("Recovery raft state %+v", state) + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + return nil +} + +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, raftResumeState bool) (*RaftServer, error) { s := &RaftServer{ peers: peers, serverAddr: serverAddr, @@ -46,48 +68,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d transporter := raft.NewGrpcTransporter(grpcDialOption) glog.V(0).Infof("Starting RaftServer with %v", serverAddr) - // always clear previous metadata - os.RemoveAll(path.Join(s.dataDir, "conf")) - os.RemoveAll(path.Join(s.dataDir, "log")) - os.RemoveAll(path.Join(s.dataDir, "snapshot")) - // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { - glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) + if !raftResumeState { + // always clear previous metadata + os.RemoveAll(path.Join(s.dataDir, "conf")) + os.RemoveAll(path.Join(s.dataDir, "log")) + os.RemoveAll(path.Join(s.dataDir, "snapshot")) + } + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + return nil, err } - s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "") + stateMachine := StateMachine{topo: topo} + s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") if err != nil { glog.V(0).Infoln(err) - return nil + return nil, err } s.raftServer.SetHeartbeatInterval(500 * time.Millisecond) s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) - s.raftServer.Start() + if err := s.raftServer.LoadSnapshot(); err != nil { + return nil, err + } + if err := s.raftServer.Start(); err != nil { + return nil, err + } for _, peer := range s.peers { - s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)) + if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil { + return nil, err + } + } + + // Remove deleted peers + for existsPeerName := range s.raftServer.Peers() { + exists, existingPeer := false, "" + for _, peer := range s.peers { + if pb.ServerToGrpcAddress(peer) == existsPeerName { + exists, existingPeer = true, peer + break + } + } + if exists { + if err := s.raftServer.RemovePeer(existsPeerName); err != nil { + glog.V(0).Infoln(err) + return nil, err + } else { + glog.V(0).Infof("removing old peer %s", existingPeer) + } + } } s.GrpcServer = raft.NewGrpcServer(s.raftServer) if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself. - glog.V(0).Infoln("Initializing new cluster") - - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), - }) - - if err != nil { - glog.V(0).Infoln(err) - return nil - } + // s.DoJoinCommand() } glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) - return s + return s, nil } func (s *RaftServer) Peers() (members []string) { @@ -100,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) { return } -func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { - confPath := path.Join(dir, "conf") - // open conf file - b, err := ioutil.ReadFile(confPath) - if err != nil { - return oldPeers, true - } - conf := &raft.Config{} - if err = json.Unmarshal(b, conf); err != nil { - return oldPeers, true - } - - for _, p := range conf.Peers { - oldPeers = append(oldPeers, p.Name) - } - oldPeers = append(oldPeers, self) - - if len(peers) == 0 && len(oldPeers) <= 1 { - return oldPeers, false - } - - sort.Strings(peers) - sort.Strings(oldPeers) - - return oldPeers, !reflect.DeepEqual(peers, oldPeers) - -} - func isTheFirstOne(self string, peers []string) bool { sort.Strings(peers) if len(peers) <= 0 { @@ -135,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool { } return self == peers[0] } + +func (s *RaftServer) DoJoinCommand() { + + glog.V(0).Infoln("Initializing new cluster") + + if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + }); err != nil { + glog.Errorf("fail to send join command: %v", err) + } + +} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index fd38cb977..252570eab 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,20 +1,24 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/storage/needle" "net/http" ) type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` + IsLeader bool `json:"IsLeader,omitempty"` + Leader string `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` + MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` } func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { ret := ClusterStatusResult{ - IsLeader: s.topo.IsLeader(), - Peers: s.Peers(), + IsLeader: s.topo.IsLeader(), + Peers: s.Peers(), + MaxVolumeId: s.topo.GetMaxVolumeId(), } + if leader, e := s.topo.Leader(); e == nil { ret.Leader = leader } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 8698a4c64..199f8faba 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -90,7 +90,7 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { return true } vs.isHeartbeating = false - vs.stopChan <- true + close(vs.stopChan) return false } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index bb04678d6..15fd446e7 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -93,6 +93,10 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } else if hasEcVolume { count, err = vs.store.ReadEcShardNeedle(volumeId, n) } + if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { + glog.V(4).Infof("read needle: %v", err) + // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request + } // glog.V(4).Infoln("read bytes", count, "error", err) if err != nil || count < 0 { glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 78cbf08c5..01a77b901 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" + "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { @@ -67,7 +68,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.Name = string(reqNeedle.Name) } ret.Size = uint32(originalSize) - ret.ETag = reqNeedle.Etag() + ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5)) ret.Mime = string(reqNeedle.Mime) setEtag(w, ret.ETag) w.Header().Set("Content-MD5", contentMd5) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 121c0d2bb..f13e73a7b 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -100,7 +100,7 @@ type WebDavFile struct { func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { - chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB) + chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024) return &WebDavFileSystem{ option: option, chunkCache: chunkCache, |
